You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:36 UTC
[incubator-pekko-samples] 01/01: Basic testing
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-chbatey-cqrs-testing
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 3cf2ab6fd4bf3850288119f1256b7bf8a6877e57
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Fri Oct 2 15:10:52 2020 +0100
Basic testing
---
akka-sample-cqrs-scala/README.md | 66 +------
akka-sample-cqrs-scala/build.sbt | 14 +-
akka-sample-cqrs-scala/projection.sql | 21 ++
.../src/main/resources/application.conf | 43 +++--
.../projection/testing}/CborSerializable.scala | 2 +-
.../testing/ConfigurablePersistentActor.scala | 47 +++++
.../testing}/EventProcessorSettings.scala | 7 +-
.../scala/akka/projection/testing/Guardian.scala | 87 +++++++++
.../akka/projection/testing/HikariFactory.scala | 9 +
.../projection/testing/HikariJdbcSession.scala | 23 +++
.../projection/testing/HttpServer.scala} | 15 +-
.../akka/projection/testing/LoadGeneration.scala | 85 ++++++++
.../main/scala/akka/projection/testing/Main.scala | 33 ++++
.../projection/testing/ProjectionHandler.scala | 19 ++
.../scala/akka/projection/testing/TestRoutes.scala | 30 +++
.../akka/projection/testing/TestValidation.scala | 45 +++++
.../src/main/scala/sample/cqrs/Main.scala | 153 ---------------
.../src/main/scala/sample/cqrs/ShoppingCart.scala | 215 ---------------------
.../cqrs/ShoppingCartProjectionHandler.scala | 27 ---
.../scala/sample/cqrs/ShoppingCartRoutes.scala | 110 -----------
.../src/test/resources/logback-test.xml | 17 --
.../test/scala/sample/cqrs/IntegrationSpec.scala | 198 -------------------
.../test/scala/sample/cqrs/ProjectionSpec.scala | 87 ---------
.../test/scala/sample/cqrs/ShoppingCartSpec.scala | 86 ---------
24 files changed, 450 insertions(+), 989 deletions(-)
diff --git a/akka-sample-cqrs-scala/README.md b/akka-sample-cqrs-scala/README.md
index 77ec65b..52ee6d2 100644
--- a/akka-sample-cqrs-scala/README.md
+++ b/akka-sample-cqrs-scala/README.md
@@ -1,66 +1,2 @@
-This tutorial contains a sample illustrating an CQRS design with [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html), [Akka Cluster Singleton](https://doc.akka.io/docs/akka/2.6/typed/cluster-singleton.html), [Akka Persistence](https://doc.akka.io/docs/akka/2.6/typed/persistence.html) and [Akka Persistence Query](https://doc.akka.io/docs/akka/2.6/persistence-query.html).
+# Projection latency testbed
-## Overview
-
-This sample application implements a CQRS-ES design that will side-effect in the read model on selected events persisted to Cassandra by the write model. In this sample, the side-effect is logging a line.
-A more practical example would be to send a message to a Kafka topic or update a relational database.
-
-## Write model
-
-The write model is a shopping cart.
-
-The implementation is based on a sharded actor: each `ShoppingCart` is an [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html) entity. The entity actor `ShoppingCart` is an [EventSourcedBehavior](https://doc.akka.io/docs/akka/2.6/typed/persistence.html).
-
-Events from the shopping carts are tagged and consumed by the read model.
-
-## Read model
-
-The read model is implemented in such a way that 'load' is sharded over a number of processors. This number is `event-processor.parallelism`.
-This is implemented using [Akka Projections](https://doc.akka.io/docs/akka-projection/current) which is then running on top of
- [Sharded Daemon Process](https://doc.akka.io/docs/akka/current/typed/cluster-sharded-daemon-process.html).
-
-
-## Running the sample code
-
-1. Start a Cassandra server by running:
-
-```
-sbt "runMain sample.cqrs.Main cassandra"
-```
-
-2. Start a node that runs the write model:
-
-```
-sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2551"
-```
-
-3. Start a node that runs the read model:
-
-```
-sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2552"
-```
-
-4. More write or read nodes can be started started by defining roles and port:
-
-```
-sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2553"
-sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2554"
-```
-
-Try it with curl:
-
-```
-# add item to cart
-curl -X POST -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' http://127.0.0.1:8051/shopping/carts
-
-# get cart
-curl http://127.0.0.1:8051/shopping/carts/cart1
-
-# update quantity of item
-curl -X PUT -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' http://127.0.0.1:8051/shopping/carts
-
-# check out cart
-curl -X POST -H "Content-Type: application/json" -d '{}' http://127.0.0.1:8051/shopping/carts/cart1/checkout
-```
-
-or same `curl` commands to port 8052.
diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 43cb70d..a67e25b 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,12 +1,12 @@
-val AkkaVersion = "2.6.8"
+val AkkaVersion = "2.6.9"
val AkkaPersistenceCassandraVersion = "1.0.1"
val AkkaHttpVersion = "10.2.0"
-val AkkaProjectionVersion = "0.3"
+val AkkaProjectionVersion = "1.0.0"
-lazy val `akka-sample-cqrs-scala` = project
+lazy val `akka-projection-testing` = project
.in(file("."))
.settings(
- organization := "com.lightbend.akka.samples",
+ organization := "akka.projection.testing",
version := "1.0",
scalaVersion := "2.13.1",
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"),
@@ -20,19 +20,23 @@ lazy val `akka-sample-cqrs-scala` = project
"com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % AkkaPersistenceCassandraVersion,
"com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
"com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionVersion,
+ "com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion,
+ "com.zaxxer" % "HikariCP" % "3.4.5",
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
"ch.qos.logback" % "logback-classic" % "1.2.3",
+ "org.postgresql" % "postgresql" % "42.2.14",
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
"com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test,
+
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
"commons-io" % "commons-io" % "2.4" % Test),
fork in run := false,
Global / cancelable := false, // ctrl-c
- mainClass in (Compile, run) := Some("sample.cqrs.Main"),
+ mainClass in (Compile, run) := Some("akka.projection.testing.Main"),
// disable parallel tests
parallelExecution in Test := false,
// show full stack traces and test case durations
diff --git a/akka-sample-cqrs-scala/projection.sql b/akka-sample-cqrs-scala/projection.sql
new file mode 100644
index 0000000..154a945
--- /dev/null
+++ b/akka-sample-cqrs-scala/projection.sql
@@ -0,0 +1,21 @@
+
+-- Offset table for the JDBC projection; the table name matches
+-- akka.projection.jdbc.offset-store.table in application.conf.
+create table if not exists "AKKA_PROJECTION_OFFSET_STORE" (
+ "PROJECTION_NAME" VARCHAR(255) NOT NULL,
+ "PROJECTION_KEY" VARCHAR(255) NOT NULL,
+ "CURRENT_OFFSET" VARCHAR(255) NOT NULL,
+ "MANIFEST" VARCHAR(4) NOT NULL,
+ "MERGEABLE" BOOLEAN NOT NULL,
+ "LAST_UPDATED" BIGINT NOT NULL,
+ constraint "PK_PROJECTION_ID" primary key ("PROJECTION_NAME","PROJECTION_KEY")
+);
+
+create index if not exists "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME");
+
+-- Test output table: ProjectionHandler inserts one (name, event) row per projected event
+-- and TestValidation counts the rows for a given name.
+-- The composite primary key allows each (name, event) pair to be stored only once.
+create table if not exists events (
+ name varchar(256),
+ event varchar(256),
+ constraint pkey primary key (name, event)
+);
+
+
+
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 9bcb675..da92bcf 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -1,11 +1,29 @@
akka {
+
+ projection {
+ jdbc {
+ dialect = "postgres-dialect"
+ offset-store {
+ schema = ""
+ table = "AKKA_PROJECTION_OFFSET_STORE"
+ }
+ blocking-jdbc-dispatcher {
+ type = Dispatcher
+ executor = "thread-pool-executor"
+ thread-pool-executor {
+ fixed-pool-size = 10
+ }
+ }
+ }
+ }
+
loglevel = DEBUG
actor {
provider = cluster
serialization-bindings {
- "sample.cqrs.CborSerializable" = jackson-cbor
+ "akka.projection.testing.CborSerializable" = jackson-cbor
}
}
@@ -18,8 +36,8 @@ akka {
cluster {
seed-nodes = [
- "akka://Shopping@127.0.0.1:2551",
- "akka://Shopping@127.0.0.1:2552"
+ "akka://test@127.0.0.1:2551",
+ "akka://test@127.0.0.1:2552"
]
roles = ["write-model", "read-model"]
@@ -37,13 +55,17 @@ akka {
# Configuration for akka-persistence-cassandra
akka.persistence.cassandra {
+ journal {
+ keyspace = "akka_testing"
+ }
+
events-by-tag {
- bucket-size = "Day"
+ bucket-size = "Hour"
# for reduced latency
eventual-consistency-delay = 200ms
flush-interval = 50ms
pubsub-notification = on
- first-time-bucket = "20200115T00:00"
+ first-time-bucket = "20201001T00:00"
}
query {
@@ -61,14 +83,13 @@ datastax-java-driver {
advanced.reconnect-on-init = on
}
-akka.projection.cassandra.offset-store.keyspace = "akka_cqrs_sample"
-
event-processor {
- tag-prefix = "carts-slice" // even processor tag prefix
- parallelism = 4 // number of event processors
+ parallelism = 4
+}
+
+test {
+
}
-shopping.http.port = 0
-shopping.askTimeout = 5 s
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
similarity index 72%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
index cbec29e..a3a147e 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
@@ -1,4 +1,4 @@
-package sample.cqrs
+package akka.projection.testing
/**
* Marker trait for serialization with Jackson CBOR
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala
new file mode 100644
index 0000000..579d208
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala
@@ -0,0 +1,47 @@
+package akka.projection.testing
+
+import akka.Done
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.pattern.StatusReply
+import akka.persistence.typed.PersistenceId
+import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
+
+/**
+ * Sharded, event-sourced entity used to produce events for the projection under test.
+ *
+ * Every accepted command persists exactly one [[ConfigurablePersistentActor.Event]]; the
+ * state only counts how many events have been applied.
+ */
+object ConfigurablePersistentActor {
+
+  /** Sharding type key under which the entity is registered. */
+  val Key: EntityTypeKey[Command] = EntityTypeKey[Command]("configurable")
+
+  /**
+   * Registers the entity with cluster sharding, restricted to the `write-model` role.
+   *
+   * @param settings supplies the parallelism used when tagging events
+   * @param system   actor system whose sharding extension is initialised
+   * @return the shard region reference used to address entities by id
+   */
+  def init(settings: EventProcessorSettings, system: ActorSystem[_]): ActorRef[ShardingEnvelope[Command]] = {
+    ClusterSharding(system).init(Entity(Key)(ctx => apply(settings, ctx.entityId))
+      .withRole("write-model"))
+  }
+
+  /**
+   * Commands are sent through the shard region and can therefore cross node boundaries,
+   * so they carry the same Jackson CBOR marker as the events and the state.
+   */
+  trait Command extends CborSerializable
+
+  /** Persist `toPersist` for test `testName` and acknowledge to `replyTo` once stored. */
+  final case class PersistAndAck(toPersist: String, replyTo: ActorRef[StatusReply[Done]], testName: String) extends Command
+
+  /** Persist `toPersist` for test `testName` without any reply. */
+  final case class Persist(toPersist: String, testName: String) extends Command
+
+  /** Persisted event; `timeCreated` defaults to the wall-clock time at construction. */
+  final case class Event(testName: String, payload: String, timeCreated: Long = System.currentTimeMillis()) extends CborSerializable
+
+  /** Entity state: number of events applied so far. */
+  final case class State(eventsProcessed: Long) extends CborSerializable
+
+  /**
+   * Behavior of a single entity.
+   *
+   * @param settings      supplies `parallelism`, the number of distinct tags
+   * @param persistenceId used verbatim as the unique persistence id
+   */
+  def apply(settings: EventProcessorSettings, persistenceId: String): Behavior[Command] =
+    Behaviors.setup { ctx =>
+      EventSourcedBehavior[Command, Event, State](
+        persistenceId = PersistenceId.ofUniqueId(persistenceId),
+        State(0),
+        (_, command) => command match {
+          case Persist(toPersist, testName) =>
+            Effect.persist(Event(testName, toPersist))
+          case PersistAndAck(toPersist, ack, testName) =>
+            ctx.log.info("persisting event {}", command)
+            Effect.persist(Event(testName, toPersist)).thenRun(_ => ack ! StatusReply.ack())
+        },
+        (state, _) => state.copy(eventsProcessed = state.eventsProcessed + 1)).withTagger(event =>
+        // The tag is derived from the event's hash, yielding "tag-0" .. "tag-(parallelism-1)".
+        Set("tag-" + math.abs(event.hashCode() % settings.parallelism)))
+    }
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
similarity index 63%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
index 774a221..69430ec 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
@@ -1,4 +1,4 @@
-package sample.cqrs
+package akka.projection.testing
import akka.actor.typed.ActorSystem
import com.typesafe.config.Config
@@ -10,10 +10,9 @@ object EventProcessorSettings {
}
def apply(config: Config): EventProcessorSettings = {
- val tagPrefix: String = config.getString("tag-prefix")
val parallelism: Int = config.getInt("parallelism")
- EventProcessorSettings(tagPrefix, parallelism)
+ EventProcessorSettings(parallelism)
}
}
-final case class EventProcessorSettings(tagPrefix: String, parallelism: Int)
+final case class EventProcessorSettings(parallelism: Int)
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala
new file mode 100644
index 0000000..c662600
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala
@@ -0,0 +1,87 @@
+package akka.projection.testing
+
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
+import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardedDaemonProcessSettings, ShardingEnvelope}
+import akka.cluster.typed.Cluster
+import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
+import akka.persistence.query.Offset
+import akka.projection.eventsourced.EventEnvelope
+import akka.projection.eventsourced.scaladsl.EventSourcedProvider
+import akka.projection.jdbc.scaladsl.JdbcProjection
+import akka.projection.scaladsl.ExactlyOnceProjection
+import akka.projection.testing.LoadGeneration.{Result, RunTest}
+import akka.projection.{ProjectionBehavior, ProjectionId}
+import akka.util.Timeout
+import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
+
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success}
+
+/**
+ * Root behavior of a test node: wires up the JDBC connection pool, the sharded
+ * persistent entities, the projections (on read-model nodes) and one load-generation run.
+ */
+object Guardian {
+
+ /**
+  * Builds an exactly-once JDBC projection over the events tagged `tag-<index>`.
+  *
+  * Events are read by tag through the Cassandra read journal; each projection instance
+  * gets its sessions from `factory` and a new [[ProjectionHandler]] for its tag.
+  *
+  * @param settings not referenced in the body
+  * @param index    tag suffix; also part of the projection id
+  * @param factory  source of JDBC sessions
+  */
+ def createProjectionFor(
+ settings: EventProcessorSettings,
+ index: Int,
+ factory: HikariFactory
+ )(implicit system: ActorSystem[_]): ExactlyOnceProjection[Offset, EventEnvelope[ConfigurablePersistentActor.Event]] = {
+ val tag = s"tag-$index"
+ val sourceProvider = EventSourcedProvider.eventsByTag[ConfigurablePersistentActor.Event](
+ system = system,
+ readJournalPluginId = CassandraReadJournal.Identifier,
+ tag = tag)
+ JdbcProjection.exactlyOnce(
+ projectionId = ProjectionId("test-projection-id", tag),
+ sourceProvider,
+ () => factory.newSession(),
+ () => new ProjectionHandler(tag, system)
+ )
+ }
+
+ /**
+  * Guardian behavior. All work happens during setup; the returned behavior is
+  * `Behaviors.empty`, so the "success"/"failure" strings produced by the ask below
+  * are not acted upon.
+  */
+ def apply(): Behavior[String] = {
+ Behaviors.setup[String] { context =>
+ implicit val system: ActorSystem[_] = context.system
+ // Connection pool against a local PostgreSQL with hard-coded credentials;
+ // auto-commit is off, so commits are issued explicitly by the sessions.
+ val config = new HikariConfig
+ config.setJdbcUrl("jdbc:postgresql://127.0.0.1:5432/")
+ config.setUsername("docker")
+ config.setPassword("docker")
+ config.setMaximumPoolSize(19)
+ config.setAutoCommit(false)
+ val dataSource = new HikariDataSource(config)
+ val settings = EventProcessorSettings(system)
+ val shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]] = ConfigurablePersistentActor.init(settings, system)
+ if (Cluster(system).selfMember.hasRole("read-model")) {
+
+ val dbSessionFactory = new HikariFactory(dataSource)
+
+ // we only want to run the daemon processes on the read-model nodes
+ val shardingSettings = ClusterShardingSettings(system)
+ val shardedDaemonProcessSettings =
+ ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model"))
+
+ // One projection instance per index in 0 until settings.parallelism.
+ ShardedDaemonProcess(system).init(
+ name = "test-projection",
+ settings.parallelism,
+ n => ProjectionBehavior(createProjectionFor(settings, n, dbSessionFactory)),
+ shardedDaemonProcessSettings,
+ Some(ProjectionBehavior.Stop))
+ }
+
+ // TODO move to route
+ // Starts a single run immediately: 2 actors with 5 events each, 10 second ask timeout.
+ implicit val timeout: Timeout = 10.seconds
+ val loadGeneration: ActorRef[LoadGeneration.RunTest] = context.spawn(LoadGeneration(shardRegion, dataSource), "load-generation")
+ context.ask[RunTest, Result](loadGeneration, replyTo => LoadGeneration.RunTest(s"test-${System.currentTimeMillis()}", 2, 5, replyTo)) {
+ case Success(value) =>
+ context.log.info("Test passed {}", value)
+ "success"
+ case Failure(t) =>
+ context.log.error("Test failed",t )
+ "failure"
+ }
+
+
+ Behaviors.empty
+ }
+ }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala
new file mode 100644
index 0000000..742a44b
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala
@@ -0,0 +1,9 @@
+package akka.projection.testing
+
+import javax.sql.DataSource
+
+/**
+ * Creates [[HikariJdbcSession]]s on top of a shared data source.
+ *
+ * @param dataSource the data source every new session is created from
+ */
+class HikariFactory(val dataSource: DataSource) {
+
+  /** Returns a fresh session backed by `dataSource`. */
+  def newSession(): HikariJdbcSession = new HikariJdbcSession(dataSource)
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala
new file mode 100644
index 0000000..057b8d1
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala
@@ -0,0 +1,23 @@
+package akka.projection.testing
+
+import java.sql.Connection
+
+import akka.japi.function
+import akka.projection.jdbc.JdbcSession
+import javax.sql.DataSource
+
+
+
+/**
+ * [[JdbcSession]] bound to a single connection taken from `source`.
+ *
+ * The connection is obtained when the session is constructed and handed back by `close()`.
+ */
+class HikariJdbcSession(source: DataSource) extends JdbcSession {
+
+  private val jdbcConnection = source.getConnection()
+
+  /** Runs `func` against this session's connection and returns its result. */
+  override def withConnection[Result](func: function.Function[Connection, Result]): Result = {
+    func.apply(jdbcConnection)
+  }
+
+  override def commit(): Unit = {
+    jdbcConnection.commit()
+  }
+
+  override def rollback(): Unit = {
+    jdbcConnection.rollback()
+  }
+
+  override def close(): Unit = {
+    jdbcConnection.close()
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
similarity index 59%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
index 4a9c313..2444cee 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
@@ -1,18 +1,13 @@
-package sample.cqrs
+package akka.projection.testing
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-
-import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
-import akka.Done
-class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) {
- private val shutdown = CoordinatedShutdown(system)
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+class HttpServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) {
import system.executionContext
def start(): Unit = {
@@ -21,7 +16,7 @@ class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[
.onComplete {
case Success(binding) =>
val address = binding.localAddress
- system.log.info("Shopping online at http://{}:{}/", address.getHostString, address.getPort)
+ system.log.info("Online at http://{}:{}/", address.getHostString, address.getPort)
case Failure(ex) =>
system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala
new file mode 100644
index 0000000..35fb682
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala
@@ -0,0 +1,85 @@
+package akka.projection.testing
+
+import akka.{Done, NotUsed}
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, Behavior, Terminated}
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.pattern.StatusReply
+import akka.projection.testing.LoadGeneration.{Failed, Result, RunTest}
+import akka.projection.testing.LoadTest.Start
+import akka.stream.scaladsl.Source
+import akka.util.Timeout
+import javax.sql.DataSource
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success}
+
+/**
+ * Entry point for test runs: every [[LoadGeneration.RunTest]] is delegated to a
+ * dedicated `LoadTest` child actor.
+ */
+object LoadGeneration {
+
+  /** Request to run a test called `name` with `actors` entities and `eventsPerActor` events each. */
+  case class RunTest(name: String, actors: Int, eventsPerActor: Int, reply: ActorRef[Result])
+
+  /** Outcome reported to the `reply` reference of a [[RunTest]]. */
+  sealed trait Result
+
+  case class Pass() extends Result
+
+  case class Failed(t: Option[Throwable], expected: Int, got: Int) extends Result
+
+  /**
+   * @param shardRegion region through which the persistent entities are addressed
+   * @param source      data source handed on to the spawned test
+   */
+  def apply(shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[RunTest] =
+    Behaviors.setup { ctx =>
+      Behaviors.receiveMessage[RunTest] { request =>
+        // One child per run, named after the test.
+        val loadTest = ctx.spawn(LoadTest(request.name, shardRegion, source), s"test-${request.name}")
+        loadTest ! Start(request)
+        Behaviors.same
+      }
+    }
+
+}
+
+/**
+ * Runs one test: generates the load through the shard region, then spawns a
+ * TestValidation child and stops once that child has terminated.
+ */
+object LoadTest {
+
+ sealed trait Command
+
+ // Sent by LoadGeneration to kick off the run described by `test`.
+ case class Start(test: RunTest) extends Command
+
+ // Self-message: the load stream completed successfully.
+ private case class StartValidation() extends Command
+
+ // Self-message: the load stream failed with `t`.
+ private case class LoadGenerationFailed(t: Throwable) extends Command
+
+ /**
+  * @param testName    name attached to every persisted event and used for validation
+  * @param shardRegion region through which the persistent entities are addressed
+  * @param source      data source passed on to the validation child
+  */
+ def apply(testName: String, shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[Command] = Behaviors.setup { ctx =>
+ import akka.actor.typed.scaladsl.AskPattern._
+ // Timeout applied to each ask against the shard region.
+ implicit val timeout: Timeout = 30.seconds
+ implicit val system = ctx.system
+ implicit val ec: ExecutionContext = system.executionContext
+ Behaviors.receiveMessage[Command] {
+ case Start(RunTest(_, actors, eventsPerActor, replyTo)) =>
+ ctx.log.info("Starting load generation")
+ val expected = actors * eventsPerActor
+ // Entity ids 1..actors are processed one after another (flatMapConcat); for each,
+ // messages 1..eventsPerActor are asked with at most one in flight (mapAsync(1)).
+ val testRun: Source[StatusReply[Done], NotUsed] = Source(1 to actors)
+ .flatMapConcat(id =>
+ Source(1 to eventsPerActor)
+ .mapAsync(1)(message => shardRegion.ask[StatusReply[Done]] { replyTo =>
+ ShardingEnvelope(s"$id", ConfigurablePersistentActor.PersistAndAck(s"actor-$id-message-$message", replyTo, testName))
+ }))
+ // The stream outcome is turned into a message to this actor.
+ ctx.pipeToSelf(testRun.run()) {
+ case Success(_) => StartValidation()
+ case Failure(t) => LoadGenerationFailed(t)
+ }
+ // Second phase: only the two self-messages are matched here.
+ // NOTE(review): a further Start received in this phase has no matching case.
+ Behaviors.receiveMessage[Command] {
+ case StartValidation() =>
+ ctx.log.info("Starting validation")
+ val validation = ctx.spawn(TestValidation(replyTo, testName, expected, source: DataSource), s"TestValidation=$testName")
+ // Watching the child delivers the Terminated signal handled below.
+ ctx.watch(validation)
+ Behaviors.same
+ case LoadGenerationFailed(t) =>
+ ctx.log.error("Load generation failed", t)
+ replyTo ! Failed(Some(t), -1, -1)
+ Behaviors.stopped
+ }.receiveSignal {
+ case (ctx, Terminated(_)) =>
+ ctx.log.info("Validation finished, terminating")
+ Behaviors.stopped
+ }
+ }
+ }
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala
new file mode 100644
index 0000000..09690e3
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala
@@ -0,0 +1,33 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import com.typesafe.config.{Config, ConfigFactory}
+
+/** Command-line entry point: starts one cluster node of the test application. */
+object Main {
+
+  /**
+   * Expects the remoting port as first argument. The HTTP port is derived from it by
+   * prefixing its last two digits with "80" (e.g. 2551 gives 8051).
+   *
+   * @throws IllegalArgumentException when the argument is missing or is not a number
+   */
+  def main(args: Array[String]): Unit = {
+    args.headOption match {
+
+      case Some(portString) if portString.matches("""\d+""") =>
+        val port = portString.toInt
+        val httpPort = ("80" + portString.takeRight(2)).toInt
+        startNode(port, httpPort)
+
+      // Without this case a non-numeric argument would surface as a MatchError.
+      case Some(other) =>
+        throw new IllegalArgumentException(s"port number required argument, got [$other]")
+
+      case None =>
+        throw new IllegalArgumentException("port number required argument")
+    }
+  }
+
+  /** Starts the actor system "test" with [[Guardian]] as its root behavior. */
+  def startNode(port: Int, httpPort: Int): Unit = {
+    ActorSystem[String](Guardian(), "test", config(port, httpPort))
+
+  }
+
+  /** Overrides the remoting and HTTP ports on top of the loaded default configuration. */
+  def config(port: Int, httpPort: Int): Config =
+    ConfigFactory.parseString(
+      s"""
+      akka.remote.artery.canonical.port = $port
+      test.http.port = $httpPort
+       """).withFallback(ConfigFactory.load())
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala
new file mode 100644
index 0000000..bf8e7a0
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala
@@ -0,0 +1,19 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import akka.projection.eventsourced.EventEnvelope
+import akka.projection.jdbc.scaladsl.JdbcHandler
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Projection handler that records every consumed event in the `events` table.
+ *
+ * @param tag    tag read by the owning projection; only used for logging here
+ * @param system actor system supplied by the caller; not used by the handler itself
+ */
+class ProjectionHandler(tag: String, system: ActorSystem[_])
+  extends JdbcHandler[EventEnvelope[ConfigurablePersistentActor.Event], HikariJdbcSession] {
+  private val log: Logger = LoggerFactory.getLogger(getClass)
+
+  /**
+   * Inserts one `(name, event)` row for the envelope's test name and payload using the
+   * connection of `session`. Committing is left to the caller of the handler.
+   */
+  override def process(session: HikariJdbcSession, envelope: EventEnvelope[ConfigurablePersistentActor.Event]): Unit = {
+    log.info("Event {} for tag {}", envelope.event.payload, tag)
+    session.withConnection { connection =>
+      // Values are bound as parameters rather than spliced into the SQL text, so quotes
+      // in a test name or payload can neither break nor alter the statement.
+      val insert = connection.prepareStatement("insert into events(name, event) values (?, ?)")
+      try {
+        insert.setString(1, envelope.event.testName)
+        insert.setString(2, envelope.event.payload)
+        insert.execute()
+      } finally {
+        // Release the statement even when the insert fails.
+        insert.close()
+      }
+    }
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala
new file mode 100644
index 0000000..a1b40ce
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala
@@ -0,0 +1,30 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import spray.json.DefaultJsonProtocol._
+import spray.json.RootJsonFormat
+
+/** JSON payloads for the (not yet implemented) test HTTP route, plus their spray-json formats. */
+object TestRoutes {
+
+  /** Request body: test name, number of actors and messages per actor. */
+  case class RunTest(name: String, nrActors: Long, messagesPerActor: Long)
+
+  /** Response body: outcome with expected and observed counts. */
+  case class TestResult(pass: Boolean, expected: Long, got: Long)
+
+  implicit val runTestFormat: RootJsonFormat[RunTest] =
+    jsonFormat3(RunTest.apply)
+
+  implicit val testResultFormat: RootJsonFormat[TestResult] =
+    jsonFormat3(TestResult.apply)
+}
+
+//class TestRoutes()(implicit val system: ActorSystem[_]) {
+// import TestRoutes._
+// val route: Route = path("test") {
+// post {
+// entity(as[RunTest]) { runTest =>
+//
+//
+// }
+// }
+// }
+//
+//}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala
new file mode 100644
index 0000000..0976a82
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala
@@ -0,0 +1,45 @@
+package akka.projection.testing
+
+import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.scaladsl.Behaviors
+import akka.projection.testing.LoadGeneration.{Pass, Result}
+import javax.sql.DataSource
+
+/**
+ * Polls the `events` table until the number of rows recorded for a test matches the
+ * expected count, then reports [[LoadGeneration.Pass]] and stops.
+ */
+object TestValidation {
+  // FIXME blocking, dispatcher
+  // FIXME timeout
+  /**
+   * @param replyTo          receives `Pass()` once the counts match
+   * @param testName         value of the `name` column to count rows for
+   * @param expectedNrEvents number of rows that must be present
+   * @param source           data source used for each check
+   */
+  def apply(replyTo: ActorRef[Result], testName: String, expectedNrEvents: Long, source: DataSource): Behavior[String] = {
+    import scala.concurrent.duration._
+    Behaviors.setup { ctx =>
+      // Runs one count query on a fresh connection; true when the count equals the expectation.
+      def validate(): Boolean = {
+        val connection = source.getConnection
+        try {
+          // The test name is bound as a parameter instead of being spliced into the SQL text.
+          val statement = connection.prepareStatement("select count(*) from events where name = ?")
+          try {
+            statement.setString(1, testName)
+            val resultSet = statement.executeQuery()
+            try {
+              if (resultSet.next()) {
+                // Read by position: the label given to count(*) differs between databases.
+                val count = resultSet.getInt(1)
+                ctx.log.info("Expected {} got {}!", expectedNrEvents, count)
+                expectedNrEvents == count
+              } else {
+                throw new RuntimeException("Expected single row")
+              }
+            } finally {
+              resultSet.close()
+            }
+          } finally {
+            statement.close()
+          }
+        } finally {
+          connection.close()
+        }
+      }
+
+      Behaviors.withTimers { timers =>
+        // Sends "test" to this actor every 2 seconds; each tick triggers one check.
+        timers.startTimerAtFixedRate("test", 2.seconds)
+        Behaviors.receiveMessage {
+          case "test" =>
+            if (validate()) {
+              ctx.log.info("Validated. Stopping")
+              replyTo ! Pass()
+              Behaviors.stopped
+            } else {
+              Behaviors.same
+            }
+        }
+      }
+    }
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala
deleted file mode 100644
index 795d407..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-import java.util.concurrent.CountDownLatch
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.typed.{ ClusterShardingSettings, ShardedDaemonProcessSettings }
-import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
-import akka.cluster.typed.Cluster
-import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.persistence.query.Offset
-import akka.projection.{ ProjectionBehavior, ProjectionId }
-import akka.projection.scaladsl.AtLeastOnceProjection
-import akka.projection.cassandra.scaladsl.CassandraProjection
-import akka.projection.eventsourced.EventEnvelope
-import akka.projection.eventsourced.scaladsl.EventSourcedProvider
-import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-
-object Main {
-
- def main(args: Array[String]): Unit = {
- args.headOption match {
-
- case Some(portString) if portString.matches("""\d+""") =>
- val port = portString.toInt
- val httpPort = ("80" + portString.takeRight(2)).toInt
- startNode(port, httpPort)
-
- case Some("cassandra") =>
- startCassandraDatabase()
- println("Started Cassandra, press Ctrl + C to kill")
- new CountDownLatch(1).await()
-
- case None =>
- throw new IllegalArgumentException("port number, or cassandra required argument")
- }
- }
-
- def startNode(port: Int, httpPort: Int): Unit = {
- val system =
- ActorSystem[Nothing](Guardian(), "Shopping", config(port, httpPort))
-
- if (Cluster(system).selfMember.hasRole("read-model"))
- createTables(system)
- }
-
- def config(port: Int, httpPort: Int): Config =
- ConfigFactory.parseString(s"""
- akka.remote.artery.canonical.port = $port
- shopping.http.port = $httpPort
- """).withFallback(ConfigFactory.load())
-
- /**
- * To make the sample easier to run we kickstart a Cassandra instance to
- * act as the journal. Cassandra is a great choice of backend for Akka Persistence but
- * in a real application a pre-existing Cassandra cluster should be used.
- */
- def startCassandraDatabase(): Unit = {
- val databaseDirectory = new File("target/cassandra-db")
- CassandraLauncher.start(databaseDirectory, CassandraLauncher.DefaultTestConfigResource, clean = false, port = 9042)
- }
-
- def createTables(system: ActorSystem[_]): Unit = {
- val session =
- CassandraSessionRegistry(system).sessionFor("alpakka.cassandra")
-
- // TODO use real replication strategy in real application
- val keyspaceStmt =
- """
- CREATE KEYSPACE IF NOT EXISTS akka_cqrs_sample
- WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
- """
-
- val offsetTableStmt =
- """
- CREATE TABLE IF NOT EXISTS akka_cqrs_sample.offset_store (
- projection_name text,
- partition int,
- projection_key text,
- offset text,
- manifest text,
- last_updated timestamp,
- PRIMARY KEY ((projection_name, partition), projection_key)
- )
- """
-
- // ok to block here, main thread
- Await.ready(session.executeDDL(keyspaceStmt), 30.seconds)
- system.log.info("Created akka_cqrs_sample keyspace")
- Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
- system.log.info("Created akka_cqrs_sample.offset_store table")
-
- }
-
-}
-
-object Guardian {
-
- def createProjectionFor(
- system: ActorSystem[_],
- settings: EventProcessorSettings,
- index: Int): AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
- val tag = s"${settings.tagPrefix}-$index"
- val sourceProvider = EventSourcedProvider.eventsByTag[ShoppingCart.Event](
- system = system,
- readJournalPluginId = CassandraReadJournal.Identifier,
- tag = tag)
- CassandraProjection.atLeastOnce(
- projectionId = ProjectionId("shopping-carts", tag),
- sourceProvider,
- handler = () => new ShoppingCartProjectionHandler(tag, system))
- }
-
- def apply(): Behavior[Nothing] = {
- Behaviors.setup[Nothing] { context =>
- val system = context.system
-
- val settings = EventProcessorSettings(system)
-
- val httpPort = context.system.settings.config.getInt("shopping.http.port")
-
- ShoppingCart.init(system, settings)
-
- if (Cluster(system).selfMember.hasRole("read-model")) {
-
- // we only want to run the daemon processes on the read-model nodes
- val shardingSettings = ClusterShardingSettings(system)
- val shardedDaemonProcessSettings =
- ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model"))
-
- ShardedDaemonProcess(system).init(
- name = "ShoppingCartProjection",
- settings.parallelism,
- n => ProjectionBehavior(createProjectionFor(system, settings, n)),
- shardedDaemonProcessSettings,
- Some(ProjectionBehavior.Stop))
- }
-
- val routes = new ShoppingCartRoutes()(context.system)
- new ShoppingCartServer(routes.shopping, httpPort)(context.system).start()
-
- Behaviors.empty
- }
- }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala
deleted file mode 100644
index c800f0b..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-package sample.cqrs
-
-import java.time.Instant
-
-import scala.concurrent.duration._
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
-import akka.actor.typed.SupervisorStrategy
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.sharding.typed.scaladsl.Entity
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.pattern.StatusReply
-import akka.persistence.typed.PersistenceId
-import akka.persistence.typed.scaladsl.RetentionCriteria
-import akka.persistence.typed.scaladsl.Effect
-import akka.persistence.typed.scaladsl.EventSourcedBehavior
-import akka.persistence.typed.scaladsl.ReplyEffect
-
-/**
- * This is an event sourced actor. It has a state, [[ShoppingCart.State]], which
- * stores the current shopping cart items and whether it's checked out.
- *
- * Event sourced actors are interacted with by sending them commands,
- * see classes implementing [[ShoppingCart.Command]].
- *
- * Commands get translated to events, see classes implementing [[ShoppingCart.Event]].
- * It's the events that get persisted by the entity. Each event will have an event handler
- * registered for it, and an event handler updates the current state based on the event.
- * This will be done when the event is first created, and it will also be done when the entity is
- * loaded from the database - each event will be replayed to recreate the state
- * of the entity.
- */
-object ShoppingCart {
-
- /**
- * The current state held by the persistent entity.
- */
- final case class State(items: Map[String, Int], checkoutDate: Option[Instant]) extends CborSerializable {
-
- def isCheckedOut: Boolean =
- checkoutDate.isDefined
-
- def hasItem(itemId: String): Boolean =
- items.contains(itemId)
-
- def isEmpty: Boolean =
- items.isEmpty
-
- def updateItem(itemId: String, quantity: Int): State = {
- quantity match {
- case 0 => copy(items = items - itemId)
- case _ => copy(items = items + (itemId -> quantity))
- }
- }
-
- def removeItem(itemId: String): State =
- copy(items = items - itemId)
-
- def checkout(now: Instant): State =
- copy(checkoutDate = Some(now))
-
- def toSummary: Summary =
- Summary(items, isCheckedOut)
- }
- object State {
- val empty = State(items = Map.empty, checkoutDate = None)
- }
-
- /**
- * This interface defines all the commands that the ShoppingCart persistent actor supports.
- */
- sealed trait Command extends CborSerializable
-
- /**
- * A command to add an item to the cart.
- *
- * It can reply with `StatusReply[Summary]`, which is sent back to the caller when
- * all the events emitted by this command are successfully persisted.
- */
- final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
- /**
- * A command to remove an item from the cart.
- */
- final case class RemoveItem(itemId: String, replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
- /**
- * A command to adjust the quantity of an item in the cart.
- */
- final case class AdjustItemQuantity(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]])
- extends Command
-
- /**
- * A command to checkout the shopping cart.
- */
- final case class Checkout(replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
- /**
- * A command to get the current state of the shopping cart.
- */
- final case class Get(replyTo: ActorRef[Summary]) extends Command
-
- /**
- * Summary of the shopping cart state, used in reply messages.
- */
- final case class Summary(items: Map[String, Int], checkedOut: Boolean) extends CborSerializable
-
- /**
- * This interface defines all the events that the ShoppingCart supports.
- */
- sealed trait Event extends CborSerializable {
- def cartId: String
- }
-
- final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event
-
- final case class ItemRemoved(cartId: String, itemId: String) extends Event
-
- final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event
-
- final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
-
- val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart")
-
- def init(system: ActorSystem[_], eventProcessorSettings: EventProcessorSettings): Unit = {
- ClusterSharding(system).init(Entity(EntityKey) { entityContext =>
- val n = math.abs(entityContext.entityId.hashCode % eventProcessorSettings.parallelism)
- val eventProcessorTag = eventProcessorSettings.tagPrefix + "-" + n
- ShoppingCart(entityContext.entityId, Set(eventProcessorTag))
- }.withRole("write-model"))
- }
-
- def apply(cartId: String, eventProcessorTags: Set[String]): Behavior[Command] = {
- EventSourcedBehavior
- .withEnforcedReplies[Command, Event, State](
- PersistenceId(EntityKey.name, cartId),
- State.empty,
- (state, command) =>
- //The shopping cart behavior changes if it's checked out or not.
- // The commands are handled differently for each case.
- if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
- else openShoppingCart(cartId, state, command),
- (state, event) => handleEvent(state, event))
- .withTagger(_ => eventProcessorTags)
- .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
- .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
- }
-
- private def openShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] =
- command match {
- case AddItem(itemId, quantity, replyTo) =>
- if (state.hasItem(itemId))
- Effect.reply(replyTo)(StatusReply.Error(s"Item '$itemId' was already added to this shopping cart"))
- else if (quantity <= 0)
- Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero"))
- else
- Effect
- .persist(ItemAdded(cartId, itemId, quantity))
- .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-
- case RemoveItem(itemId, replyTo) =>
- if (state.hasItem(itemId))
- Effect
- .persist(ItemRemoved(cartId, itemId))
- .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
- else
- Effect.reply(replyTo)(StatusReply.Success(state.toSummary)) // removing an item is idempotent
-
- case AdjustItemQuantity(itemId, quantity, replyTo) =>
- if (quantity <= 0)
- Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero"))
- else if (state.hasItem(itemId))
- Effect
- .persist(ItemQuantityAdjusted(cartId, itemId, quantity))
- .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
- else
- Effect.reply(replyTo)(
- StatusReply.Error(s"Cannot adjust quantity for item '$itemId'. Item not present on cart"))
-
- case Checkout(replyTo) =>
- if (state.isEmpty)
- Effect.reply(replyTo)(StatusReply.Error("Cannot checkout an empty shopping cart"))
- else
- Effect
- .persist(CheckedOut(cartId, Instant.now()))
- .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-
- case Get(replyTo) =>
- Effect.reply(replyTo)(state.toSummary)
- }
-
- private def checkedOutShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] =
- command match {
- case Get(replyTo) =>
- Effect.reply(replyTo)(state.toSummary)
- case cmd: AddItem =>
- Effect.reply(cmd.replyTo)(StatusReply.Error("Can't add an item to an already checked out shopping cart"))
- case cmd: RemoveItem =>
- Effect.reply(cmd.replyTo)(StatusReply.Error("Can't remove an item from an already checked out shopping cart"))
- case cmd: AdjustItemQuantity =>
- Effect.reply(cmd.replyTo)(StatusReply.Error("Can't adjust item on an already checked out shopping cart"))
- case cmd: Checkout =>
- Effect.reply(cmd.replyTo)(StatusReply.Error("Can't checkout already checked out shopping cart"))
- }
-
- private def handleEvent(state: State, event: Event) = {
- event match {
- case ItemAdded(_, itemId, quantity) => state.updateItem(itemId, quantity)
- case ItemRemoved(_, itemId) => state.removeItem(itemId)
- case ItemQuantityAdjusted(_, itemId, quantity) => state.updateItem(itemId, quantity)
- case CheckedOut(_, eventTime) => state.checkout(eventTime)
- }
- }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala
deleted file mode 100644
index cf965a8..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package sample.cqrs
-
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.eventstream.EventStream
-import akka.projection.eventsourced.EventEnvelope
-import akka.projection.scaladsl.Handler
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.Future
-
-class ShoppingCartProjectionHandler(tag: String, system: ActorSystem[_])
- extends Handler[EventEnvelope[ShoppingCart.Event]] {
- val log = LoggerFactory.getLogger(getClass)
-
- override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
-
- log.info(
- "EventProcessor({}) consumed {} from {} with seqNr {}",
- tag,
- envelope.event,
- envelope.persistenceId,
- envelope.sequenceNr)
- system.eventStream ! EventStream.Publish(envelope.event)
- Future.successful(Done)
- }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
deleted file mode 100644
index acd5b89..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package sample.cqrs
-
-import scala.concurrent.Future
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.http.scaladsl.model.StatusCodes
-import akka.http.scaladsl.server.Route
-import akka.pattern.StatusReply
-import akka.util.Timeout
-
-object ShoppingCartRoutes {
- final case class AddItem(cartId: String, itemId: String, quantity: Int)
- final case class UpdateItem(cartId: String, itemId: String, quantity: Int)
-}
-
-class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
-
- implicit private val timeout: Timeout =
- Timeout.create(system.settings.config.getDuration("shopping.askTimeout"))
- private val sharding = ClusterSharding(system)
-
- import ShoppingCartRoutes._
- import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
- import akka.http.scaladsl.server.Directives._
- import JsonFormats._
-
- val shopping: Route =
- pathPrefix("shopping") {
- pathPrefix("carts") {
- concat(
- post {
- entity(as[AddItem]) {
- data =>
- val entityRef =
- sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId)
- val reply: Future[StatusReply[ShoppingCart.Summary]] =
- entityRef.ask(ShoppingCart.AddItem(data.itemId, data.quantity, _))
- onSuccess(reply) {
- case StatusReply.Success(summary: ShoppingCart.Summary) =>
- complete(StatusCodes.OK -> summary)
- case StatusReply.Error(reason) =>
- complete(StatusCodes.BadRequest -> reason)
- }
- }
- },
- put {
- entity(as[UpdateItem]) {
- data =>
- val entityRef =
- sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId)
-
- def command(replyTo: ActorRef[StatusReply[ShoppingCart.Summary]]) =
- if (data.quantity == 0)
- ShoppingCart.RemoveItem(data.itemId, replyTo)
- else
- ShoppingCart.AdjustItemQuantity(data.itemId, data.quantity, replyTo)
-
- val reply: Future[StatusReply[ShoppingCart.Summary]] =
- entityRef.ask(command(_))
- onSuccess(reply) {
- case StatusReply.Success(summary: ShoppingCart.Summary) =>
- complete(StatusCodes.OK -> summary)
- case StatusReply.Error(reason) =>
- complete(StatusCodes.BadRequest -> reason)
- }
- }
- },
- pathPrefix(Segment) { cartId =>
- concat(get {
- val entityRef =
- sharding.entityRefFor(ShoppingCart.EntityKey, cartId)
- onSuccess(entityRef.ask(ShoppingCart.Get)) { summary =>
- if (summary.items.isEmpty) complete(StatusCodes.NotFound)
- else complete(summary)
- }
- }, path("checkout") {
- post {
- val entityRef =
- sharding.entityRefFor(ShoppingCart.EntityKey, cartId)
- val reply: Future[StatusReply[ShoppingCart.Summary]] =
- entityRef.ask(ShoppingCart.Checkout(_))
- onSuccess(reply) {
- case StatusReply.Success(summary: ShoppingCart.Summary) =>
- complete(StatusCodes.OK -> summary)
- case StatusReply.Error(reason) =>
- complete(StatusCodes.BadRequest -> reason)
- }
- }
- })
- })
- }
- }
-
-}
-
-object JsonFormats {
-
- import spray.json.RootJsonFormat
- // import the default encoders for primitive types (Int, String, Lists etc)
- import spray.json.DefaultJsonProtocol._
-
- implicit val summaryFormat: RootJsonFormat[ShoppingCart.Summary] =
- jsonFormat2(ShoppingCart.Summary)
- implicit val addItemFormat: RootJsonFormat[ShoppingCartRoutes.AddItem] =
- jsonFormat3(ShoppingCartRoutes.AddItem)
- implicit val updateItemFormat: RootJsonFormat[ShoppingCartRoutes.UpdateItem] =
- jsonFormat3(ShoppingCartRoutes.UpdateItem)
-
-}
diff --git a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml b/akka-sample-cqrs-scala/src/test/resources/logback-test.xml
deleted file mode 100644
index 179c899..0000000
--- a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<configuration>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg%n</pattern>
- </encoder>
- </appender>
-
- <logger name="com.datastax.oss.driver" level="WARN"/>
- <logger name="org.apache.cassandra" level="ERROR"/>
- <logger name="com.codahale.metrics" level="INFO"/>
-
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala
deleted file mode 100644
index 8f1930a..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-import java.util.UUID
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import akka.actor.testkit.typed.scaladsl.ActorTestKit
-import akka.actor.typed.eventstream.EventStream
-import akka.cluster.MemberStatus
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.typed.Cluster
-import akka.cluster.typed.Join
-import akka.pattern.StatusReply
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.persistence.testkit.scaladsl.PersistenceInit
-import akka.persistence.typed.PersistenceId
-import akka.persistence.typed.scaladsl.Effect
-import akka.persistence.typed.scaladsl.EventSourcedBehavior
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.TestSuite
-import org.scalatest.concurrent.Eventually
-import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.time.Span
-import org.scalatest.wordspec.AnyWordSpecLike
-
-object IntegrationSpec {
- val config: Config = ConfigFactory.parseString(s"""
- akka.cluster {
- seed-nodes = []
- }
-
- akka.persistence.cassandra {
- events-by-tag {
- eventual-consistency-delay = 200ms
- }
-
- query {
- refresh-interval = 500 ms
- }
-
- journal.keyspace-autocreate = on
- journal.tables-autocreate = on
- snapshot.keyspace-autocreate = on
- snapshot.tables-autocreate = on
- }
- datastax-java-driver {
- basic.contact-points = ["127.0.0.1:19042"]
- basic.load-balancing-policy.local-datacenter = "datacenter1"
- }
-
- event-processor {
- keep-alive-interval = 1 seconds
- }
- akka.loglevel = DEBUG
- akka.actor.testkit.typed.single-expect-default = 5s
- # For LoggingTestKit
- akka.actor.testkit.typed.filter-leeway = 5s
- akka.actor.testkit.typed.throw-on-shutdown-timeout = off
- """).withFallback(ConfigFactory.load())
-}
-
-class IntegrationSpec
- extends TestSuite
- with Matchers
- with BeforeAndAfterAll
- with AnyWordSpecLike
- with ScalaFutures
- with Eventually {
-
- implicit private val patience: PatienceConfig =
- PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis))
-
- private val databaseDirectory = new File("target/cassandra-IntegrationSpec")
-
- private def roleConfig(role: String): Config =
- ConfigFactory.parseString(s"akka.cluster.roles = [$role]")
-
- // one TestKit (ActorSystem) per cluster node
- private val testKit1 = ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config))
- private val testKit2 =
- ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config))
- private val testKit3 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config))
- private val testKit4 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config))
-
- private val systems3 = List(testKit1.system, testKit2.system, testKit3.system)
-
- override protected def beforeAll(): Unit = {
- CassandraLauncher.start(
- databaseDirectory,
- CassandraLauncher.DefaultTestConfigResource,
- clean = true,
- port = 19042, // default is 9042, but use different for test
- CassandraLauncher.classpathForResources("logback-test.xml"))
-
- // avoid concurrent creation of keyspace and tables
- initializePersistence()
- Main.createTables(testKit1.system)
-
- super.beforeAll()
- }
-
- private def initializePersistence(): Unit = {
- val timeout = 10.seconds
- val done = PersistenceInit.initializeDefaultPlugins(testKit1.system, timeout)
- Await.result(done, timeout)
- }
-
- override protected def afterAll(): Unit = {
- super.afterAll()
-
- testKit4.shutdownTestKit()
- testKit3.shutdownTestKit()
- testKit2.shutdownTestKit()
- testKit1.shutdownTestKit()
-
- CassandraLauncher.stop()
- FileUtils.deleteDirectory(databaseDirectory)
- }
-
- "Shopping Cart application" should {
- "init and join Cluster" in {
- testKit1.spawn[Nothing](Guardian(), "guardian")
- testKit2.spawn[Nothing](Guardian(), "guardian")
- testKit3.spawn[Nothing](Guardian(), "guardian")
- // node4 is initialized and joining later
-
- systems3.foreach { sys =>
- Cluster(sys).manager ! Join(Cluster(testKit1.system).selfMember.address)
- }
-
- // let the nodes join and become Up
- eventually(PatienceConfiguration.Timeout(10.seconds)) {
- systems3.foreach { sys =>
- Cluster(sys).selfMember.status should ===(MemberStatus.Up)
- }
- }
- }
-
- "update and consume from different nodes" in {
- val cart1 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-1")
- val probe1 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
- val cart2 = ClusterSharding(testKit2.system).entityRefFor(ShoppingCart.EntityKey, "cart-2")
- val probe2 = testKit2.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
- val eventProbe3 = testKit3.createTestProbe[ShoppingCart.Event]()
- testKit3.system.eventStream ! EventStream.Subscribe(eventProbe3.ref)
-
- // update from node1, consume event from node3
- cart1 ! ShoppingCart.AddItem("foo", 42, probe1.ref)
- probe1.receiveMessage().isSuccess should ===(true)
- eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-1", "foo", 42))
-
- // update from node2, consume event from node3
- cart2 ! ShoppingCart.AddItem("bar", 17, probe2.ref)
- probe2.receiveMessage().isSuccess should ===(true)
- cart2 ! ShoppingCart.AdjustItemQuantity("bar", 18, probe2.ref)
- probe2.receiveMessage().isSuccess should ===(true)
- eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-2", "bar", 17))
- eventProbe3.expectMessage(ShoppingCart.ItemQuantityAdjusted("cart-2", "bar", 18))
- }
-
- "continue even processing from offset" in {
- // give it time to write the offset before shutting down
- Thread.sleep(1000)
- testKit3.shutdownTestKit()
-
- val eventProbe4 = testKit4.createTestProbe[ShoppingCart.Event]()
- testKit4.system.eventStream ! EventStream.Subscribe(eventProbe4.ref)
-
- testKit4.spawn[Nothing](Guardian(), "guardian")
-
- Cluster(testKit4.system).manager ! Join(Cluster(testKit1.system).selfMember.address)
-
- // let the node join and become Up
- eventually(PatienceConfiguration.Timeout(10.seconds)) {
- Cluster(testKit4.system).selfMember.status should ===(MemberStatus.Up)
- }
-
- val cart3 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-3")
- val probe3 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
- // update from node1, consume event from node4
- cart3 ! ShoppingCart.AddItem("abc", 43, probe3.ref)
- probe3.receiveMessage().isSuccess should ===(true)
- // note that node4 is new, but continues reading from previous offset, i.e. not receiving events
- // that have already been consumed
- eventProbe4.expectMessage(ShoppingCart.ItemAdded("cart-3", "abc", 43))
- }
-
- }
-}
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala
deleted file mode 100644
index 5932ea9..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.actor.typed.eventstream.EventStream
-import akka.pattern.StatusReply
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.projection.testkit.scaladsl.ProjectionTestKit
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.wordspec.AnyWordSpecLike
-
-object ProjectionSpec {
- def config =
- ConfigFactory.parseString("""
- akka.actor.provider=local
- akka.persistence.cassandra {
- events-by-tag {
- eventual-consistency-delay = 200ms
- }
-
- query {
- refresh-interval = 500 ms
- }
-
- journal.keyspace-autocreate = on
- journal.tables-autocreate = on
- snapshot.keyspace-autocreate = on
- snapshot.tables-autocreate = on
- }
- datastax-java-driver {
- basic.contact-points = ["127.0.0.1:19042"]
- basic.load-balancing-policy.local-datacenter = "datacenter1"
- }
- """).withFallback(ConfigFactory.load()) // re-use application.conf other settings
-}
-
-class ProjectionSpec
- extends ScalaTestWithActorTestKit(ProjectionSpec.config)
- with AnyWordSpecLike
- with BeforeAndAfterAll {
- val projectionTestKit = ProjectionTestKit(testKit)
- val settings = EventProcessorSettings(system)
-
- val databaseDirectory = new File("target/cassandra-ProjectionSpec")
-
- override protected def beforeAll(): Unit = {
- CassandraLauncher.start(
- databaseDirectory,
- CassandraLauncher.DefaultTestConfigResource,
- clean = true,
- port = 19042, // default is 9042, but use different for test
- CassandraLauncher.classpathForResources("logback-test.xml"))
-
- Main.createTables(system)
-
- super.beforeAll()
- }
-
- override protected def afterAll(): Unit = {
- super.afterAll()
- CassandraLauncher.stop()
- FileUtils.deleteDirectory(databaseDirectory)
- }
-
- "The events from the Shopping Cart" should {
-
- "be published to the system event stream by the projection" in {
- val cartProbe = createTestProbe[Any]()
- val cart = spawn(ShoppingCart("cart-1", Set(s"${settings.tagPrefix}-0")))
- cart ! ShoppingCart.AddItem("25", 12, cartProbe.ref)
- cartProbe.expectMessageType[StatusReply[ShoppingCart.Summary]].isSuccess should ===(true)
-
- val eventProbe = createTestProbe[ShoppingCart.Event]()
- system.eventStream ! EventStream.Subscribe(eventProbe.ref)
- projectionTestKit.run(Guardian.createProjectionFor(system, settings, 0)) {
- val added = eventProbe.expectMessageType[ShoppingCart.ItemAdded]
- added.cartId should ===("cart-1")
- added.itemId should ===("25")
- added.quantity should ===(12)
- }
- }
- }
-
-}
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala
deleted file mode 100644
index 898f923..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package sample.cqrs
-
-import java.util.UUID
-
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.pattern.StatusReply
-import org.scalatest.wordspec.AnyWordSpecLike
-
-class ShoppingCartSpec extends ScalaTestWithActorTestKit(s"""
- akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
- akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
- akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
- """) with AnyWordSpecLike {
-
- private var counter = 0
- def newCartId(): String = {
- counter += 1
- s"cart-$counter"
- }
-
- "The Shopping Cart" should {
-
- "add item" in {
- val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
- val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
- cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)))
- }
-
- "reject already added item" in {
- val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
- val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
- cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should ===(true)
- cart ! ShoppingCart.AddItem("foo", 13, probe.ref)
- probe.receiveMessage().isError should ===(true)
- }
-
- "remove item" in {
- val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
- val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
- cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should ===(true)
- cart ! ShoppingCart.RemoveItem("foo", probe.ref)
- probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map.empty, checkedOut = false)))
- }
-
- "adjust quantity" in {
- val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
- val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
- cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should ===(true)
- cart ! ShoppingCart.AdjustItemQuantity("foo", 43, probe.ref)
- probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false)))
- }
-
- "checkout" in {
- val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
- val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
- cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should ===(true)
- cart ! ShoppingCart.Checkout(probe.ref)
- probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true)))
-
- cart ! ShoppingCart.AddItem("bar", 13, probe.ref)
- probe.receiveMessage().isError should ===(true)
- }
-
- "keep its state" in {
- val cartId = newCartId()
- val cart = testKit.spawn(ShoppingCart(cartId, Set.empty))
- val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
- cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)))
-
- testKit.stop(cart)
-
- // start again with same cartId
- val restartedCart = testKit.spawn(ShoppingCart(cartId, Set.empty))
- val stateProbe = testKit.createTestProbe[ShoppingCart.Summary]
- restartedCart ! ShoppingCart.Get(stateProbe.ref)
- stateProbe.expectMessage(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))
- }
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org