You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:36 UTC

[incubator-pekko-samples] 01/01: Basic testing

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-chbatey-cqrs-testing
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 3cf2ab6fd4bf3850288119f1256b7bf8a6877e57
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Fri Oct 2 15:10:52 2020 +0100

    Basic testing
---
 akka-sample-cqrs-scala/README.md                   |  66 +------
 akka-sample-cqrs-scala/build.sbt                   |  14 +-
 akka-sample-cqrs-scala/projection.sql              |  21 ++
 .../src/main/resources/application.conf            |  43 +++--
 .../projection/testing}/CborSerializable.scala     |   2 +-
 .../testing/ConfigurablePersistentActor.scala      |  47 +++++
 .../testing}/EventProcessorSettings.scala          |   7 +-
 .../scala/akka/projection/testing/Guardian.scala   |  87 +++++++++
 .../akka/projection/testing/HikariFactory.scala    |   9 +
 .../projection/testing/HikariJdbcSession.scala     |  23 +++
 .../projection/testing/HttpServer.scala}           |  15 +-
 .../akka/projection/testing/LoadGeneration.scala   |  85 ++++++++
 .../main/scala/akka/projection/testing/Main.scala  |  33 ++++
 .../projection/testing/ProjectionHandler.scala     |  19 ++
 .../scala/akka/projection/testing/TestRoutes.scala |  30 +++
 .../akka/projection/testing/TestValidation.scala   |  45 +++++
 .../src/main/scala/sample/cqrs/Main.scala          | 153 ---------------
 .../src/main/scala/sample/cqrs/ShoppingCart.scala  | 215 ---------------------
 .../cqrs/ShoppingCartProjectionHandler.scala       |  27 ---
 .../scala/sample/cqrs/ShoppingCartRoutes.scala     | 110 -----------
 .../src/test/resources/logback-test.xml            |  17 --
 .../test/scala/sample/cqrs/IntegrationSpec.scala   | 198 -------------------
 .../test/scala/sample/cqrs/ProjectionSpec.scala    |  87 ---------
 .../test/scala/sample/cqrs/ShoppingCartSpec.scala  |  86 ---------
 24 files changed, 450 insertions(+), 989 deletions(-)

diff --git a/akka-sample-cqrs-scala/README.md b/akka-sample-cqrs-scala/README.md
index 77ec65b..52ee6d2 100644
--- a/akka-sample-cqrs-scala/README.md
+++ b/akka-sample-cqrs-scala/README.md
@@ -1,66 +1,2 @@
-This tutorial contains a sample illustrating an CQRS design with [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html), [Akka Cluster Singleton](https://doc.akka.io/docs/akka/2.6/typed/cluster-singleton.html), [Akka Persistence](https://doc.akka.io/docs/akka/2.6/typed/persistence.html) and [Akka Persistence Query](https://doc.akka.io/docs/akka/2.6/persistence-query.html).
+# Projection latency testbed
 
-## Overview
-
-This sample application implements a CQRS-ES design that will side-effect in the read model on selected events persisted to Cassandra by the write model. In this sample, the side-effect is logging a line. 
-A more practical example would be to send a message to a Kafka topic or update a relational database.
-
-## Write model
-
-The write model is a shopping cart.
-
-The implementation is based on a sharded actor: each `ShoppingCart` is an [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html) entity. The entity actor `ShoppingCart` is an [EventSourcedBehavior](https://doc.akka.io/docs/akka/2.6/typed/persistence.html).
-
-Events from the shopping carts are tagged and consumed by the read model.
-
-## Read model
-
-The read model is implemented in such a way that 'load' is sharded over a number of processors. This number is `event-processor.parallelism`.
-This is implemented using [Akka Projections](https://doc.akka.io/docs/akka-projection/current) which is then running on top of
- [Sharded Daemon Process](https://doc.akka.io/docs/akka/current/typed/cluster-sharded-daemon-process.html).
-
-
-## Running the sample code
-
-1. Start a Cassandra server by running:
-
-```
-sbt "runMain sample.cqrs.Main cassandra"
-```
-
-2. Start a node that runs the write model:
-
-```
-sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2551"
-```
-
-3. Start a node that runs the read model:
-
-```
-sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2552"
-```
-
-4. More write or read nodes can be started started by defining roles and port:
-
-```
-sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2553"
-sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2554"
-```
-
-Try it with curl:
-
-```
-# add item to cart
-curl -X POST -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' http://127.0.0.1:8051/shopping/carts
-
-# get cart
-curl http://127.0.0.1:8051/shopping/carts/cart1
-
-# update quantity of item
-curl -X PUT -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' http://127.0.0.1:8051/shopping/carts
-
-# check out cart
-curl -X POST -H "Content-Type: application/json" -d '{}' http://127.0.0.1:8051/shopping/carts/cart1/checkout
-```
-
-or same `curl` commands to port 8052.
diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 43cb70d..a67e25b 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,12 +1,12 @@
-val AkkaVersion = "2.6.8"
+val AkkaVersion = "2.6.9"
 val AkkaPersistenceCassandraVersion = "1.0.1"
 val AkkaHttpVersion = "10.2.0"
-val AkkaProjectionVersion = "0.3"
+val AkkaProjectionVersion = "1.0.0"
 
-lazy val `akka-sample-cqrs-scala` = project
+lazy val `akka-projection-testing` = project
   .in(file("."))
   .settings(
-    organization := "com.lightbend.akka.samples",
+    organization := "akka.projection.testing",
     version := "1.0",
     scalaVersion := "2.13.1",
     scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"),
@@ -20,19 +20,23 @@ lazy val `akka-sample-cqrs-scala` = project
         "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % AkkaPersistenceCassandraVersion,
         "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
         "com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionVersion,
+        "com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion,
+        "com.zaxxer" % "HikariCP" % "3.4.5",
         "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
         "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
         "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
         "ch.qos.logback" % "logback-classic" % "1.2.3",
+        "org.postgresql" % "postgresql" % "42.2.14",
         "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
         "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test,
         "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
         "com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test,
+
         "org.scalatest" %% "scalatest" % "3.1.0" % Test,
         "commons-io" % "commons-io" % "2.4" % Test),
     fork in run := false,
     Global / cancelable := false, // ctrl-c
-    mainClass in (Compile, run) := Some("sample.cqrs.Main"),
+    mainClass in (Compile, run) := Some("akka.projection.testing.Main"),
     // disable parallel tests
     parallelExecution in Test := false,
     // show full stack traces and test case durations
diff --git a/akka-sample-cqrs-scala/projection.sql b/akka-sample-cqrs-scala/projection.sql
new file mode 100644
index 0000000..154a945
--- /dev/null
+++ b/akka-sample-cqrs-scala/projection.sql
@@ -0,0 +1,21 @@
+
+create table if not exists "AKKA_PROJECTION_OFFSET_STORE" (
+  "PROJECTION_NAME" VARCHAR(255) NOT NULL,
+  "PROJECTION_KEY" VARCHAR(255) NOT NULL,
+  "CURRENT_OFFSET" VARCHAR(255) NOT NULL,
+  "MANIFEST" VARCHAR(4) NOT NULL,
+  "MERGEABLE" BOOLEAN NOT NULL,
+  "LAST_UPDATED" BIGINT NOT NULL,
+  constraint "PK_PROJECTION_ID" primary key ("PROJECTION_NAME","PROJECTION_KEY")
+);
+
+create index if not exists "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME");
+
+create table if not exists events (
+    name varchar(256),
+    event varchar(256),
+    constraint pkey primary key (name, event)
+);
+
+
+
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 9bcb675..da92bcf 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -1,11 +1,29 @@
 akka {
+
+  projection {
+    jdbc {
+      dialect = "postgres-dialect"
+      offset-store {
+        schema = ""
+        table = "AKKA_PROJECTION_OFFSET_STORE"
+      }
+      blocking-jdbc-dispatcher {
+        type = Dispatcher
+        executor = "thread-pool-executor"
+        thread-pool-executor {
+          fixed-pool-size = 10
+        }
+      }
+    }
+  }
+
   loglevel = DEBUG
 
   actor {
     provider = cluster
 
     serialization-bindings {
-      "sample.cqrs.CborSerializable" = jackson-cbor
+      "akka.projection.testing.CborSerializable" = jackson-cbor
     }
   }
 
@@ -18,8 +36,8 @@ akka {
 
   cluster {
     seed-nodes = [
-      "akka://Shopping@127.0.0.1:2551",
-      "akka://Shopping@127.0.0.1:2552"
+      "akka://test@127.0.0.1:2551",
+      "akka://test@127.0.0.1:2552"
     ]
 
     roles = ["write-model", "read-model"]
@@ -37,13 +55,17 @@ akka {
 
 # Configuration for akka-persistence-cassandra
 akka.persistence.cassandra {
+  journal {
+    keyspace = "akka_testing"
+  }
+
   events-by-tag {
-    bucket-size = "Day"
+    bucket-size = "Hour"
     # for reduced latency
     eventual-consistency-delay = 200ms
     flush-interval = 50ms
     pubsub-notification = on
-    first-time-bucket = "20200115T00:00"
+    first-time-bucket = "20201001T00:00"
   }
 
   query {
@@ -61,14 +83,13 @@ datastax-java-driver {
   advanced.reconnect-on-init = on
 }
 
-akka.projection.cassandra.offset-store.keyspace = "akka_cqrs_sample"
-
 event-processor {
-  tag-prefix = "carts-slice"       // even processor tag prefix
-  parallelism = 4                  // number of event processors
+  parallelism = 4
+}
+
+test {
+
 }
 
-shopping.http.port = 0
-shopping.askTimeout = 5 s
 
 
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
similarity index 72%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
index cbec29e..a3a147e 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
@@ -1,4 +1,4 @@
-package sample.cqrs
+package akka.projection.testing
 
 /**
  * Marker trait for serialization with Jackson CBOR
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala
new file mode 100644
index 0000000..579d208
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala
@@ -0,0 +1,47 @@
+package akka.projection.testing
+
+import akka.Done
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.pattern.StatusReply
+import akka.persistence.typed.PersistenceId
+import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
+
+object ConfigurablePersistentActor {
+
+  val Key: EntityTypeKey[Command] = EntityTypeKey[Command]("configurable")
+
+  def init(settings: EventProcessorSettings, system: ActorSystem[_]): ActorRef[ShardingEnvelope[Command]] = {
+    ClusterSharding(system).init(Entity(Key)(ctx => apply(settings, ctx.entityId))
+      .withRole("write-model"))
+  }
+
+  trait Command
+
+  final case class PersistAndAck(toPersist: String, replyTo: ActorRef[StatusReply[Done]], testName: String) extends Command
+
+  final case class Persist(toPersist: String, testName: String) extends Command
+
+  final case class Event(testName: String, payload: String, timeCreated: Long = System.currentTimeMillis()) extends CborSerializable
+
+  final case class State(eventsProcessed: Long) extends CborSerializable
+
+  def apply(settings: EventProcessorSettings, persistenceId: String): Behavior[Command] =
+    Behaviors.setup { ctx =>
+      EventSourcedBehavior[Command, Event, State](
+        persistenceId = PersistenceId.ofUniqueId(persistenceId),
+        State(0),
+        (_, command) => command match {
+          case Persist(toPersist, testName) =>
+            Effect.persist(Event(testName, toPersist))
+          case PersistAndAck(toPersist, ack, testName) =>
+            ctx.log.info("persisting event {}", command)
+            Effect.persist(Event(testName, toPersist)).thenRun(_ => ack ! StatusReply.ack())
+        },
+        (state, _) => state.copy(eventsProcessed = state.eventsProcessed + 1)).withTagger(event =>
+        Set("tag-" + math.abs(event.hashCode() % settings.parallelism)))
+    }
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
similarity index 63%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
index 774a221..69430ec 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
@@ -1,4 +1,4 @@
-package sample.cqrs
+package akka.projection.testing
 
 import akka.actor.typed.ActorSystem
 import com.typesafe.config.Config
@@ -10,10 +10,9 @@ object EventProcessorSettings {
   }
 
   def apply(config: Config): EventProcessorSettings = {
-    val tagPrefix: String = config.getString("tag-prefix")
     val parallelism: Int = config.getInt("parallelism")
-    EventProcessorSettings(tagPrefix, parallelism)
+    EventProcessorSettings(parallelism)
   }
 }
 
-final case class EventProcessorSettings(tagPrefix: String, parallelism: Int)
+final case class EventProcessorSettings(parallelism: Int)
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala
new file mode 100644
index 0000000..c662600
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala
@@ -0,0 +1,87 @@
+package akka.projection.testing
+
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
+import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardedDaemonProcessSettings, ShardingEnvelope}
+import akka.cluster.typed.Cluster
+import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
+import akka.persistence.query.Offset
+import akka.projection.eventsourced.EventEnvelope
+import akka.projection.eventsourced.scaladsl.EventSourcedProvider
+import akka.projection.jdbc.scaladsl.JdbcProjection
+import akka.projection.scaladsl.ExactlyOnceProjection
+import akka.projection.testing.LoadGeneration.{Result, RunTest}
+import akka.projection.{ProjectionBehavior, ProjectionId}
+import akka.util.Timeout
+import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
+
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success}
+
+object Guardian {
+
+  def createProjectionFor(
+                           settings: EventProcessorSettings,
+                           index: Int,
+                           factory: HikariFactory
+                         )(implicit system: ActorSystem[_]): ExactlyOnceProjection[Offset, EventEnvelope[ConfigurablePersistentActor.Event]] = {
+    val tag = s"tag-$index"
+    val sourceProvider = EventSourcedProvider.eventsByTag[ConfigurablePersistentActor.Event](
+      system = system,
+      readJournalPluginId = CassandraReadJournal.Identifier,
+      tag = tag)
+    JdbcProjection.exactlyOnce(
+      projectionId = ProjectionId("test-projection-id", tag),
+      sourceProvider,
+      () => factory.newSession(),
+      () => new ProjectionHandler(tag, system)
+    )
+  }
+
+  def apply(): Behavior[String] = {
+    Behaviors.setup[String] { context =>
+      implicit val system: ActorSystem[_] = context.system
+      val config = new HikariConfig
+      config.setJdbcUrl("jdbc:postgresql://127.0.0.1:5432/")
+      config.setUsername("docker")
+      config.setPassword("docker")
+      config.setMaximumPoolSize(19)
+      config.setAutoCommit(false)
+      val dataSource = new HikariDataSource(config)
+      val settings = EventProcessorSettings(system)
+      val shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]] = ConfigurablePersistentActor.init(settings, system)
+      if (Cluster(system).selfMember.hasRole("read-model")) {
+
+        val dbSessionFactory = new HikariFactory(dataSource)
+
+        // we only want to run the daemon processes on the read-model nodes
+        val shardingSettings = ClusterShardingSettings(system)
+        val shardedDaemonProcessSettings =
+          ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model"))
+
+        ShardedDaemonProcess(system).init(
+          name = "test-projection",
+          settings.parallelism,
+          n => ProjectionBehavior(createProjectionFor(settings, n, dbSessionFactory)),
+          shardedDaemonProcessSettings,
+          Some(ProjectionBehavior.Stop))
+      }
+
+      // TODO move to route
+      implicit val timeout: Timeout = 10.seconds
+      val loadGeneration: ActorRef[LoadGeneration.RunTest] = context.spawn(LoadGeneration(shardRegion, dataSource), "load-generation")
+      context.ask[RunTest, Result](loadGeneration, replyTo => LoadGeneration.RunTest(s"test-${System.currentTimeMillis()}", 2, 5, replyTo)) {
+        case Success(value) =>
+          context.log.info("Test passed {}", value)
+          "success"
+        case Failure(t) =>
+          context.log.error("Test failed",t )
+          "failure"
+      }
+
+
+      Behaviors.empty
+    }
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala
new file mode 100644
index 0000000..742a44b
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala
@@ -0,0 +1,9 @@
+package akka.projection.testing
+
+import javax.sql.DataSource
+
+class HikariFactory(val dataSource: DataSource) {
+  def newSession(): HikariJdbcSession = {
+    new HikariJdbcSession(dataSource)
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala
new file mode 100644
index 0000000..057b8d1
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala
@@ -0,0 +1,23 @@
+package akka.projection.testing
+
+import java.sql.Connection
+
+import akka.japi.function
+import akka.projection.jdbc.JdbcSession
+import javax.sql.DataSource
+
+
+
+class HikariJdbcSession(source: DataSource) extends JdbcSession {
+
+  private val connection = source.getConnection
+
+  override def withConnection[Result](func: function.Function[Connection, Result]): Result =
+    func(connection)
+
+  override def commit(): Unit = connection.commit()
+
+  override def rollback(): Unit = connection.rollback()
+
+  override def close(): Unit = connection.close()
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
similarity index 59%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
index 4a9c313..2444cee 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
@@ -1,18 +1,13 @@
-package sample.cqrs
+package akka.projection.testing
 
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-
-import akka.actor.CoordinatedShutdown
 import akka.actor.typed.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.server.Route
-import akka.Done
 
-class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) {
-  private val shutdown = CoordinatedShutdown(system)
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
 
+class HttpServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) {
   import system.executionContext
 
   def start(): Unit = {
@@ -21,7 +16,7 @@ class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[
       .onComplete {
       case Success(binding) =>
         val address = binding.localAddress
-        system.log.info("Shopping online at http://{}:{}/", address.getHostString, address.getPort)
+        system.log.info("Online at http://{}:{}/", address.getHostString, address.getPort)
 
       case Failure(ex) =>
         system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala
new file mode 100644
index 0000000..35fb682
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala
@@ -0,0 +1,85 @@
+package akka.projection.testing
+
+import akka.{Done, NotUsed}
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, Behavior, Terminated}
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.pattern.StatusReply
+import akka.projection.testing.LoadGeneration.{Failed, Result, RunTest}
+import akka.projection.testing.LoadTest.Start
+import akka.stream.scaladsl.Source
+import akka.util.Timeout
+import javax.sql.DataSource
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success}
+
+object LoadGeneration {
+
+  case class RunTest(name: String, actors: Int, eventsPerActor: Int, reply: ActorRef[Result])
+
+  sealed trait Result
+
+  case class Pass() extends Result
+
+  case class Failed(t: Option[Throwable], expected: Int, got: Int) extends Result
+
+  def apply(shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[RunTest] = Behaviors.setup { ctx =>
+    Behaviors.receiveMessage[RunTest] {
+      case rt@RunTest(name, actors, eventsPerActor, reply) =>
+        ctx.spawn(LoadTest(name, shardRegion, source), s"test-$name") ! Start(rt)
+        Behaviors.same
+    }
+  }
+
+}
+
+object LoadTest {
+
+  sealed trait Command
+
+  case class Start(test: RunTest) extends Command
+
+  private case class StartValidation() extends Command
+
+  private case class LoadGenerationFailed(t: Throwable) extends Command
+
+  def apply(testName: String, shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[Command] = Behaviors.setup { ctx =>
+    import akka.actor.typed.scaladsl.AskPattern._
+    implicit val timeout: Timeout = 30.seconds
+    implicit val system = ctx.system
+    implicit val ec: ExecutionContext = system.executionContext
+    Behaviors.receiveMessage[Command] {
+      case Start(RunTest(_, actors, eventsPerActor, replyTo)) =>
+        ctx.log.info("Starting load generation")
+        val expected = actors * eventsPerActor
+        val testRun: Source[StatusReply[Done], NotUsed] = Source(1 to actors)
+          .flatMapConcat(id =>
+            Source(1 to eventsPerActor)
+              .mapAsync(1)(message => shardRegion.ask[StatusReply[Done]] { replyTo =>
+                ShardingEnvelope(s"$id", ConfigurablePersistentActor.PersistAndAck(s"actor-$id-message-$message", replyTo, testName))
+              }))
+        ctx.pipeToSelf(testRun.run()) {
+          case Success(_) => StartValidation()
+          case Failure(t) => LoadGenerationFailed(t)
+        }
+        Behaviors.receiveMessage[Command] {
+          case StartValidation() =>
+            ctx.log.info("Starting validation")
+            val validation = ctx.spawn(TestValidation(replyTo, testName, expected, source: DataSource), s"TestValidation=$testName")
+            ctx.watch(validation)
+            Behaviors.same
+          case LoadGenerationFailed(t) =>
+            ctx.log.error("Load generation failed", t)
+            replyTo ! Failed(Some(t), -1, -1)
+            Behaviors.stopped
+        }.receiveSignal {
+          case (ctx, Terminated(_)) =>
+            ctx.log.info("Validation finished, terminating")
+            Behaviors.stopped
+        }
+    }
+  }
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala
new file mode 100644
index 0000000..09690e3
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala
@@ -0,0 +1,33 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import com.typesafe.config.{Config, ConfigFactory}
+
+object Main {
+
+  def main(args: Array[String]): Unit = {
+    args.headOption match {
+
+      case Some(portString) if portString.matches("""\d+""") =>
+        val port = portString.toInt
+        val httpPort = ("80" + portString.takeRight(2)).toInt
+        startNode(port, httpPort)
+
+      case None =>
+        throw new IllegalArgumentException("port number required argument")
+    }
+  }
+
+  def startNode(port: Int, httpPort: Int): Unit = {
+    ActorSystem[String](Guardian(), "test", config(port, httpPort))
+
+  }
+
+  def config(port: Int, httpPort: Int): Config =
+    ConfigFactory.parseString(
+      s"""
+      akka.remote.artery.canonical.port = $port
+      test.http.port = $httpPort
+       """).withFallback(ConfigFactory.load())
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala
new file mode 100644
index 0000000..bf8e7a0
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala
@@ -0,0 +1,19 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import akka.projection.eventsourced.EventEnvelope
+import akka.projection.jdbc.scaladsl.JdbcHandler
+import org.slf4j.{Logger, LoggerFactory}
+
+class ProjectionHandler(tag: String, system: ActorSystem[_])
+    extends JdbcHandler[EventEnvelope[ConfigurablePersistentActor.Event], HikariJdbcSession] {
+  private val log: Logger = LoggerFactory.getLogger(getClass)
+
+  override def process(session: HikariJdbcSession, envelope: EventEnvelope[ConfigurablePersistentActor.Event]): Unit = {
+    log.info("Event {} for tag {}", envelope.event.payload, tag)
+    session.withConnection(connection =>
+      connection.createStatement()
+        .execute(s"insert into events(name, event) values ('${envelope.event.testName}','${envelope.event.payload}')")
+    )
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala
new file mode 100644
index 0000000..a1b40ce
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala
@@ -0,0 +1,30 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import spray.json.DefaultJsonProtocol._
+import spray.json.RootJsonFormat
+
+object TestRoutes {
+  case class RunTest(name: String, nrActors: Long, messagesPerActor: Long)
+  case class TestResult(pass: Boolean, expected: Long, got: Long)
+
+  implicit val runTestFormat: RootJsonFormat[RunTest] = jsonFormat3(RunTest)
+  implicit val testResultFormat: RootJsonFormat[TestResult] = jsonFormat3(TestResult)
+}
+
+//class TestRoutes()(implicit val system: ActorSystem[_]) {
+//  import TestRoutes._
+//  val route: Route = path("test") {
+//    post {
+//      entity(as[RunTest]) { runTest =>
+//
+//
+//      }
+//    }
+//  }
+//
+//}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala
new file mode 100644
index 0000000..0976a82
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala
@@ -0,0 +1,45 @@
+package akka.projection.testing
+
+import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.scaladsl.Behaviors
+import akka.projection.testing.LoadGeneration.{Pass, Result}
+import javax.sql.DataSource
+
+object TestValidation {
+  // FIXME blocking, dispatcher
+  // FIXME timeout
+  def apply(replyTo: ActorRef[Result], testName: String, expectedNrEvents: Long, source: DataSource): Behavior[String] = {
+    import scala.concurrent.duration._
+    Behaviors.setup { ctx =>
+      def validate(): Boolean = {
+        val connection = source.getConnection
+        try {
+          val resultSet = connection.createStatement().executeQuery(s"select count(*) from events where name = '$testName'")
+          if (resultSet.next()) {
+            val count = resultSet.getInt("count")
+            ctx.log.info("Expected {} got {}!", expectedNrEvents, count)
+            expectedNrEvents == count
+          } else {
+            throw new RuntimeException("Expected single row")
+          }
+        } finally {
+          connection.close()
+        }
+      }
+
+      Behaviors.withTimers { timers =>
+        timers.startTimerAtFixedRate("test", 2.seconds)
+        Behaviors.receiveMessage {
+          case "test" =>
+            if (validate()) {
+              ctx.log.info("Validated. Stopping")
+              replyTo ! Pass()
+              Behaviors.stopped
+            } else {
+              Behaviors.same
+            }
+        }
+      }
+    }
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala
deleted file mode 100644
index 795d407..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-import java.util.concurrent.CountDownLatch
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.typed.{ ClusterShardingSettings, ShardedDaemonProcessSettings }
-import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
-import akka.cluster.typed.Cluster
-import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.persistence.query.Offset
-import akka.projection.{ ProjectionBehavior, ProjectionId }
-import akka.projection.scaladsl.AtLeastOnceProjection
-import akka.projection.cassandra.scaladsl.CassandraProjection
-import akka.projection.eventsourced.EventEnvelope
-import akka.projection.eventsourced.scaladsl.EventSourcedProvider
-import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-
-object Main {
-
-  def main(args: Array[String]): Unit = {
-    args.headOption match {
-
-      case Some(portString) if portString.matches("""\d+""") =>
-        val port = portString.toInt
-        val httpPort = ("80" + portString.takeRight(2)).toInt
-        startNode(port, httpPort)
-
-      case Some("cassandra") =>
-        startCassandraDatabase()
-        println("Started Cassandra, press Ctrl + C to kill")
-        new CountDownLatch(1).await()
-
-      case None =>
-        throw new IllegalArgumentException("port number, or cassandra required argument")
-    }
-  }
-
-  def startNode(port: Int, httpPort: Int): Unit = {
-    val system =
-      ActorSystem[Nothing](Guardian(), "Shopping", config(port, httpPort))
-
-    if (Cluster(system).selfMember.hasRole("read-model"))
-      createTables(system)
-  }
-
-  def config(port: Int, httpPort: Int): Config =
-    ConfigFactory.parseString(s"""
-      akka.remote.artery.canonical.port = $port
-      shopping.http.port = $httpPort
-       """).withFallback(ConfigFactory.load())
-
-  /**
-   * To make the sample easier to run we kickstart a Cassandra instance to
-   * act as the journal. Cassandra is a great choice of backend for Akka Persistence but
-   * in a real application a pre-existing Cassandra cluster should be used.
-   */
-  def startCassandraDatabase(): Unit = {
-    val databaseDirectory = new File("target/cassandra-db")
-    CassandraLauncher.start(databaseDirectory, CassandraLauncher.DefaultTestConfigResource, clean = false, port = 9042)
-  }
-
-  def createTables(system: ActorSystem[_]): Unit = {
-    val session =
-      CassandraSessionRegistry(system).sessionFor("alpakka.cassandra")
-
-    // TODO use real replication strategy in real application
-    val keyspaceStmt =
-      """
-      CREATE KEYSPACE IF NOT EXISTS akka_cqrs_sample
-      WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
-      """
-
-    val offsetTableStmt =
-      """
-      CREATE TABLE IF NOT EXISTS akka_cqrs_sample.offset_store (
-        projection_name text,
-        partition int,
-        projection_key text,
-        offset text,
-        manifest text,
-        last_updated timestamp,
-        PRIMARY KEY ((projection_name, partition), projection_key)
-      )
-      """
-
-    // ok to block here, main thread
-    Await.ready(session.executeDDL(keyspaceStmt), 30.seconds)
-    system.log.info("Created akka_cqrs_sample keyspace")
-    Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
-    system.log.info("Created akka_cqrs_sample.offset_store table")
-
-  }
-
-}
-
-object Guardian {
-
-  def createProjectionFor(
-      system: ActorSystem[_],
-      settings: EventProcessorSettings,
-      index: Int): AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
-    val tag = s"${settings.tagPrefix}-$index"
-    val sourceProvider = EventSourcedProvider.eventsByTag[ShoppingCart.Event](
-      system = system,
-      readJournalPluginId = CassandraReadJournal.Identifier,
-      tag = tag)
-    CassandraProjection.atLeastOnce(
-      projectionId = ProjectionId("shopping-carts", tag),
-      sourceProvider,
-      handler = () => new ShoppingCartProjectionHandler(tag, system))
-  }
-
-  def apply(): Behavior[Nothing] = {
-    Behaviors.setup[Nothing] { context =>
-      val system = context.system
-
-      val settings = EventProcessorSettings(system)
-
-      val httpPort = context.system.settings.config.getInt("shopping.http.port")
-
-      ShoppingCart.init(system, settings)
-
-      if (Cluster(system).selfMember.hasRole("read-model")) {
-
-        // we only want to run the daemon processes on the read-model nodes
-        val shardingSettings = ClusterShardingSettings(system)
-        val shardedDaemonProcessSettings =
-          ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model"))
-
-        ShardedDaemonProcess(system).init(
-          name = "ShoppingCartProjection",
-          settings.parallelism,
-          n => ProjectionBehavior(createProjectionFor(system, settings, n)),
-          shardedDaemonProcessSettings,
-          Some(ProjectionBehavior.Stop))
-      }
-
-      val routes = new ShoppingCartRoutes()(context.system)
-      new ShoppingCartServer(routes.shopping, httpPort)(context.system).start()
-
-      Behaviors.empty
-    }
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala
deleted file mode 100644
index c800f0b..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-package sample.cqrs
-
-import java.time.Instant
-
-import scala.concurrent.duration._
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
-import akka.actor.typed.SupervisorStrategy
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.sharding.typed.scaladsl.Entity
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.pattern.StatusReply
-import akka.persistence.typed.PersistenceId
-import akka.persistence.typed.scaladsl.RetentionCriteria
-import akka.persistence.typed.scaladsl.Effect
-import akka.persistence.typed.scaladsl.EventSourcedBehavior
-import akka.persistence.typed.scaladsl.ReplyEffect
-
-/**
- * This is an event sourced actor. It has a state, [[ShoppingCart.State]], which
- * stores the current shopping cart items and whether it's checked out.
- *
- * Event sourced actors are interacted with by sending them commands,
- * see classes implementing [[ShoppingCart.Command]].
- *
- * Commands get translated to events, see classes implementing [[ShoppingCart.Event]].
- * It's the events that get persisted by the entity. Each event will have an event handler
- * registered for it, and an event handler updates the current state based on the event.
- * This will be done when the event is first created, and it will also be done when the entity is
- * loaded from the database - each event will be replayed to recreate the state
- * of the entity.
- */
-object ShoppingCart {
-
-  /**
-   * The current state held by the persistent entity.
-   */
-  final case class State(items: Map[String, Int], checkoutDate: Option[Instant]) extends CborSerializable {
-
-    def isCheckedOut: Boolean =
-      checkoutDate.isDefined
-
-    def hasItem(itemId: String): Boolean =
-      items.contains(itemId)
-
-    def isEmpty: Boolean =
-      items.isEmpty
-
-    def updateItem(itemId: String, quantity: Int): State = {
-      quantity match {
-        case 0 => copy(items = items - itemId)
-        case _ => copy(items = items + (itemId -> quantity))
-      }
-    }
-
-    def removeItem(itemId: String): State =
-      copy(items = items - itemId)
-
-    def checkout(now: Instant): State =
-      copy(checkoutDate = Some(now))
-
-    def toSummary: Summary =
-      Summary(items, isCheckedOut)
-  }
-  object State {
-    val empty = State(items = Map.empty, checkoutDate = None)
-  }
-
-  /**
-   * This interface defines all the commands that the ShoppingCart persistent actor supports.
-   */
-  sealed trait Command extends CborSerializable
-
-  /**
-   * A command to add an item to the cart.
-   *
-   * It can reply with `StatusReply[Summary]`, which is sent back to the caller when
-   * all the events emitted by this command are successfully persisted.
-   */
-  final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
-  /**
-   * A command to remove an item from the cart.
-   */
-  final case class RemoveItem(itemId: String, replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
-  /**
-   * A command to adjust the quantity of an item in the cart.
-   */
-  final case class AdjustItemQuantity(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]])
-      extends Command
-
-  /**
-   * A command to checkout the shopping cart.
-   */
-  final case class Checkout(replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
-  /**
-   * A command to get the current state of the shopping cart.
-   */
-  final case class Get(replyTo: ActorRef[Summary]) extends Command
-
-  /**
-   * Summary of the shopping cart state, used in reply messages.
-   */
-  final case class Summary(items: Map[String, Int], checkedOut: Boolean) extends CborSerializable
-
-  /**
-   * This interface defines all the events that the ShoppingCart supports.
-   */
-  sealed trait Event extends CborSerializable {
-    def cartId: String
-  }
-
-  final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event
-
-  final case class ItemRemoved(cartId: String, itemId: String) extends Event
-
-  final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event
-
-  final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
-
-  val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart")
-
-  def init(system: ActorSystem[_], eventProcessorSettings: EventProcessorSettings): Unit = {
-    ClusterSharding(system).init(Entity(EntityKey) { entityContext =>
-      val n = math.abs(entityContext.entityId.hashCode % eventProcessorSettings.parallelism)
-      val eventProcessorTag = eventProcessorSettings.tagPrefix + "-" + n
-      ShoppingCart(entityContext.entityId, Set(eventProcessorTag))
-    }.withRole("write-model"))
-  }
-
-  def apply(cartId: String, eventProcessorTags: Set[String]): Behavior[Command] = {
-    EventSourcedBehavior
-      .withEnforcedReplies[Command, Event, State](
-        PersistenceId(EntityKey.name, cartId),
-        State.empty,
-        (state, command) =>
-          //The shopping cart behavior changes if it's checked out or not.
-          // The commands are handled differently for each case.
-          if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
-          else openShoppingCart(cartId, state, command),
-        (state, event) => handleEvent(state, event))
-      .withTagger(_ => eventProcessorTags)
-      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
-      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
-  }
-
-  private def openShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] =
-    command match {
-      case AddItem(itemId, quantity, replyTo) =>
-        if (state.hasItem(itemId))
-          Effect.reply(replyTo)(StatusReply.Error(s"Item '$itemId' was already added to this shopping cart"))
-        else if (quantity <= 0)
-          Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero"))
-        else
-          Effect
-            .persist(ItemAdded(cartId, itemId, quantity))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-
-      case RemoveItem(itemId, replyTo) =>
-        if (state.hasItem(itemId))
-          Effect
-            .persist(ItemRemoved(cartId, itemId))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-        else
-          Effect.reply(replyTo)(StatusReply.Success(state.toSummary)) // removing an item is idempotent
-
-      case AdjustItemQuantity(itemId, quantity, replyTo) =>
-        if (quantity <= 0)
-          Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero"))
-        else if (state.hasItem(itemId))
-          Effect
-            .persist(ItemQuantityAdjusted(cartId, itemId, quantity))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-        else
-          Effect.reply(replyTo)(
-            StatusReply.Error(s"Cannot adjust quantity for item '$itemId'. Item not present on cart"))
-
-      case Checkout(replyTo) =>
-        if (state.isEmpty)
-          Effect.reply(replyTo)(StatusReply.Error("Cannot checkout an empty shopping cart"))
-        else
-          Effect
-            .persist(CheckedOut(cartId, Instant.now()))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-
-      case Get(replyTo) =>
-        Effect.reply(replyTo)(state.toSummary)
-    }
-
-  private def checkedOutShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] =
-    command match {
-      case Get(replyTo) =>
-        Effect.reply(replyTo)(state.toSummary)
-      case cmd: AddItem =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't add an item to an already checked out shopping cart"))
-      case cmd: RemoveItem =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't remove an item from an already checked out shopping cart"))
-      case cmd: AdjustItemQuantity =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't adjust item on an already checked out shopping cart"))
-      case cmd: Checkout =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't checkout already checked out shopping cart"))
-    }
-
-  private def handleEvent(state: State, event: Event) = {
-    event match {
-      case ItemAdded(_, itemId, quantity)            => state.updateItem(itemId, quantity)
-      case ItemRemoved(_, itemId)                    => state.removeItem(itemId)
-      case ItemQuantityAdjusted(_, itemId, quantity) => state.updateItem(itemId, quantity)
-      case CheckedOut(_, eventTime)                  => state.checkout(eventTime)
-    }
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala
deleted file mode 100644
index cf965a8..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package sample.cqrs
-
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.eventstream.EventStream
-import akka.projection.eventsourced.EventEnvelope
-import akka.projection.scaladsl.Handler
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.Future
-
-class ShoppingCartProjectionHandler(tag: String, system: ActorSystem[_])
-    extends Handler[EventEnvelope[ShoppingCart.Event]] {
-  val log = LoggerFactory.getLogger(getClass)
-
-  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
-
-    log.info(
-      "EventProcessor({}) consumed {} from {} with seqNr {}",
-      tag,
-      envelope.event,
-      envelope.persistenceId,
-      envelope.sequenceNr)
-    system.eventStream ! EventStream.Publish(envelope.event)
-    Future.successful(Done)
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
deleted file mode 100644
index acd5b89..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package sample.cqrs
-
-import scala.concurrent.Future
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.http.scaladsl.model.StatusCodes
-import akka.http.scaladsl.server.Route
-import akka.pattern.StatusReply
-import akka.util.Timeout
-
-object ShoppingCartRoutes {
-  final case class AddItem(cartId: String, itemId: String, quantity: Int)
-  final case class UpdateItem(cartId: String, itemId: String, quantity: Int)
-}
-
-class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
-
-  implicit private val timeout: Timeout =
-    Timeout.create(system.settings.config.getDuration("shopping.askTimeout"))
-  private val sharding = ClusterSharding(system)
-
-  import ShoppingCartRoutes._
-  import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-  import akka.http.scaladsl.server.Directives._
-  import JsonFormats._
-
-  val shopping: Route =
-    pathPrefix("shopping") {
-      pathPrefix("carts") {
-        concat(
-          post {
-            entity(as[AddItem]) {
-              data =>
-                val entityRef =
-                  sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId)
-                val reply: Future[StatusReply[ShoppingCart.Summary]] =
-                  entityRef.ask(ShoppingCart.AddItem(data.itemId, data.quantity, _))
-                onSuccess(reply) {
-                  case StatusReply.Success(summary: ShoppingCart.Summary) =>
-                    complete(StatusCodes.OK -> summary)
-                  case StatusReply.Error(reason) =>
-                    complete(StatusCodes.BadRequest -> reason)
-                }
-            }
-          },
-          put {
-            entity(as[UpdateItem]) {
-              data =>
-                val entityRef =
-                  sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId)
-
-                def command(replyTo: ActorRef[StatusReply[ShoppingCart.Summary]]) =
-                  if (data.quantity == 0)
-                    ShoppingCart.RemoveItem(data.itemId, replyTo)
-                  else
-                    ShoppingCart.AdjustItemQuantity(data.itemId, data.quantity, replyTo)
-
-                val reply: Future[StatusReply[ShoppingCart.Summary]] =
-                  entityRef.ask(command(_))
-                onSuccess(reply) {
-                  case StatusReply.Success(summary: ShoppingCart.Summary) =>
-                    complete(StatusCodes.OK -> summary)
-                  case StatusReply.Error(reason) =>
-                    complete(StatusCodes.BadRequest -> reason)
-                }
-            }
-          },
-          pathPrefix(Segment) { cartId =>
-            concat(get {
-              val entityRef =
-                sharding.entityRefFor(ShoppingCart.EntityKey, cartId)
-              onSuccess(entityRef.ask(ShoppingCart.Get)) { summary =>
-                if (summary.items.isEmpty) complete(StatusCodes.NotFound)
-                else complete(summary)
-              }
-            }, path("checkout") {
-              post {
-                val entityRef =
-                  sharding.entityRefFor(ShoppingCart.EntityKey, cartId)
-                val reply: Future[StatusReply[ShoppingCart.Summary]] =
-                  entityRef.ask(ShoppingCart.Checkout(_))
-                onSuccess(reply) {
-                  case StatusReply.Success(summary: ShoppingCart.Summary) =>
-                    complete(StatusCodes.OK -> summary)
-                  case StatusReply.Error(reason) =>
-                    complete(StatusCodes.BadRequest -> reason)
-                }
-              }
-            })
-          })
-      }
-    }
-
-}
-
-object JsonFormats {
-
-  import spray.json.RootJsonFormat
-  // import the default encoders for primitive types (Int, String, Lists etc)
-  import spray.json.DefaultJsonProtocol._
-
-  implicit val summaryFormat: RootJsonFormat[ShoppingCart.Summary] =
-    jsonFormat2(ShoppingCart.Summary)
-  implicit val addItemFormat: RootJsonFormat[ShoppingCartRoutes.AddItem] =
-    jsonFormat3(ShoppingCartRoutes.AddItem)
-  implicit val updateItemFormat: RootJsonFormat[ShoppingCartRoutes.UpdateItem] =
-    jsonFormat3(ShoppingCartRoutes.UpdateItem)
-
-}
diff --git a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml b/akka-sample-cqrs-scala/src/test/resources/logback-test.xml
deleted file mode 100644
index 179c899..0000000
--- a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<configuration>
-
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="com.datastax.oss.driver" level="WARN"/>
-    <logger name="org.apache.cassandra" level="ERROR"/>
-    <logger name="com.codahale.metrics" level="INFO"/>
-
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-</configuration>
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala
deleted file mode 100644
index 8f1930a..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-import java.util.UUID
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import akka.actor.testkit.typed.scaladsl.ActorTestKit
-import akka.actor.typed.eventstream.EventStream
-import akka.cluster.MemberStatus
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.typed.Cluster
-import akka.cluster.typed.Join
-import akka.pattern.StatusReply
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.persistence.testkit.scaladsl.PersistenceInit
-import akka.persistence.typed.PersistenceId
-import akka.persistence.typed.scaladsl.Effect
-import akka.persistence.typed.scaladsl.EventSourcedBehavior
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.TestSuite
-import org.scalatest.concurrent.Eventually
-import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.time.Span
-import org.scalatest.wordspec.AnyWordSpecLike
-
-object IntegrationSpec {
-  val config: Config = ConfigFactory.parseString(s"""
-      akka.cluster {
-         seed-nodes = []
-      }
-      
-      akka.persistence.cassandra {
-        events-by-tag {
-          eventual-consistency-delay = 200ms
-        }
-      
-        query {
-          refresh-interval = 500 ms
-        }
-      
-        journal.keyspace-autocreate = on
-        journal.tables-autocreate = on
-        snapshot.keyspace-autocreate = on
-        snapshot.tables-autocreate = on
-      }
-      datastax-java-driver {
-        basic.contact-points = ["127.0.0.1:19042"]
-        basic.load-balancing-policy.local-datacenter = "datacenter1"
-      }
-      
-      event-processor {
-        keep-alive-interval = 1 seconds
-      }
-      akka.loglevel = DEBUG
-      akka.actor.testkit.typed.single-expect-default = 5s
-      # For LoggingTestKit
-      akka.actor.testkit.typed.filter-leeway = 5s
-      akka.actor.testkit.typed.throw-on-shutdown-timeout = off
-    """).withFallback(ConfigFactory.load())
-}
-
-class IntegrationSpec
-    extends TestSuite
-    with Matchers
-    with BeforeAndAfterAll
-    with AnyWordSpecLike
-    with ScalaFutures
-    with Eventually {
-
-  implicit private val patience: PatienceConfig =
-    PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis))
-
-  private val databaseDirectory = new File("target/cassandra-IntegrationSpec")
-
-  private def roleConfig(role: String): Config =
-    ConfigFactory.parseString(s"akka.cluster.roles = [$role]")
-
-  // one TestKit (ActorSystem) per cluster node
-  private val testKit1 = ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config))
-  private val testKit2 =
-    ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config))
-  private val testKit3 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config))
-  private val testKit4 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config))
-
-  private val systems3 = List(testKit1.system, testKit2.system, testKit3.system)
-
-  override protected def beforeAll(): Unit = {
-    CassandraLauncher.start(
-      databaseDirectory,
-      CassandraLauncher.DefaultTestConfigResource,
-      clean = true,
-      port = 19042, // default is 9042, but use different for test
-      CassandraLauncher.classpathForResources("logback-test.xml"))
-
-    // avoid concurrent creation of keyspace and tables
-    initializePersistence()
-    Main.createTables(testKit1.system)
-
-    super.beforeAll()
-  }
-
-  private def initializePersistence(): Unit = {
-    val timeout = 10.seconds
-    val done = PersistenceInit.initializeDefaultPlugins(testKit1.system, timeout)
-    Await.result(done, timeout)
-  }
-
-  override protected def afterAll(): Unit = {
-    super.afterAll()
-
-    testKit4.shutdownTestKit()
-    testKit3.shutdownTestKit()
-    testKit2.shutdownTestKit()
-    testKit1.shutdownTestKit()
-
-    CassandraLauncher.stop()
-    FileUtils.deleteDirectory(databaseDirectory)
-  }
-
-  "Shopping Cart application" should {
-    "init and join Cluster" in {
-      testKit1.spawn[Nothing](Guardian(), "guardian")
-      testKit2.spawn[Nothing](Guardian(), "guardian")
-      testKit3.spawn[Nothing](Guardian(), "guardian")
-      // node4 is initialized and joining later
-
-      systems3.foreach { sys =>
-        Cluster(sys).manager ! Join(Cluster(testKit1.system).selfMember.address)
-      }
-
-      // let the nodes join and become Up
-      eventually(PatienceConfiguration.Timeout(10.seconds)) {
-        systems3.foreach { sys =>
-          Cluster(sys).selfMember.status should ===(MemberStatus.Up)
-        }
-      }
-    }
-
-    "update and consume from different nodes" in {
-      val cart1 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-1")
-      val probe1 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
-      val cart2 = ClusterSharding(testKit2.system).entityRefFor(ShoppingCart.EntityKey, "cart-2")
-      val probe2 = testKit2.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
-      val eventProbe3 = testKit3.createTestProbe[ShoppingCart.Event]()
-      testKit3.system.eventStream ! EventStream.Subscribe(eventProbe3.ref)
-
-      // update from node1, consume event from node3
-      cart1 ! ShoppingCart.AddItem("foo", 42, probe1.ref)
-      probe1.receiveMessage().isSuccess should ===(true)
-      eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-1", "foo", 42))
-
-      // update from node2, consume event from node3
-      cart2 ! ShoppingCart.AddItem("bar", 17, probe2.ref)
-      probe2.receiveMessage().isSuccess should ===(true)
-      cart2 ! ShoppingCart.AdjustItemQuantity("bar", 18, probe2.ref)
-      probe2.receiveMessage().isSuccess should ===(true)
-      eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-2", "bar", 17))
-      eventProbe3.expectMessage(ShoppingCart.ItemQuantityAdjusted("cart-2", "bar", 18))
-    }
-
-    "continue even processing from offset" in {
-      // give it time to write the offset before shutting down
-      Thread.sleep(1000)
-      testKit3.shutdownTestKit()
-
-      val eventProbe4 = testKit4.createTestProbe[ShoppingCart.Event]()
-      testKit4.system.eventStream ! EventStream.Subscribe(eventProbe4.ref)
-
-      testKit4.spawn[Nothing](Guardian(), "guardian")
-
-      Cluster(testKit4.system).manager ! Join(Cluster(testKit1.system).selfMember.address)
-
-      // let the node join and become Up
-      eventually(PatienceConfiguration.Timeout(10.seconds)) {
-        Cluster(testKit4.system).selfMember.status should ===(MemberStatus.Up)
-      }
-
-      val cart3 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-3")
-      val probe3 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
-      // update from node1, consume event from node4
-      cart3 ! ShoppingCart.AddItem("abc", 43, probe3.ref)
-      probe3.receiveMessage().isSuccess should ===(true)
-      // note that node4 is new, but continues reading from previous offset, i.e. not receiving events
-      // that have already been consumed
-      eventProbe4.expectMessage(ShoppingCart.ItemAdded("cart-3", "abc", 43))
-    }
-
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala
deleted file mode 100644
index 5932ea9..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.actor.typed.eventstream.EventStream
-import akka.pattern.StatusReply
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.projection.testkit.scaladsl.ProjectionTestKit
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.wordspec.AnyWordSpecLike
-
-object ProjectionSpec {
-  def config =
-    ConfigFactory.parseString("""
-      akka.actor.provider=local
-      akka.persistence.cassandra {		
-         events-by-tag {		
-           eventual-consistency-delay = 200ms		
-         }		
-       		
-         query {		
-           refresh-interval = 500 ms		
-         }		
-       		
-         journal.keyspace-autocreate = on		
-         journal.tables-autocreate = on		
-         snapshot.keyspace-autocreate = on		
-         snapshot.tables-autocreate = on		
-       }		
-       datastax-java-driver {		
-         basic.contact-points = ["127.0.0.1:19042"]		
-         basic.load-balancing-policy.local-datacenter = "datacenter1"		
-       }
-    """).withFallback(ConfigFactory.load()) // re-use application.conf other settings
-}
-
-class ProjectionSpec
-    extends ScalaTestWithActorTestKit(ProjectionSpec.config)
-    with AnyWordSpecLike
-    with BeforeAndAfterAll {
-  val projectionTestKit = ProjectionTestKit(testKit)
-  val settings = EventProcessorSettings(system)
-
-  val databaseDirectory = new File("target/cassandra-ProjectionSpec")
-
-  override protected def beforeAll(): Unit = {
-    CassandraLauncher.start(
-      databaseDirectory,
-      CassandraLauncher.DefaultTestConfigResource,
-      clean = true,
-      port = 19042, // default is 9042, but use different for test		
-      CassandraLauncher.classpathForResources("logback-test.xml"))
-
-    Main.createTables(system)
-
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    super.afterAll()
-    CassandraLauncher.stop()
-    FileUtils.deleteDirectory(databaseDirectory)
-  }
-
-  "The events from the Shopping Cart" should {
-
-    "be published to the system event stream by the projection" in {
-      val cartProbe = createTestProbe[Any]()
-      val cart = spawn(ShoppingCart("cart-1", Set(s"${settings.tagPrefix}-0")))
-      cart ! ShoppingCart.AddItem("25", 12, cartProbe.ref)
-      cartProbe.expectMessageType[StatusReply[ShoppingCart.Summary]].isSuccess should ===(true)
-
-      val eventProbe = createTestProbe[ShoppingCart.Event]()
-      system.eventStream ! EventStream.Subscribe(eventProbe.ref)
-      projectionTestKit.run(Guardian.createProjectionFor(system, settings, 0)) {
-        val added = eventProbe.expectMessageType[ShoppingCart.ItemAdded]
-        added.cartId should ===("cart-1")
-        added.itemId should ===("25")
-        added.quantity should ===(12)
-      }
-    }
-  }
-
-}
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala
deleted file mode 100644
index 898f923..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package sample.cqrs
-
-import java.util.UUID
-
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.pattern.StatusReply
-import org.scalatest.wordspec.AnyWordSpecLike
-
-class ShoppingCartSpec extends ScalaTestWithActorTestKit(s"""
-      akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
-      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-      akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
-    """) with AnyWordSpecLike {
-
-  private var counter = 0
-  def newCartId(): String = {
-    counter += 1
-    s"cart-$counter"
-  }
-
-  "The Shopping Cart" should {
-
-    "add item" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)))
-    }
-
-    "reject already added item" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.AddItem("foo", 13, probe.ref)
-      probe.receiveMessage().isError should ===(true)
-    }
-
-    "remove item" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.RemoveItem("foo", probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map.empty, checkedOut = false)))
-    }
-
-    "adjust quantity" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.AdjustItemQuantity("foo", 43, probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false)))
-    }
-
-    "checkout" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.Checkout(probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true)))
-
-      cart ! ShoppingCart.AddItem("bar", 13, probe.ref)
-      probe.receiveMessage().isError should ===(true)
-    }
-
-    "keep its state" in {
-      val cartId = newCartId()
-      val cart = testKit.spawn(ShoppingCart(cartId, Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)))
-
-      testKit.stop(cart)
-
-      // start again with same cartId
-      val restartedCart = testKit.spawn(ShoppingCart(cartId, Set.empty))
-      val stateProbe = testKit.createTestProbe[ShoppingCart.Summary]
-      restartedCart ! ShoppingCart.Get(stateProbe.ref)
-      stateProbe.expectMessage(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))
-    }
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org