You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:43 UTC
[incubator-pekko-samples] branch wip-chbatey-latest-cass-version created (now c260dd5)
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a change to branch wip-chbatey-latest-cass-version
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
at c260dd5 Actually commit them
This branch includes the following new commits:
new c260dd5 Actually commit them
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org
[incubator-pekko-samples] 01/01: Actually commit them
Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-chbatey-latest-cass-version
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit c260dd5b864f82d634d026daf939873a312e034f
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Thu Jan 9 09:51:54 2020 +0000
Actually commit them
---
akka-sample-cqrs-scala/build.sbt | 2 +-
akka-sample-cqrs-scala/project/build.properties | 2 +-
.../src/main/resources/application.conf | 6 ++++++
.../scala/sample/cqrs/CassandraSessionExtension.scala | 9 +++------
.../src/main/scala/sample/cqrs/EventProcessor.scala | 15 +++++++++++----
.../src/main/scala/sample/cqrs/ShoppingCartRoutes.scala | 7 +++----
6 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 5c535ee..3298508 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,5 +1,5 @@
val AkkaVersion = "2.6.1"
-val AkkaPersistenceCassandraVersion = "0.100"
+val AkkaPersistenceCassandraVersion = "1.0-SNAPSHOT"
val AkkaHttpVersion = "10.1.10"
lazy val `akka-sample-cqrs-scala` = project
diff --git a/akka-sample-cqrs-scala/project/build.properties b/akka-sample-cqrs-scala/project/build.properties
index 6adcdc7..00b48d9 100644
--- a/akka-sample-cqrs-scala/project/build.properties
+++ b/akka-sample-cqrs-scala/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.3
+sbt.version=1.3.6
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 5b51db2..dcbc0f9 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -36,11 +36,17 @@ akka {
cassandra-journal {
events-by-tag {
bucket-size = "Day"
+ flush-interval = 10ms
}
+ pubsub-notification = on
}
cassandra-query-journal {
first-time-bucket = "20191023T00:00"
+ events-by-tag {
+ eventual-consistency-delay = 150ms
+ }
+// refresh-interval = 200ms
}
event-processor {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
index b5bbfe4..e270fe3 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
@@ -1,16 +1,14 @@
package sample.cqrs
import scala.concurrent.Future
-
import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.scaladsl.adapter._
+import akka.cassandra.session.DefaultSessionProvider
+import akka.cassandra.session.scaladsl.CassandraSession
import akka.event.Logging
-import akka.persistence.cassandra.ConfigSessionProvider
-import akka.persistence.cassandra.session.CassandraSessionSettings
-import akka.persistence.cassandra.session.scaladsl.CassandraSession
object CassandraSessionExtension extends ExtensionId[CassandraSessionExtension] {
@@ -27,8 +25,7 @@ class CassandraSessionExtension(system: ActorSystem[_]) extends Extension {
val sessionConfig = system.settings.config.getConfig("cassandra-journal")
new CassandraSession(
system.toClassic,
- new ConfigSessionProvider(system.toClassic, sessionConfig),
- CassandraSessionSettings(sessionConfig),
+ new DefaultSessionProvider(system.toClassic, sessionConfig),
system.executionContext,
Logging(system.toClassic, getClass),
metricsCategory = "sample",
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
index ec420d1..dc1a560 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
@@ -4,7 +4,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.ClassTag
-
import akka.Done
import akka.NotUsed
import akka.actor.typed.ActorSystem
@@ -26,8 +25,9 @@ import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
-import com.datastax.driver.core.PreparedStatement
-import com.datastax.driver.core.Row
+import com.datastax.oss.driver.api.core.cql.PreparedStatement
+import com.datastax.oss.driver.api.core.cql.Row
+import com.datastax.oss.driver.api.core.uuid.Uuids
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -110,6 +110,13 @@ abstract class EventProcessorStream[Event: ClassTag](
query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
eventEnvelope.event match {
case event: Event =>
+ eventEnvelope.offset match {
+ case TimeBasedUUID(offset) =>
+ // these times are from different nodes so don't rely on this being accurate
+ val eventDelay = System.currentTimeMillis() - Uuids.unixTimestamp(offset)
+ log.info("Event eventual consistency {}", eventDelay.millis)
+ case _ =>
+ }
processEvent(event, PersistenceId.ofUniqueId(eventEnvelope.persistenceId), eventEnvelope.sequenceNr).map(_ =>
eventEnvelope.offset)
case other =>
@@ -130,7 +137,7 @@ abstract class EventProcessorStream[Event: ClassTag](
private def extractOffset(maybeRow: Option[Row]): Offset = {
maybeRow match {
case Some(row) =>
- val uuid = row.getUUID("timeUuidOffset")
+ val uuid = row.getUuid("timeUuidOffset")
if (uuid == null) {
NoOffset
} else {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
index 7bf093d..6e03597 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
@@ -1,7 +1,6 @@
package sample.cqrs
import scala.concurrent.Future
-import scala.concurrent.duration._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
@@ -39,7 +38,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
case ShoppingCart.Accepted(summary) =>
complete(StatusCodes.OK -> summary)
case ShoppingCart.Rejected(reason) =>
- complete(StatusCodes.BadRequest, reason)
+ complete(StatusCodes.BadRequest -> reason)
}
}
},
@@ -57,7 +56,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
case ShoppingCart.Accepted(summary) =>
complete(StatusCodes.OK -> summary)
case ShoppingCart.Rejected(reason) =>
- complete(StatusCodes.BadRequest, reason)
+ complete(StatusCodes.BadRequest -> reason)
}
}
},
@@ -76,7 +75,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
case ShoppingCart.Accepted(summary) =>
complete(StatusCodes.OK -> summary)
case ShoppingCart.Rejected(reason) =>
- complete(StatusCodes.BadRequest, reason)
+ complete(StatusCodes.BadRequest -> reason)
}
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org