You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:43 UTC

[incubator-pekko-samples] branch wip-chbatey-latest-cass-version created (now c260dd5)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch wip-chbatey-latest-cass-version
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


      at c260dd5  Actually commit them

This branch includes the following new commits:

     new c260dd5  Actually commit them

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 01/01: Actually commit them

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-chbatey-latest-cass-version
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit c260dd5b864f82d634d026daf939873a312e034f
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Thu Jan 9 09:51:54 2020 +0000

    Actually commit them
---
 akka-sample-cqrs-scala/build.sbt                          |  2 +-
 akka-sample-cqrs-scala/project/build.properties           |  2 +-
 .../src/main/resources/application.conf                   |  6 ++++++
 .../scala/sample/cqrs/CassandraSessionExtension.scala     |  9 +++------
 .../src/main/scala/sample/cqrs/EventProcessor.scala       | 15 +++++++++++----
 .../src/main/scala/sample/cqrs/ShoppingCartRoutes.scala   |  7 +++----
 6 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 5c535ee..3298508 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,5 +1,5 @@
 val AkkaVersion = "2.6.1"
-val AkkaPersistenceCassandraVersion = "0.100"
+val AkkaPersistenceCassandraVersion = "1.0-SNAPSHOT"
 val AkkaHttpVersion = "10.1.10"
 
 lazy val `akka-sample-cqrs-scala` = project
diff --git a/akka-sample-cqrs-scala/project/build.properties b/akka-sample-cqrs-scala/project/build.properties
index 6adcdc7..00b48d9 100644
--- a/akka-sample-cqrs-scala/project/build.properties
+++ b/akka-sample-cqrs-scala/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.3
+sbt.version=1.3.6
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 5b51db2..dcbc0f9 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -36,11 +36,17 @@ akka {
 cassandra-journal {
   events-by-tag {
     bucket-size = "Day"
+    flush-interval = 10ms
   }
+  pubsub-notification = on
 }
 
 cassandra-query-journal {
   first-time-bucket = "20191023T00:00"
+  events-by-tag {
+    eventual-consistency-delay = 150ms
+  }
+//  refresh-interval = 200ms
 }
 
 event-processor {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
index b5bbfe4..e270fe3 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
@@ -1,16 +1,14 @@
 package sample.cqrs
 
 import scala.concurrent.Future
-
 import akka.Done
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.Extension
 import akka.actor.typed.ExtensionId
 import akka.actor.typed.scaladsl.adapter._
+import akka.cassandra.session.DefaultSessionProvider
+import akka.cassandra.session.scaladsl.CassandraSession
 import akka.event.Logging
-import akka.persistence.cassandra.ConfigSessionProvider
-import akka.persistence.cassandra.session.CassandraSessionSettings
-import akka.persistence.cassandra.session.scaladsl.CassandraSession
 
 object CassandraSessionExtension extends ExtensionId[CassandraSessionExtension] {
 
@@ -27,8 +25,7 @@ class CassandraSessionExtension(system: ActorSystem[_]) extends Extension {
     val sessionConfig = system.settings.config.getConfig("cassandra-journal")
     new CassandraSession(
       system.toClassic,
-      new ConfigSessionProvider(system.toClassic, sessionConfig),
-      CassandraSessionSettings(sessionConfig),
+      new DefaultSessionProvider(system.toClassic, sessionConfig),
       system.executionContext,
       Logging(system.toClassic, getClass),
       metricsCategory = "sample",
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
index ec420d1..dc1a560 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
@@ -4,7 +4,6 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
-
 import akka.Done
 import akka.NotUsed
 import akka.actor.typed.ActorSystem
@@ -26,8 +25,9 @@ import akka.stream.SharedKillSwitch
 import akka.stream.scaladsl.RestartSource
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
-import com.datastax.driver.core.PreparedStatement
-import com.datastax.driver.core.Row
+import com.datastax.oss.driver.api.core.cql.PreparedStatement
+import com.datastax.oss.driver.api.core.cql.Row
+import com.datastax.oss.driver.api.core.uuid.Uuids
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -110,6 +110,13 @@ abstract class EventProcessorStream[Event: ClassTag](
     query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
       eventEnvelope.event match {
         case event: Event =>
+          eventEnvelope.offset match {
+            case TimeBasedUUID(offset) =>
+              // these times are from different nodes so don't rely on this being accurate
+              val eventDelay = System.currentTimeMillis() - Uuids.unixTimestamp(offset)
+              log.info("Event eventual consistency {}", eventDelay.millis)
+            case _ =>
+          }
           processEvent(event, PersistenceId.ofUniqueId(eventEnvelope.persistenceId), eventEnvelope.sequenceNr).map(_ =>
             eventEnvelope.offset)
         case other =>
@@ -130,7 +137,7 @@ abstract class EventProcessorStream[Event: ClassTag](
   private def extractOffset(maybeRow: Option[Row]): Offset = {
     maybeRow match {
       case Some(row) =>
-        val uuid = row.getUUID("timeUuidOffset")
+        val uuid = row.getUuid("timeUuidOffset")
         if (uuid == null) {
           NoOffset
         } else {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
index 7bf093d..6e03597 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
@@ -1,7 +1,6 @@
 package sample.cqrs
 
 import scala.concurrent.Future
-import scala.concurrent.duration._
 
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
@@ -39,7 +38,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
                 case ShoppingCart.Accepted(summary) =>
                   complete(StatusCodes.OK -> summary)
                 case ShoppingCart.Rejected(reason) =>
-                  complete(StatusCodes.BadRequest, reason)
+                  complete(StatusCodes.BadRequest -> reason)
               }
             }
           },
@@ -57,7 +56,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
                   case ShoppingCart.Accepted(summary) =>
                     complete(StatusCodes.OK -> summary)
                   case ShoppingCart.Rejected(reason) =>
-                    complete(StatusCodes.BadRequest, reason)
+                    complete(StatusCodes.BadRequest -> reason)
                 }
             }
           },
@@ -76,7 +75,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
                   case ShoppingCart.Accepted(summary) =>
                     complete(StatusCodes.OK -> summary)
                   case ShoppingCart.Rejected(reason) =>
-                    complete(StatusCodes.BadRequest, reason)
+                    complete(StatusCodes.BadRequest -> reason)
                 }
               }
             })


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org