You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:44 UTC
[incubator-pekko-samples] 01/01: Actually commit them
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-chbatey-latest-cass-version
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit c260dd5b864f82d634d026daf939873a312e034f
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Thu Jan 9 09:51:54 2020 +0000
Actually commit them
---
akka-sample-cqrs-scala/build.sbt | 2 +-
akka-sample-cqrs-scala/project/build.properties | 2 +-
.../src/main/resources/application.conf | 6 ++++++
.../scala/sample/cqrs/CassandraSessionExtension.scala | 9 +++------
.../src/main/scala/sample/cqrs/EventProcessor.scala | 15 +++++++++++----
.../src/main/scala/sample/cqrs/ShoppingCartRoutes.scala | 7 +++----
6 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 5c535ee..3298508 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,5 +1,5 @@
val AkkaVersion = "2.6.1"
-val AkkaPersistenceCassandraVersion = "0.100"
+val AkkaPersistenceCassandraVersion = "1.0-SNAPSHOT"
val AkkaHttpVersion = "10.1.10"
lazy val `akka-sample-cqrs-scala` = project
diff --git a/akka-sample-cqrs-scala/project/build.properties b/akka-sample-cqrs-scala/project/build.properties
index 6adcdc7..00b48d9 100644
--- a/akka-sample-cqrs-scala/project/build.properties
+++ b/akka-sample-cqrs-scala/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.3
+sbt.version=1.3.6
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 5b51db2..dcbc0f9 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -36,11 +36,17 @@ akka {
cassandra-journal {
events-by-tag {
bucket-size = "Day"
+ flush-interval = 10ms
}
+ pubsub-notification = on
}
cassandra-query-journal {
first-time-bucket = "20191023T00:00"
+ events-by-tag {
+ eventual-consistency-delay = 150ms
+ }
+// refresh-interval = 200ms
}
event-processor {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
index b5bbfe4..e270fe3 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CassandraSessionExtension.scala
@@ -1,16 +1,14 @@
package sample.cqrs
import scala.concurrent.Future
-
import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.scaladsl.adapter._
+import akka.cassandra.session.DefaultSessionProvider
+import akka.cassandra.session.scaladsl.CassandraSession
import akka.event.Logging
-import akka.persistence.cassandra.ConfigSessionProvider
-import akka.persistence.cassandra.session.CassandraSessionSettings
-import akka.persistence.cassandra.session.scaladsl.CassandraSession
object CassandraSessionExtension extends ExtensionId[CassandraSessionExtension] {
@@ -27,8 +25,7 @@ class CassandraSessionExtension(system: ActorSystem[_]) extends Extension {
val sessionConfig = system.settings.config.getConfig("cassandra-journal")
new CassandraSession(
system.toClassic,
- new ConfigSessionProvider(system.toClassic, sessionConfig),
- CassandraSessionSettings(sessionConfig),
+ new DefaultSessionProvider(system.toClassic, sessionConfig),
system.executionContext,
Logging(system.toClassic, getClass),
metricsCategory = "sample",
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
index ec420d1..dc1a560 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessor.scala
@@ -4,7 +4,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.ClassTag
-
import akka.Done
import akka.NotUsed
import akka.actor.typed.ActorSystem
@@ -26,8 +25,9 @@ import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.RestartSource
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
-import com.datastax.driver.core.PreparedStatement
-import com.datastax.driver.core.Row
+import com.datastax.oss.driver.api.core.cql.PreparedStatement
+import com.datastax.oss.driver.api.core.cql.Row
+import com.datastax.oss.driver.api.core.uuid.Uuids
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -110,6 +110,13 @@ abstract class EventProcessorStream[Event: ClassTag](
query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
eventEnvelope.event match {
case event: Event =>
+ eventEnvelope.offset match {
+ case TimeBasedUUID(offset) =>
+ // the event timestamp and the local clock come from different nodes, so don't rely on this delay being accurate
+ val eventDelay = System.currentTimeMillis() - Uuids.unixTimestamp(offset)
+ log.info("Event eventual consistency {}", eventDelay.millis)
+ case _ =>
+ }
processEvent(event, PersistenceId.ofUniqueId(eventEnvelope.persistenceId), eventEnvelope.sequenceNr).map(_ =>
eventEnvelope.offset)
case other =>
@@ -130,7 +137,7 @@ abstract class EventProcessorStream[Event: ClassTag](
private def extractOffset(maybeRow: Option[Row]): Offset = {
maybeRow match {
case Some(row) =>
- val uuid = row.getUUID("timeUuidOffset")
+ val uuid = row.getUuid("timeUuidOffset")
if (uuid == null) {
NoOffset
} else {
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
index 7bf093d..6e03597 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
@@ -1,7 +1,6 @@
package sample.cqrs
import scala.concurrent.Future
-import scala.concurrent.duration._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
@@ -39,7 +38,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
case ShoppingCart.Accepted(summary) =>
complete(StatusCodes.OK -> summary)
case ShoppingCart.Rejected(reason) =>
- complete(StatusCodes.BadRequest, reason)
+ complete(StatusCodes.BadRequest -> reason)
}
}
},
@@ -57,7 +56,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
case ShoppingCart.Accepted(summary) =>
complete(StatusCodes.OK -> summary)
case ShoppingCart.Rejected(reason) =>
- complete(StatusCodes.BadRequest, reason)
+ complete(StatusCodes.BadRequest -> reason)
}
}
},
@@ -76,7 +75,7 @@ class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
case ShoppingCart.Accepted(summary) =>
complete(StatusCodes.OK -> summary)
case ShoppingCart.Rejected(reason) =>
- complete(StatusCodes.BadRequest, reason)
+ complete(StatusCodes.BadRequest -> reason)
}
}
})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org