You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:44 UTC

[incubator-pekko-samples] 01/01: Actually commit them

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-chbatey-latest-cass-version
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit c260dd5b864f82d634d026daf939873a312e034f
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Thu Jan 9 09:51:54 2020 +0000

    Actually commit them
---
 akka-sample-cqrs-scala/build.sbt                          |  2 +-
 akka-sample-cqrs-scala/project/build.properties           |  2 +-
 .../src/main/resources/application.conf                   |  6 ++++++
 .../scala/sample/cqrs/CassandraSessionExtension.scala     |  9 +++------
 .../src/main/scala/sample/cqrs/EventProcessor.scala       | 15 +++++++++++----
 .../src/main/scala/sample/cqrs/ShoppingCartRoutes.scala   |  7 +++----
 6 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 5c535ee..3298508 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,5 +1,5 @@
 val AkkaVersion = "2.6.1"
-val AkkaPersistenceCassandraVersion = "0.100"
+val AkkaPersistenceCassandraVersion = "1.0-SNAPSHOT"
 val AkkaHttpVersion = "10.1.10"
 
 lazy val `akka-sample-cqrs-scala` = project
diff --git a/akka-sample-cqrs-scala/project/build.properties b/akka-sample-cqrs-scala/project/build.properties
index 6adcdc7..00b48d9 100644
--- a/akka-sample-cqrs-scala/project/build.properties
+++ b/akka-sample-cqrs-scala/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.3
+sbt.version=1.3.6
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 5b51db2..dcbc0f9 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -36,11 +36,17 @@ akka {
 cassandra-journal {
   events-by-tag {
     bucket-size = "Day"
+    flush-interval = 10ms
   }
+  pubsub-notification = on
 }
 
 cassandra-query-journal {
   first-time-bucket = "20191023T00:00"
+  events-by-tag {
+    eventual-consistency-delay = 150ms
+  }
+//  refresh-interval = 200ms
 }
 
 event-processor {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
index b5bbfe4..e270fe3 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
@@ -1,16 +1,14 @@
 package sample.cqrs
 
 import scala.concurrent.Future
-
 import akka.Done
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.Extension
 import akka.actor.typed.ExtensionId
 import akka.actor.typed.scaladsl.adapter._
+import akka.cassandra.session.DefaultSessionProvider
+import akka.cassandra.session.scaladsl.CassandraSession
 import akka.event.Logging
-import akka.persistence.cassandra.ConfigSessionProvider
-import akka.persistence.cassandra.session.CassandraSessionSettings
-import akka.persistence.cassandra.session.scaladsl.CassandraSession
 
 object CassandraSessionExtension extends ExtensionId[CassandraSessionExtension] {
 
@@ -27,8 +25,7 @@ class CassandraSessionExtension(system: ActorSystem[_]) extends Extension {
     val sessionConfig = system.settings.config.getConfig("cassandra-journal")
     new CassandraSession(
       system.toClassic,
-      new ConfigSessionProvider(system.toClassic, sessionConfig),
-      CassandraSessionSettings(sessionConfig),
+      new DefaultSessionProvider(system.toClassic, sessionConfig),
       system.executionContext,
       Logging(system.toClassic, getClass),
       metricsCategory = "sample",
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
index ec420d1..dc1a560 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
@@ -4,7 +4,6 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
-
 import akka.Done
 import akka.NotUsed
 import akka.actor.typed.ActorSystem
@@ -26,8 +25,9 @@ import akka.stream.SharedKillSwitch
 import akka.stream.scaladsl.RestartSource
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
-import com.datastax.driver.core.PreparedStatement
-import com.datastax.driver.core.Row
+import com.datastax.oss.driver.api.core.cql.PreparedStatement
+import com.datastax.oss.driver.api.core.cql.Row
+import com.datastax.oss.driver.api.core.uuid.Uuids
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -110,6 +110,13 @@ abstract class EventProcessorStream[Event: ClassTag](
     query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
       eventEnvelope.event match {
         case event: Event =>
+          eventEnvelope.offset match {
+            case TimeBasedUUID(offset) =>
+              // these times are from different nodes so don't rely on this being accurate
+              val eventDelay = System.currentTimeMillis() - Uuids.unixTimestamp(offset)
+              log.info("Event eventual consistency {}", eventDelay.millis)
+            case _ =>
+          }
           processEvent(event, PersistenceId.ofUniqueId(eventEnvelope.persistenceId), eventEnvelope.sequenceNr).map(_ =>
             eventEnvelope.offset)
         case other =>
@@ -130,7 +137,7 @@ abstract class EventProcessorStream[Event: ClassTag](
   private def extractOffset(maybeRow: Option[Row]): Offset = {
     maybeRow match {
       case Some(row) =>
-        val uuid = row.getUUID("timeUuidOffset")
+        val uuid = row.getUuid("timeUuidOffset")
         if (uuid == null) {
           NoOffset
         } else {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
index 7bf093d..6e03597 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
@@ -1,7 +1,6 @@
 package sample.cqrs
 
 import scala.concurrent.Future
-import scala.concurrent.duration._
 
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
@@ -39,7 +38,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
                 case ShoppingCart.Accepted(summary) =>
                   complete(StatusCodes.OK -> summary)
                 case ShoppingCart.Rejected(reason) =>
-                  complete(StatusCodes.BadRequest, reason)
+                  complete(StatusCodes.BadRequest -> reason)
               }
             }
           },
@@ -57,7 +56,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
                   case ShoppingCart.Accepted(summary) =>
                     complete(StatusCodes.OK -> summary)
                   case ShoppingCart.Rejected(reason) =>
-                    complete(StatusCodes.BadRequest, reason)
+                    complete(StatusCodes.BadRequest -> reason)
                 }
             }
           },
@@ -76,7 +75,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
                   case ShoppingCart.Accepted(summary) =>
                     complete(StatusCodes.OK -> summary)
                   case ShoppingCart.Rejected(reason) =>
-                    complete(StatusCodes.BadRequest, reason)
+                    complete(StatusCodes.BadRequest -> reason)
                 }
               }
             })


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org