Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:35 UTC

[incubator-pekko-samples] branch wip-chbatey-cqrs-testing created (now 3cf2ab6)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch wip-chbatey-cqrs-testing
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


      at 3cf2ab6  Basic testing

This branch includes the following new commits:

     new 3cf2ab6  Basic testing

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.





[incubator-pekko-samples] 01/01: Basic testing

Posted by fa...@apache.org.

fanningpj pushed a commit to branch wip-chbatey-cqrs-testing
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 3cf2ab6fd4bf3850288119f1256b7bf8a6877e57
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Fri Oct 2 15:10:52 2020 +0100

    Basic testing
---
 akka-sample-cqrs-scala/README.md                   |  66 +------
 akka-sample-cqrs-scala/build.sbt                   |  14 +-
 akka-sample-cqrs-scala/projection.sql              |  21 ++
 .../src/main/resources/application.conf            |  43 +++--
 .../projection/testing}/CborSerializable.scala     |   2 +-
 .../testing/ConfigurablePersistentActor.scala      |  47 +++++
 .../testing}/EventProcessorSettings.scala          |   7 +-
 .../scala/akka/projection/testing/Guardian.scala   |  87 +++++++++
 .../akka/projection/testing/HikariFactory.scala    |   9 +
 .../projection/testing/HikariJdbcSession.scala     |  23 +++
 .../projection/testing/HttpServer.scala}           |  15 +-
 .../akka/projection/testing/LoadGeneration.scala   |  85 ++++++++
 .../main/scala/akka/projection/testing/Main.scala  |  33 ++++
 .../projection/testing/ProjectionHandler.scala     |  19 ++
 .../scala/akka/projection/testing/TestRoutes.scala |  30 +++
 .../akka/projection/testing/TestValidation.scala   |  45 +++++
 .../src/main/scala/sample/cqrs/Main.scala          | 153 ---------------
 .../src/main/scala/sample/cqrs/ShoppingCart.scala  | 215 ---------------------
 .../cqrs/ShoppingCartProjectionHandler.scala       |  27 ---
 .../scala/sample/cqrs/ShoppingCartRoutes.scala     | 110 -----------
 .../src/test/resources/logback-test.xml            |  17 --
 .../test/scala/sample/cqrs/IntegrationSpec.scala   | 198 -------------------
 .../test/scala/sample/cqrs/ProjectionSpec.scala    |  87 ---------
 .../test/scala/sample/cqrs/ShoppingCartSpec.scala  |  86 ---------
 24 files changed, 450 insertions(+), 989 deletions(-)

diff --git a/akka-sample-cqrs-scala/README.md b/akka-sample-cqrs-scala/README.md
index 77ec65b..52ee6d2 100644
--- a/akka-sample-cqrs-scala/README.md
+++ b/akka-sample-cqrs-scala/README.md
@@ -1,66 +1,2 @@
-This tutorial contains a sample illustrating an CQRS design with [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html), [Akka Cluster Singleton](https://doc.akka.io/docs/akka/2.6/typed/cluster-singleton.html), [Akka Persistence](https://doc.akka.io/docs/akka/2.6/typed/persistence.html) and [Akka Persistence Query](https://doc.akka.io/docs/akka/2.6/persistence-query.html).
+# Projection latency testbed
 
-## Overview
-
-This sample application implements a CQRS-ES design that will side-effect in the read model on selected events persisted to Cassandra by the write model. In this sample, the side-effect is logging a line. 
-A more practical example would be to send a message to a Kafka topic or update a relational database.
-
-## Write model
-
-The write model is a shopping cart.
-
-The implementation is based on a sharded actor: each `ShoppingCart` is an [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html) entity. The entity actor `ShoppingCart` is an [EventSourcedBehavior](https://doc.akka.io/docs/akka/2.6/typed/persistence.html).
-
-Events from the shopping carts are tagged and consumed by the read model.
-
-## Read model
-
-The read model is implemented in such a way that 'load' is sharded over a number of processors. This number is `event-processor.parallelism`.
-This is implemented using [Akka Projections](https://doc.akka.io/docs/akka-projection/current) which is then running on top of
- [Sharded Daemon Process](https://doc.akka.io/docs/akka/current/typed/cluster-sharded-daemon-process.html).
-
-
-## Running the sample code
-
-1. Start a Cassandra server by running:
-
-```
-sbt "runMain sample.cqrs.Main cassandra"
-```
-
-2. Start a node that runs the write model:
-
-```
-sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2551"
-```
-
-3. Start a node that runs the read model:
-
-```
-sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2552"
-```
-
-4. More write or read nodes can be started started by defining roles and port:
-
-```
-sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2553"
-sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2554"
-```
-
-Try it with curl:
-
-```
-# add item to cart
-curl -X POST -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' http://127.0.0.1:8051/shopping/carts
-
-# get cart
-curl http://127.0.0.1:8051/shopping/carts/cart1
-
-# update quantity of item
-curl -X PUT -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' http://127.0.0.1:8051/shopping/carts
-
-# check out cart
-curl -X POST -H "Content-Type: application/json" -d '{}' http://127.0.0.1:8051/shopping/carts/cart1/checkout
-```
-
-or same `curl` commands to port 8052.
diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt
index 43cb70d..a67e25b 100644
--- a/akka-sample-cqrs-scala/build.sbt
+++ b/akka-sample-cqrs-scala/build.sbt
@@ -1,12 +1,12 @@
-val AkkaVersion = "2.6.8"
+val AkkaVersion = "2.6.9"
 val AkkaPersistenceCassandraVersion = "1.0.1"
 val AkkaHttpVersion = "10.2.0"
-val AkkaProjectionVersion = "0.3"
+val AkkaProjectionVersion = "1.0.0"
 
-lazy val `akka-sample-cqrs-scala` = project
+lazy val `akka-projection-testing` = project
   .in(file("."))
   .settings(
-    organization := "com.lightbend.akka.samples",
+    organization := "akka.projection.testing",
     version := "1.0",
     scalaVersion := "2.13.1",
     scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"),
@@ -20,19 +20,23 @@ lazy val `akka-sample-cqrs-scala` = project
         "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % AkkaPersistenceCassandraVersion,
         "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
         "com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionVersion,
+        "com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion,
+        "com.zaxxer" % "HikariCP" % "3.4.5",
         "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
         "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
         "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
         "ch.qos.logback" % "logback-classic" % "1.2.3",
+        "org.postgresql" % "postgresql" % "42.2.14",
         "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
         "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test,
         "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
         "com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test,
+
         "org.scalatest" %% "scalatest" % "3.1.0" % Test,
         "commons-io" % "commons-io" % "2.4" % Test),
     fork in run := false,
     Global / cancelable := false, // ctrl-c
-    mainClass in (Compile, run) := Some("sample.cqrs.Main"),
+    mainClass in (Compile, run) := Some("akka.projection.testing.Main"),
     // disable parallel tests
     parallelExecution in Test := false,
     // show full stack traces and test case durations
diff --git a/akka-sample-cqrs-scala/projection.sql b/akka-sample-cqrs-scala/projection.sql
new file mode 100644
index 0000000..154a945
--- /dev/null
+++ b/akka-sample-cqrs-scala/projection.sql
@@ -0,0 +1,21 @@
+
+create table if not exists "AKKA_PROJECTION_OFFSET_STORE" (
+  "PROJECTION_NAME" VARCHAR(255) NOT NULL,
+  "PROJECTION_KEY" VARCHAR(255) NOT NULL,
+  "CURRENT_OFFSET" VARCHAR(255) NOT NULL,
+  "MANIFEST" VARCHAR(4) NOT NULL,
+  "MERGEABLE" BOOLEAN NOT NULL,
+  "LAST_UPDATED" BIGINT NOT NULL,
+  constraint "PK_PROJECTION_ID" primary key ("PROJECTION_NAME","PROJECTION_KEY")
+);
+
+create index if not exists "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME");
+
+create table if not exists events (
+    name varchar(256),
+    event varchar(256),
+    constraint pkey primary key (name, event)
+);
+
+
+
diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf
index 9bcb675..da92bcf 100644
--- a/akka-sample-cqrs-scala/src/main/resources/application.conf
+++ b/akka-sample-cqrs-scala/src/main/resources/application.conf
@@ -1,11 +1,29 @@
 akka {
+
+  projection {
+    jdbc {
+      dialect = "postgres-dialect"
+      offset-store {
+        schema = ""
+        table = "AKKA_PROJECTION_OFFSET_STORE"
+      }
+      blocking-jdbc-dispatcher {
+        type = Dispatcher
+        executor = "thread-pool-executor"
+        thread-pool-executor {
+          fixed-pool-size = 10
+        }
+      }
+    }
+  }
+
   loglevel = DEBUG
 
   actor {
     provider = cluster
 
     serialization-bindings {
-      "sample.cqrs.CborSerializable" = jackson-cbor
+      "akka.projection.testing.CborSerializable" = jackson-cbor
     }
   }
 
@@ -18,8 +36,8 @@ akka {
 
   cluster {
     seed-nodes = [
-      "akka://Shopping@127.0.0.1:2551",
-      "akka://Shopping@127.0.0.1:2552"
+      "akka://test@127.0.0.1:2551",
+      "akka://test@127.0.0.1:2552"
     ]
 
     roles = ["write-model", "read-model"]
@@ -37,13 +55,17 @@ akka {
 
 # Configuration for akka-persistence-cassandra
 akka.persistence.cassandra {
+  journal {
+    keyspace = "akka_testing"
+  }
+
   events-by-tag {
-    bucket-size = "Day"
+    bucket-size = "Hour"
     # for reduced latency
     eventual-consistency-delay = 200ms
     flush-interval = 50ms
     pubsub-notification = on
-    first-time-bucket = "20200115T00:00"
+    first-time-bucket = "20201001T00:00"
   }
 
   query {
@@ -61,14 +83,13 @@ datastax-java-driver {
   advanced.reconnect-on-init = on
 }
 
-akka.projection.cassandra.offset-store.keyspace = "akka_cqrs_sample"
-
 event-processor {
-  tag-prefix = "carts-slice"       // even processor tag prefix
-  parallelism = 4                  // number of event processors
+  parallelism = 4
+}
+
+test {
+
 }
 
-shopping.http.port = 0
-shopping.askTimeout = 5 s
 
 
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
similarity index 72%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
index cbec29e..a3a147e 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala
@@ -1,4 +1,4 @@
-package sample.cqrs
+package akka.projection.testing
 
 /**
  * Marker trait for serialization with Jackson CBOR
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala
new file mode 100644
index 0000000..579d208
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala
@@ -0,0 +1,47 @@
+package akka.projection.testing
+
+import akka.Done
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.pattern.StatusReply
+import akka.persistence.typed.PersistenceId
+import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
+
+object ConfigurablePersistentActor {
+
+  val Key: EntityTypeKey[Command] = EntityTypeKey[Command]("configurable")
+
+  def init(settings: EventProcessorSettings, system: ActorSystem[_]): ActorRef[ShardingEnvelope[Command]] = {
+    ClusterSharding(system).init(Entity(Key)(ctx => apply(settings, ctx.entityId))
+      .withRole("write-model"))
+  }
+
+  trait Command
+
+  final case class PersistAndAck(toPersist: String, replyTo: ActorRef[StatusReply[Done]], testName: String) extends Command
+
+  final case class Persist(toPersist: String, testName: String) extends Command
+
+  final case class Event(testName: String, payload: String, timeCreated: Long = System.currentTimeMillis()) extends CborSerializable
+
+  final case class State(eventsProcessed: Long) extends CborSerializable
+
+  def apply(settings: EventProcessorSettings, persistenceId: String): Behavior[Command] =
+    Behaviors.setup { ctx =>
+      EventSourcedBehavior[Command, Event, State](
+        persistenceId = PersistenceId.ofUniqueId(persistenceId),
+        State(0),
+        (_, command) => command match {
+          case Persist(toPersist, testName) =>
+            Effect.persist(Event(testName, toPersist))
+          case PersistAndAck(toPersist, ack, testName) =>
+            ctx.log.info("persisting event {}", command)
+            Effect.persist(Event(testName, toPersist)).thenRun(_ => ack ! StatusReply.ack())
+        },
+        (state, _) => state.copy(eventsProcessed = state.eventsProcessed + 1)).withTagger(event =>
+        Set("tag-" + math.abs(event.hashCode() % settings.parallelism)))
+    }
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
similarity index 63%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
index 774a221..69430ec 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala
@@ -1,4 +1,4 @@
-package sample.cqrs
+package akka.projection.testing
 
 import akka.actor.typed.ActorSystem
 import com.typesafe.config.Config
@@ -10,10 +10,9 @@ object EventProcessorSettings {
   }
 
   def apply(config: Config): EventProcessorSettings = {
-    val tagPrefix: String = config.getString("tag-prefix")
     val parallelism: Int = config.getInt("parallelism")
-    EventProcessorSettings(tagPrefix, parallelism)
+    EventProcessorSettings(parallelism)
   }
 }
 
-final case class EventProcessorSettings(tagPrefix: String, parallelism: Int)
+final case class EventProcessorSettings(parallelism: Int)
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala
new file mode 100644
index 0000000..c662600
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala
@@ -0,0 +1,87 @@
+package akka.projection.testing
+
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
+import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardedDaemonProcessSettings, ShardingEnvelope}
+import akka.cluster.typed.Cluster
+import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
+import akka.persistence.query.Offset
+import akka.projection.eventsourced.EventEnvelope
+import akka.projection.eventsourced.scaladsl.EventSourcedProvider
+import akka.projection.jdbc.scaladsl.JdbcProjection
+import akka.projection.scaladsl.ExactlyOnceProjection
+import akka.projection.testing.LoadGeneration.{Result, RunTest}
+import akka.projection.{ProjectionBehavior, ProjectionId}
+import akka.util.Timeout
+import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
+
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success}
+
+object Guardian {
+
+  def createProjectionFor(
+                           settings: EventProcessorSettings,
+                           index: Int,
+                           factory: HikariFactory
+                         )(implicit system: ActorSystem[_]): ExactlyOnceProjection[Offset, EventEnvelope[ConfigurablePersistentActor.Event]] = {
+    val tag = s"tag-$index"
+    val sourceProvider = EventSourcedProvider.eventsByTag[ConfigurablePersistentActor.Event](
+      system = system,
+      readJournalPluginId = CassandraReadJournal.Identifier,
+      tag = tag)
+    JdbcProjection.exactlyOnce(
+      projectionId = ProjectionId("test-projection-id", tag),
+      sourceProvider,
+      () => factory.newSession(),
+      () => new ProjectionHandler(tag, system)
+    )
+  }
+
+  def apply(): Behavior[String] = {
+    Behaviors.setup[String] { context =>
+      implicit val system: ActorSystem[_] = context.system
+      val config = new HikariConfig
+      config.setJdbcUrl("jdbc:postgresql://127.0.0.1:5432/")
+      config.setUsername("docker")
+      config.setPassword("docker")
+      config.setMaximumPoolSize(19)
+      config.setAutoCommit(false)
+      val dataSource = new HikariDataSource(config)
+      val settings = EventProcessorSettings(system)
+      val shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]] = ConfigurablePersistentActor.init(settings, system)
+      if (Cluster(system).selfMember.hasRole("read-model")) {
+
+        val dbSessionFactory = new HikariFactory(dataSource)
+
+        // we only want to run the daemon processes on the read-model nodes
+        val shardingSettings = ClusterShardingSettings(system)
+        val shardedDaemonProcessSettings =
+          ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model"))
+
+        ShardedDaemonProcess(system).init(
+          name = "test-projection",
+          settings.parallelism,
+          n => ProjectionBehavior(createProjectionFor(settings, n, dbSessionFactory)),
+          shardedDaemonProcessSettings,
+          Some(ProjectionBehavior.Stop))
+      }
+
+      // TODO move to route
+      implicit val timeout: Timeout = 10.seconds
+      val loadGeneration: ActorRef[LoadGeneration.RunTest] = context.spawn(LoadGeneration(shardRegion, dataSource), "load-generation")
+      context.ask[RunTest, Result](loadGeneration, replyTo => LoadGeneration.RunTest(s"test-${System.currentTimeMillis()}", 2, 5, replyTo)) {
+        case Success(value) =>
+          context.log.info("Test passed {}", value)
+          "success"
+        case Failure(t) =>
+          context.log.error("Test failed",t )
+          "failure"
+      }
+
+
+      Behaviors.empty
+    }
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala
new file mode 100644
index 0000000..742a44b
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala
@@ -0,0 +1,9 @@
+package akka.projection.testing
+
+import javax.sql.DataSource
+
+class HikariFactory(val dataSource: DataSource) {
+  def newSession(): HikariJdbcSession = {
+    new HikariJdbcSession(dataSource)
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala
new file mode 100644
index 0000000..057b8d1
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala
@@ -0,0 +1,23 @@
+package akka.projection.testing
+
+import java.sql.Connection
+
+import akka.japi.function
+import akka.projection.jdbc.JdbcSession
+import javax.sql.DataSource
+
+
+
+class HikariJdbcSession(source: DataSource) extends JdbcSession {
+
+  private val connection = source.getConnection
+
+  override def withConnection[Result](func: function.Function[Connection, Result]): Result =
+    func(connection)
+
+  override def commit(): Unit = connection.commit()
+
+  override def rollback(): Unit = connection.rollback()
+
+  override def close(): Unit = connection.close()
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
similarity index 59%
rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala
rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
index 4a9c313..2444cee 100644
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala
@@ -1,18 +1,13 @@
-package sample.cqrs
+package akka.projection.testing
 
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-
-import akka.actor.CoordinatedShutdown
 import akka.actor.typed.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.server.Route
-import akka.Done
 
-class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) {
-  private val shutdown = CoordinatedShutdown(system)
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
 
+class HttpServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) {
   import system.executionContext
 
   def start(): Unit = {
@@ -21,7 +16,7 @@ class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[
       .onComplete {
       case Success(binding) =>
         val address = binding.localAddress
-        system.log.info("Shopping online at http://{}:{}/", address.getHostString, address.getPort)
+        system.log.info("Online at http://{}:{}/", address.getHostString, address.getPort)
 
       case Failure(ex) =>
         system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala
new file mode 100644
index 0000000..35fb682
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala
@@ -0,0 +1,85 @@
+package akka.projection.testing
+
+import akka.{Done, NotUsed}
+import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.{ActorRef, Behavior, Terminated}
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.pattern.StatusReply
+import akka.projection.testing.LoadGeneration.{Failed, Result, RunTest}
+import akka.projection.testing.LoadTest.Start
+import akka.stream.scaladsl.Source
+import akka.util.Timeout
+import javax.sql.DataSource
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.DurationInt
+import scala.util.{Failure, Success}
+
+object LoadGeneration {
+
+  case class RunTest(name: String, actors: Int, eventsPerActor: Int, reply: ActorRef[Result])
+
+  sealed trait Result
+
+  case class Pass() extends Result
+
+  case class Failed(t: Option[Throwable], expected: Int, got: Int) extends Result
+
+  def apply(shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[RunTest] = Behaviors.setup { ctx =>
+    Behaviors.receiveMessage[RunTest] {
+      case rt@RunTest(name, actors, eventsPerActor, reply) =>
+        ctx.spawn(LoadTest(name, shardRegion, source), s"test-$name") ! Start(rt)
+        Behaviors.same
+    }
+  }
+
+}
+
+object LoadTest {
+
+  sealed trait Command
+
+  case class Start(test: RunTest) extends Command
+
+  private case class StartValidation() extends Command
+
+  private case class LoadGenerationFailed(t: Throwable) extends Command
+
+  def apply(testName: String, shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[Command] = Behaviors.setup { ctx =>
+    import akka.actor.typed.scaladsl.AskPattern._
+    implicit val timeout: Timeout = 30.seconds
+    implicit val system = ctx.system
+    implicit val ec: ExecutionContext = system.executionContext
+    Behaviors.receiveMessage[Command] {
+      case Start(RunTest(_, actors, eventsPerActor, replyTo)) =>
+        ctx.log.info("Starting load generation")
+        val expected = actors * eventsPerActor
+        val testRun: Source[StatusReply[Done], NotUsed] = Source(1 to actors)
+          .flatMapConcat(id =>
+            Source(1 to eventsPerActor)
+              .mapAsync(1)(message => shardRegion.ask[StatusReply[Done]] { replyTo =>
+                ShardingEnvelope(s"$id", ConfigurablePersistentActor.PersistAndAck(s"actor-$id-message-$message", replyTo, testName))
+              }))
+        ctx.pipeToSelf(testRun.run()) {
+          case Success(_) => StartValidation()
+          case Failure(t) => LoadGenerationFailed(t)
+        }
+        Behaviors.receiveMessage[Command] {
+          case StartValidation() =>
+            ctx.log.info("Starting validation")
+            val validation = ctx.spawn(TestValidation(replyTo, testName, expected, source: DataSource), s"TestValidation=$testName")
+            ctx.watch(validation)
+            Behaviors.same
+          case LoadGenerationFailed(t) =>
+            ctx.log.error("Load generation failed", t)
+            replyTo ! Failed(Some(t), -1, -1)
+            Behaviors.stopped
+        }.receiveSignal {
+          case (ctx, Terminated(_)) =>
+            ctx.log.info("Validation finished, terminating")
+            Behaviors.stopped
+        }
+    }
+  }
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala
new file mode 100644
index 0000000..09690e3
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala
@@ -0,0 +1,33 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import com.typesafe.config.{Config, ConfigFactory}
+
+object Main {
+
+  def main(args: Array[String]): Unit = {
+    args.headOption match {
+
+      case Some(portString) if portString.matches("""\d+""") =>
+        val port = portString.toInt
+        val httpPort = ("80" + portString.takeRight(2)).toInt
+        startNode(port, httpPort)
+
+      case None =>
+        throw new IllegalArgumentException("port number required argument")
+    }
+  }
+
+  def startNode(port: Int, httpPort: Int): Unit = {
+    ActorSystem[String](Guardian(), "test", config(port, httpPort))
+
+  }
+
+  def config(port: Int, httpPort: Int): Config =
+    ConfigFactory.parseString(
+      s"""
+      akka.remote.artery.canonical.port = $port
+      test.http.port = $httpPort
+       """).withFallback(ConfigFactory.load())
+
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala
new file mode 100644
index 0000000..bf8e7a0
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala
@@ -0,0 +1,19 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import akka.projection.eventsourced.EventEnvelope
+import akka.projection.jdbc.scaladsl.JdbcHandler
+import org.slf4j.{Logger, LoggerFactory}
+
+class ProjectionHandler(tag: String, system: ActorSystem[_])
+    extends JdbcHandler[EventEnvelope[ConfigurablePersistentActor.Event], HikariJdbcSession] {
+  private val log: Logger = LoggerFactory.getLogger(getClass)
+
+  override def process(session: HikariJdbcSession, envelope: EventEnvelope[ConfigurablePersistentActor.Event]): Unit = {
+    log.info("Event {} for tag {}", envelope.event.payload, tag)
+    session.withConnection(connection =>
+      connection.createStatement()
+        .execute(s"insert into events(name, event) values ('${envelope.event.testName}','${envelope.event.payload}')")
+    )
+  }
+}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala
new file mode 100644
index 0000000..a1b40ce
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala
@@ -0,0 +1,30 @@
+package akka.projection.testing
+
+import akka.actor.typed.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import spray.json.DefaultJsonProtocol._
+import spray.json.RootJsonFormat
+
+object TestRoutes {
+  case class RunTest(name: String, nrActors: Long, messagesPerActor: Long)
+  case class TestResult(pass: Boolean, expected: Long, got: Long)
+
+  implicit val runTestFormat: RootJsonFormat[RunTest] = jsonFormat3(RunTest)
+  implicit val testResultFormat: RootJsonFormat[TestResult] = jsonFormat3(TestResult)
+}
+
+//class TestRoutes()(implicit val system: ActorSystem[_]) {
+//  import TestRoutes._
+//  val route: Route = path("test") {
+//    post {
+//      entity(as[RunTest]) { runTest =>
+//
+//
+//      }
+//    }
+//  }
+//
+//}
diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala
new file mode 100644
index 0000000..0976a82
--- /dev/null
+++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala
@@ -0,0 +1,45 @@
+package akka.projection.testing
+
+import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.scaladsl.Behaviors
+import akka.projection.testing.LoadGeneration.{Pass, Result}
+import javax.sql.DataSource
+
+object TestValidation {
+  // FIXME blocking, dispatcher
+  // FIXME timeout
+  def apply(replyTo: ActorRef[Result], testName: String, expectedNrEvents: Long, source: DataSource): Behavior[String] = {
+    import scala.concurrent.duration._
+    Behaviors.setup { ctx =>
+      def validate(): Boolean = {
+        val connection = source.getConnection
+        try {
+          val resultSet = connection.createStatement().executeQuery(s"select count(*) from events where name = '$testName'")
+          if (resultSet.next()) {
+            val count = resultSet.getInt("count")
+            ctx.log.info("Expected {} got {}!", expectedNrEvents, count)
+            expectedNrEvents == count
+          } else {
+            throw new RuntimeException("Expected single row")
+          }
+        } finally {
+          connection.close()
+        }
+      }
+
+      Behaviors.withTimers { timers =>
+        timers.startTimerAtFixedRate("test", 2.seconds)
+        Behaviors.receiveMessage {
+          case "test" =>
+            if (validate()) {
+              ctx.log.info("Validated. Stopping")
+              replyTo ! Pass()
+              Behaviors.stopped
+            } else {
+              Behaviors.same
+            }
+        }
+      }
+    }
+  }
+}
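
Note on the two FIXMEs in TestValidation.scala above: the following is a minimal sketch, not part of the commit, of how the count query and the missing timeout could be handled. It assumes only the JDK and the Scala standard library; ValidationSketch, using, countEvents and pollUntil are hypothetical names, and the events table and name column are taken from the query in the diff. It binds the test name as a parameter instead of interpolating it into the SQL, closes the statement and result set as well as the connection, and bounds the polling with a deadline.

```scala
import javax.sql.DataSource
import scala.annotation.tailrec
import scala.concurrent.duration._

object ValidationSketch {

  // Runs `body` with a resource and always closes the resource afterwards.
  private def using[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()

  // Counts the rows for one test, binding the name as a query parameter.
  def countEvents(source: DataSource, testName: String): Long =
    using(source.getConnection) { connection =>
      using(connection.prepareStatement("select count(*) from events where name = ?")) { statement =>
        statement.setString(1, testName)
        using(statement.executeQuery()) { resultSet =>
          if (resultSet.next()) resultSet.getLong(1)
          else throw new RuntimeException("Expected single row")
        }
      }
    }

  // Calls `check` every `interval` until it returns true or `timeout` has elapsed.
  // Returns false when the deadline passes without a successful check.
  def pollUntil(timeout: FiniteDuration, interval: FiniteDuration)(check: () => Boolean): Boolean = {
    val deadline = timeout.fromNow
    @tailrec def loop(): Boolean =
      if (check()) true
      else if (deadline.isOverdue()) false
      else {
        Thread.sleep(interval.toMillis)
        loop()
      }
    loop()
  }
}
```

pollUntil blocks the calling thread, so it only illustrates the deadline logic. Inside the actor the timer-driven polling would stay as it is, and the same deadline check could be made on each "test" tick, replying with a failure result once the deadline is overdue.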
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala
deleted file mode 100644
index 795d407..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-import java.util.concurrent.CountDownLatch
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.typed.{ ClusterShardingSettings, ShardedDaemonProcessSettings }
-import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
-import akka.cluster.typed.Cluster
-import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.persistence.query.Offset
-import akka.projection.{ ProjectionBehavior, ProjectionId }
-import akka.projection.scaladsl.AtLeastOnceProjection
-import akka.projection.cassandra.scaladsl.CassandraProjection
-import akka.projection.eventsourced.EventEnvelope
-import akka.projection.eventsourced.scaladsl.EventSourcedProvider
-import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-
-object Main {
-
-  def main(args: Array[String]): Unit = {
-    args.headOption match {
-
-      case Some(portString) if portString.matches("""\d+""") =>
-        val port = portString.toInt
-        val httpPort = ("80" + portString.takeRight(2)).toInt
-        startNode(port, httpPort)
-
-      case Some("cassandra") =>
-        startCassandraDatabase()
-        println("Started Cassandra, press Ctrl + C to kill")
-        new CountDownLatch(1).await()
-
-      case None =>
-        throw new IllegalArgumentException("port number, or cassandra required argument")
-    }
-  }
-
-  def startNode(port: Int, httpPort: Int): Unit = {
-    val system =
-      ActorSystem[Nothing](Guardian(), "Shopping", config(port, httpPort))
-
-    if (Cluster(system).selfMember.hasRole("read-model"))
-      createTables(system)
-  }
-
-  def config(port: Int, httpPort: Int): Config =
-    ConfigFactory.parseString(s"""
-      akka.remote.artery.canonical.port = $port
-      shopping.http.port = $httpPort
-       """).withFallback(ConfigFactory.load())
-
-  /**
-   * To make the sample easier to run we kickstart a Cassandra instance to
-   * act as the journal. Cassandra is a great choice of backend for Akka Persistence but
-   * in a real application a pre-existing Cassandra cluster should be used.
-   */
-  def startCassandraDatabase(): Unit = {
-    val databaseDirectory = new File("target/cassandra-db")
-    CassandraLauncher.start(databaseDirectory, CassandraLauncher.DefaultTestConfigResource, clean = false, port = 9042)
-  }
-
-  def createTables(system: ActorSystem[_]): Unit = {
-    val session =
-      CassandraSessionRegistry(system).sessionFor("alpakka.cassandra")
-
-    // TODO use real replication strategy in real application
-    val keyspaceStmt =
-      """
-      CREATE KEYSPACE IF NOT EXISTS akka_cqrs_sample
-      WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
-      """
-
-    val offsetTableStmt =
-      """
-      CREATE TABLE IF NOT EXISTS akka_cqrs_sample.offset_store (
-        projection_name text,
-        partition int,
-        projection_key text,
-        offset text,
-        manifest text,
-        last_updated timestamp,
-        PRIMARY KEY ((projection_name, partition), projection_key)
-      )
-      """
-
-    // ok to block here, main thread
-    Await.ready(session.executeDDL(keyspaceStmt), 30.seconds)
-    system.log.info("Created akka_cqrs_sample keyspace")
-    Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
-    system.log.info("Created akka_cqrs_sample.offset_store table")
-
-  }
-
-}
-
-object Guardian {
-
-  def createProjectionFor(
-      system: ActorSystem[_],
-      settings: EventProcessorSettings,
-      index: Int): AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
-    val tag = s"${settings.tagPrefix}-$index"
-    val sourceProvider = EventSourcedProvider.eventsByTag[ShoppingCart.Event](
-      system = system,
-      readJournalPluginId = CassandraReadJournal.Identifier,
-      tag = tag)
-    CassandraProjection.atLeastOnce(
-      projectionId = ProjectionId("shopping-carts", tag),
-      sourceProvider,
-      handler = () => new ShoppingCartProjectionHandler(tag, system))
-  }
-
-  def apply(): Behavior[Nothing] = {
-    Behaviors.setup[Nothing] { context =>
-      val system = context.system
-
-      val settings = EventProcessorSettings(system)
-
-      val httpPort = context.system.settings.config.getInt("shopping.http.port")
-
-      ShoppingCart.init(system, settings)
-
-      if (Cluster(system).selfMember.hasRole("read-model")) {
-
-        // we only want to run the daemon processes on the read-model nodes
-        val shardingSettings = ClusterShardingSettings(system)
-        val shardedDaemonProcessSettings =
-          ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model"))
-
-        ShardedDaemonProcess(system).init(
-          name = "ShoppingCartProjection",
-          settings.parallelism,
-          n => ProjectionBehavior(createProjectionFor(system, settings, n)),
-          shardedDaemonProcessSettings,
-          Some(ProjectionBehavior.Stop))
-      }
-
-      val routes = new ShoppingCartRoutes()(context.system)
-      new ShoppingCartServer(routes.shopping, httpPort)(context.system).start()
-
-      Behaviors.empty
-    }
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala
deleted file mode 100644
index c800f0b..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-package sample.cqrs
-
-import java.time.Instant
-
-import scala.concurrent.duration._
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
-import akka.actor.typed.SupervisorStrategy
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.sharding.typed.scaladsl.Entity
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.pattern.StatusReply
-import akka.persistence.typed.PersistenceId
-import akka.persistence.typed.scaladsl.RetentionCriteria
-import akka.persistence.typed.scaladsl.Effect
-import akka.persistence.typed.scaladsl.EventSourcedBehavior
-import akka.persistence.typed.scaladsl.ReplyEffect
-
-/**
- * This is an event sourced actor. It has a state, [[ShoppingCart.State]], which
- * stores the current shopping cart items and whether it's checked out.
- *
- * Event sourced actors are interacted with by sending them commands,
- * see classes implementing [[ShoppingCart.Command]].
- *
- * Commands get translated to events, see classes implementing [[ShoppingCart.Event]].
- * It's the events that get persisted by the entity. Each event will have an event handler
- * registered for it, and an event handler updates the current state based on the event.
- * This will be done when the event is first created, and it will also be done when the entity is
- * loaded from the database - each event will be replayed to recreate the state
- * of the entity.
- */
-object ShoppingCart {
-
-  /**
-   * The current state held by the persistent entity.
-   */
-  final case class State(items: Map[String, Int], checkoutDate: Option[Instant]) extends CborSerializable {
-
-    def isCheckedOut: Boolean =
-      checkoutDate.isDefined
-
-    def hasItem(itemId: String): Boolean =
-      items.contains(itemId)
-
-    def isEmpty: Boolean =
-      items.isEmpty
-
-    def updateItem(itemId: String, quantity: Int): State = {
-      quantity match {
-        case 0 => copy(items = items - itemId)
-        case _ => copy(items = items + (itemId -> quantity))
-      }
-    }
-
-    def removeItem(itemId: String): State =
-      copy(items = items - itemId)
-
-    def checkout(now: Instant): State =
-      copy(checkoutDate = Some(now))
-
-    def toSummary: Summary =
-      Summary(items, isCheckedOut)
-  }
-  object State {
-    val empty = State(items = Map.empty, checkoutDate = None)
-  }
-
-  /**
-   * This interface defines all the commands that the ShoppingCart persistent actor supports.
-   */
-  sealed trait Command extends CborSerializable
-
-  /**
-   * A command to add an item to the cart.
-   *
-   * It can reply with `StatusReply[Summary]`, which is sent back to the caller when
-   * all the events emitted by this command are successfully persisted.
-   */
-  final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
-  /**
-   * A command to remove an item from the cart.
-   */
-  final case class RemoveItem(itemId: String, replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
-  /**
-   * A command to adjust the quantity of an item in the cart.
-   */
-  final case class AdjustItemQuantity(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]])
-      extends Command
-
-  /**
-   * A command to checkout the shopping cart.
-   */
-  final case class Checkout(replyTo: ActorRef[StatusReply[Summary]]) extends Command
-
-  /**
-   * A command to get the current state of the shopping cart.
-   */
-  final case class Get(replyTo: ActorRef[Summary]) extends Command
-
-  /**
-   * Summary of the shopping cart state, used in reply messages.
-   */
-  final case class Summary(items: Map[String, Int], checkedOut: Boolean) extends CborSerializable
-
-  /**
-   * This interface defines all the events that the ShoppingCart supports.
-   */
-  sealed trait Event extends CborSerializable {
-    def cartId: String
-  }
-
-  final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event
-
-  final case class ItemRemoved(cartId: String, itemId: String) extends Event
-
-  final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event
-
-  final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
-
-  val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart")
-
-  def init(system: ActorSystem[_], eventProcessorSettings: EventProcessorSettings): Unit = {
-    ClusterSharding(system).init(Entity(EntityKey) { entityContext =>
-      val n = math.abs(entityContext.entityId.hashCode % eventProcessorSettings.parallelism)
-      val eventProcessorTag = eventProcessorSettings.tagPrefix + "-" + n
-      ShoppingCart(entityContext.entityId, Set(eventProcessorTag))
-    }.withRole("write-model"))
-  }
-
-  def apply(cartId: String, eventProcessorTags: Set[String]): Behavior[Command] = {
-    EventSourcedBehavior
-      .withEnforcedReplies[Command, Event, State](
-        PersistenceId(EntityKey.name, cartId),
-        State.empty,
-        (state, command) =>
-          //The shopping cart behavior changes if it's checked out or not.
-          // The commands are handled differently for each case.
-          if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
-          else openShoppingCart(cartId, state, command),
-        (state, event) => handleEvent(state, event))
-      .withTagger(_ => eventProcessorTags)
-      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
-      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
-  }
-
-  private def openShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] =
-    command match {
-      case AddItem(itemId, quantity, replyTo) =>
-        if (state.hasItem(itemId))
-          Effect.reply(replyTo)(StatusReply.Error(s"Item '$itemId' was already added to this shopping cart"))
-        else if (quantity <= 0)
-          Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero"))
-        else
-          Effect
-            .persist(ItemAdded(cartId, itemId, quantity))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-
-      case RemoveItem(itemId, replyTo) =>
-        if (state.hasItem(itemId))
-          Effect
-            .persist(ItemRemoved(cartId, itemId))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-        else
-          Effect.reply(replyTo)(StatusReply.Success(state.toSummary)) // removing an item is idempotent
-
-      case AdjustItemQuantity(itemId, quantity, replyTo) =>
-        if (quantity <= 0)
-          Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero"))
-        else if (state.hasItem(itemId))
-          Effect
-            .persist(ItemQuantityAdjusted(cartId, itemId, quantity))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-        else
-          Effect.reply(replyTo)(
-            StatusReply.Error(s"Cannot adjust quantity for item '$itemId'. Item not present on cart"))
-
-      case Checkout(replyTo) =>
-        if (state.isEmpty)
-          Effect.reply(replyTo)(StatusReply.Error("Cannot checkout an empty shopping cart"))
-        else
-          Effect
-            .persist(CheckedOut(cartId, Instant.now()))
-            .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary))
-
-      case Get(replyTo) =>
-        Effect.reply(replyTo)(state.toSummary)
-    }
-
-  private def checkedOutShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] =
-    command match {
-      case Get(replyTo) =>
-        Effect.reply(replyTo)(state.toSummary)
-      case cmd: AddItem =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't add an item to an already checked out shopping cart"))
-      case cmd: RemoveItem =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't remove an item from an already checked out shopping cart"))
-      case cmd: AdjustItemQuantity =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't adjust item on an already checked out shopping cart"))
-      case cmd: Checkout =>
-        Effect.reply(cmd.replyTo)(StatusReply.Error("Can't checkout already checked out shopping cart"))
-    }
-
-  private def handleEvent(state: State, event: Event) = {
-    event match {
-      case ItemAdded(_, itemId, quantity)            => state.updateItem(itemId, quantity)
-      case ItemRemoved(_, itemId)                    => state.removeItem(itemId)
-      case ItemQuantityAdjusted(_, itemId, quantity) => state.updateItem(itemId, quantity)
-      case CheckedOut(_, eventTime)                  => state.checkout(eventTime)
-    }
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala
deleted file mode 100644
index cf965a8..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package sample.cqrs
-
-import akka.Done
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.eventstream.EventStream
-import akka.projection.eventsourced.EventEnvelope
-import akka.projection.scaladsl.Handler
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.Future
-
-class ShoppingCartProjectionHandler(tag: String, system: ActorSystem[_])
-    extends Handler[EventEnvelope[ShoppingCart.Event]] {
-  val log = LoggerFactory.getLogger(getClass)
-
-  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
-
-    log.info(
-      "EventProcessor({}) consumed {} from {} with seqNr {}",
-      tag,
-      envelope.event,
-      envelope.persistenceId,
-      envelope.sequenceNr)
-    system.eventStream ! EventStream.Publish(envelope.event)
-    Future.successful(Done)
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
deleted file mode 100644
index acd5b89..0000000
--- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package sample.cqrs
-
-import scala.concurrent.Future
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.http.scaladsl.model.StatusCodes
-import akka.http.scaladsl.server.Route
-import akka.pattern.StatusReply
-import akka.util.Timeout
-
-object ShoppingCartRoutes {
-  final case class AddItem(cartId: String, itemId: String, quantity: Int)
-  final case class UpdateItem(cartId: String, itemId: String, quantity: Int)
-}
-
-class ShoppingCartRoutes()(implicit system: ActorSystem[_]) {
-
-  implicit private val timeout: Timeout =
-    Timeout.create(system.settings.config.getDuration("shopping.askTimeout"))
-  private val sharding = ClusterSharding(system)
-
-  import ShoppingCartRoutes._
-  import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-  import akka.http.scaladsl.server.Directives._
-  import JsonFormats._
-
-  val shopping: Route =
-    pathPrefix("shopping") {
-      pathPrefix("carts") {
-        concat(
-          post {
-            entity(as[AddItem]) {
-              data =>
-                val entityRef =
-                  sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId)
-                val reply: Future[StatusReply[ShoppingCart.Summary]] =
-                  entityRef.ask(ShoppingCart.AddItem(data.itemId, data.quantity, _))
-                onSuccess(reply) {
-                  case StatusReply.Success(summary: ShoppingCart.Summary) =>
-                    complete(StatusCodes.OK -> summary)
-                  case StatusReply.Error(reason) =>
-                    complete(StatusCodes.BadRequest -> reason)
-                }
-            }
-          },
-          put {
-            entity(as[UpdateItem]) {
-              data =>
-                val entityRef =
-                  sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId)
-
-                def command(replyTo: ActorRef[StatusReply[ShoppingCart.Summary]]) =
-                  if (data.quantity == 0)
-                    ShoppingCart.RemoveItem(data.itemId, replyTo)
-                  else
-                    ShoppingCart.AdjustItemQuantity(data.itemId, data.quantity, replyTo)
-
-                val reply: Future[StatusReply[ShoppingCart.Summary]] =
-                  entityRef.ask(command(_))
-                onSuccess(reply) {
-                  case StatusReply.Success(summary: ShoppingCart.Summary) =>
-                    complete(StatusCodes.OK -> summary)
-                  case StatusReply.Error(reason) =>
-                    complete(StatusCodes.BadRequest -> reason)
-                }
-            }
-          },
-          pathPrefix(Segment) { cartId =>
-            concat(get {
-              val entityRef =
-                sharding.entityRefFor(ShoppingCart.EntityKey, cartId)
-              onSuccess(entityRef.ask(ShoppingCart.Get)) { summary =>
-                if (summary.items.isEmpty) complete(StatusCodes.NotFound)
-                else complete(summary)
-              }
-            }, path("checkout") {
-              post {
-                val entityRef =
-                  sharding.entityRefFor(ShoppingCart.EntityKey, cartId)
-                val reply: Future[StatusReply[ShoppingCart.Summary]] =
-                  entityRef.ask(ShoppingCart.Checkout(_))
-                onSuccess(reply) {
-                  case StatusReply.Success(summary: ShoppingCart.Summary) =>
-                    complete(StatusCodes.OK -> summary)
-                  case StatusReply.Error(reason) =>
-                    complete(StatusCodes.BadRequest -> reason)
-                }
-              }
-            })
-          })
-      }
-    }
-
-}
-
-object JsonFormats {
-
-  import spray.json.RootJsonFormat
-  // import the default encoders for primitive types (Int, String, Lists etc)
-  import spray.json.DefaultJsonProtocol._
-
-  implicit val summaryFormat: RootJsonFormat[ShoppingCart.Summary] =
-    jsonFormat2(ShoppingCart.Summary)
-  implicit val addItemFormat: RootJsonFormat[ShoppingCartRoutes.AddItem] =
-    jsonFormat3(ShoppingCartRoutes.AddItem)
-  implicit val updateItemFormat: RootJsonFormat[ShoppingCartRoutes.UpdateItem] =
-    jsonFormat3(ShoppingCartRoutes.UpdateItem)
-
-}
diff --git a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml b/akka-sample-cqrs-scala/src/test/resources/logback-test.xml
deleted file mode 100644
index 179c899..0000000
--- a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<configuration>
-
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="com.datastax.oss.driver" level="WARN"/>
-    <logger name="org.apache.cassandra" level="ERROR"/>
-    <logger name="com.codahale.metrics" level="INFO"/>
-
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-</configuration>
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala
deleted file mode 100644
index 8f1930a..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-import java.util.UUID
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import akka.actor.testkit.typed.scaladsl.ActorTestKit
-import akka.actor.typed.eventstream.EventStream
-import akka.cluster.MemberStatus
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.typed.Cluster
-import akka.cluster.typed.Join
-import akka.pattern.StatusReply
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.persistence.testkit.scaladsl.PersistenceInit
-import akka.persistence.typed.PersistenceId
-import akka.persistence.typed.scaladsl.Effect
-import akka.persistence.typed.scaladsl.EventSourcedBehavior
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.TestSuite
-import org.scalatest.concurrent.Eventually
-import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.time.Span
-import org.scalatest.wordspec.AnyWordSpecLike
-
-object IntegrationSpec {
-  val config: Config = ConfigFactory.parseString(s"""
-      akka.cluster {
-         seed-nodes = []
-      }
-      
-      akka.persistence.cassandra {
-        events-by-tag {
-          eventual-consistency-delay = 200ms
-        }
-      
-        query {
-          refresh-interval = 500 ms
-        }
-      
-        journal.keyspace-autocreate = on
-        journal.tables-autocreate = on
-        snapshot.keyspace-autocreate = on
-        snapshot.tables-autocreate = on
-      }
-      datastax-java-driver {
-        basic.contact-points = ["127.0.0.1:19042"]
-        basic.load-balancing-policy.local-datacenter = "datacenter1"
-      }
-      
-      event-processor {
-        keep-alive-interval = 1 seconds
-      }
-      akka.loglevel = DEBUG
-      akka.actor.testkit.typed.single-expect-default = 5s
-      # For LoggingTestKit
-      akka.actor.testkit.typed.filter-leeway = 5s
-      akka.actor.testkit.typed.throw-on-shutdown-timeout = off
-    """).withFallback(ConfigFactory.load())
-}
-
-class IntegrationSpec
-    extends TestSuite
-    with Matchers
-    with BeforeAndAfterAll
-    with AnyWordSpecLike
-    with ScalaFutures
-    with Eventually {
-
-  implicit private val patience: PatienceConfig =
-    PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis))
-
-  private val databaseDirectory = new File("target/cassandra-IntegrationSpec")
-
-  private def roleConfig(role: String): Config =
-    ConfigFactory.parseString(s"akka.cluster.roles = [$role]")
-
-  // one TestKit (ActorSystem) per cluster node
-  private val testKit1 = ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config))
-  private val testKit2 =
-    ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config))
-  private val testKit3 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config))
-  private val testKit4 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config))
-
-  private val systems3 = List(testKit1.system, testKit2.system, testKit3.system)
-
-  override protected def beforeAll(): Unit = {
-    CassandraLauncher.start(
-      databaseDirectory,
-      CassandraLauncher.DefaultTestConfigResource,
-      clean = true,
-      port = 19042, // default is 9042, but use different for test
-      CassandraLauncher.classpathForResources("logback-test.xml"))
-
-    // avoid concurrent creation of keyspace and tables
-    initializePersistence()
-    Main.createTables(testKit1.system)
-
-    super.beforeAll()
-  }
-
-  private def initializePersistence(): Unit = {
-    val timeout = 10.seconds
-    val done = PersistenceInit.initializeDefaultPlugins(testKit1.system, timeout)
-    Await.result(done, timeout)
-  }
-
-  override protected def afterAll(): Unit = {
-    super.afterAll()
-
-    testKit4.shutdownTestKit()
-    testKit3.shutdownTestKit()
-    testKit2.shutdownTestKit()
-    testKit1.shutdownTestKit()
-
-    CassandraLauncher.stop()
-    FileUtils.deleteDirectory(databaseDirectory)
-  }
-
-  "Shopping Cart application" should {
-    "init and join Cluster" in {
-      testKit1.spawn[Nothing](Guardian(), "guardian")
-      testKit2.spawn[Nothing](Guardian(), "guardian")
-      testKit3.spawn[Nothing](Guardian(), "guardian")
-      // node4 is initialized and joining later
-
-      systems3.foreach { sys =>
-        Cluster(sys).manager ! Join(Cluster(testKit1.system).selfMember.address)
-      }
-
-      // let the nodes join and become Up
-      eventually(PatienceConfiguration.Timeout(10.seconds)) {
-        systems3.foreach { sys =>
-          Cluster(sys).selfMember.status should ===(MemberStatus.Up)
-        }
-      }
-    }
-
-    "update and consume from different nodes" in {
-      val cart1 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-1")
-      val probe1 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
-      val cart2 = ClusterSharding(testKit2.system).entityRefFor(ShoppingCart.EntityKey, "cart-2")
-      val probe2 = testKit2.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
-      val eventProbe3 = testKit3.createTestProbe[ShoppingCart.Event]()
-      testKit3.system.eventStream ! EventStream.Subscribe(eventProbe3.ref)
-
-      // update from node1, consume event from node3
-      cart1 ! ShoppingCart.AddItem("foo", 42, probe1.ref)
-      probe1.receiveMessage().isSuccess should ===(true)
-      eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-1", "foo", 42))
-
-      // update from node2, consume event from node3
-      cart2 ! ShoppingCart.AddItem("bar", 17, probe2.ref)
-      probe2.receiveMessage().isSuccess should ===(true)
-      cart2 ! ShoppingCart.AdjustItemQuantity("bar", 18, probe2.ref)
-      probe2.receiveMessage().isSuccess should ===(true)
-      eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-2", "bar", 17))
-      eventProbe3.expectMessage(ShoppingCart.ItemQuantityAdjusted("cart-2", "bar", 18))
-    }
-
-    "continue even processing from offset" in {
-      // give it time to write the offset before shutting down
-      Thread.sleep(1000)
-      testKit3.shutdownTestKit()
-
-      val eventProbe4 = testKit4.createTestProbe[ShoppingCart.Event]()
-      testKit4.system.eventStream ! EventStream.Subscribe(eventProbe4.ref)
-
-      testKit4.spawn[Nothing](Guardian(), "guardian")
-
-      Cluster(testKit4.system).manager ! Join(Cluster(testKit1.system).selfMember.address)
-
-      // let the node join and become Up
-      eventually(PatienceConfiguration.Timeout(10.seconds)) {
-        Cluster(testKit4.system).selfMember.status should ===(MemberStatus.Up)
-      }
-
-      val cart3 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-3")
-      val probe3 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]]
-
-      // update from node1, consume event from node4
-      cart3 ! ShoppingCart.AddItem("abc", 43, probe3.ref)
-      probe3.receiveMessage().isSuccess should ===(true)
-      // note that node4 is new, but continues reading from previous offset, i.e. not receiving events
-      // that have already been consumed
-      eventProbe4.expectMessage(ShoppingCart.ItemAdded("cart-3", "abc", 43))
-    }
-
-  }
-}
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala
deleted file mode 100644
index 5932ea9..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package sample.cqrs
-
-import java.io.File
-
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.actor.typed.eventstream.EventStream
-import akka.pattern.StatusReply
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.projection.testkit.scaladsl.ProjectionTestKit
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.wordspec.AnyWordSpecLike
-
-object ProjectionSpec {
-  def config =
-    ConfigFactory.parseString("""
-      akka.actor.provider=local
-      akka.persistence.cassandra {		
-         events-by-tag {		
-           eventual-consistency-delay = 200ms		
-         }		
-       		
-         query {		
-           refresh-interval = 500 ms		
-         }		
-       		
-         journal.keyspace-autocreate = on		
-         journal.tables-autocreate = on		
-         snapshot.keyspace-autocreate = on		
-         snapshot.tables-autocreate = on		
-       }		
-       datastax-java-driver {		
-         basic.contact-points = ["127.0.0.1:19042"]		
-         basic.load-balancing-policy.local-datacenter = "datacenter1"		
-       }
-    """).withFallback(ConfigFactory.load()) // re-use application.conf other settings
-}
-
-class ProjectionSpec
-    extends ScalaTestWithActorTestKit(ProjectionSpec.config)
-    with AnyWordSpecLike
-    with BeforeAndAfterAll {
-  val projectionTestKit = ProjectionTestKit(testKit)
-  val settings = EventProcessorSettings(system)
-
-  val databaseDirectory = new File("target/cassandra-ProjectionSpec")
-
-  override protected def beforeAll(): Unit = {
-    CassandraLauncher.start(
-      databaseDirectory,
-      CassandraLauncher.DefaultTestConfigResource,
-      clean = true,
-      port = 19042, // default is 9042, but use different for test		
-      CassandraLauncher.classpathForResources("logback-test.xml"))
-
-    Main.createTables(system)
-
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    super.afterAll()
-    CassandraLauncher.stop()
-    FileUtils.deleteDirectory(databaseDirectory)
-  }
-
-  "The events from the Shopping Cart" should {
-
-    "be published to the system event stream by the projection" in {
-      val cartProbe = createTestProbe[Any]()
-      val cart = spawn(ShoppingCart("cart-1", Set(s"${settings.tagPrefix}-0")))
-      cart ! ShoppingCart.AddItem("25", 12, cartProbe.ref)
-      cartProbe.expectMessageType[StatusReply[ShoppingCart.Summary]].isSuccess should ===(true)
-
-      val eventProbe = createTestProbe[ShoppingCart.Event]()
-      system.eventStream ! EventStream.Subscribe(eventProbe.ref)
-      projectionTestKit.run(Guardian.createProjectionFor(system, settings, 0)) {
-        val added = eventProbe.expectMessageType[ShoppingCart.ItemAdded]
-        added.cartId should ===("cart-1")
-        added.itemId should ===("25")
-        added.quantity should ===(12)
-      }
-    }
-  }
-
-}
diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala
deleted file mode 100644
index 898f923..0000000
--- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package sample.cqrs
-
-import java.util.UUID
-
-import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
-import akka.pattern.StatusReply
-import org.scalatest.wordspec.AnyWordSpecLike
-
-class ShoppingCartSpec extends ScalaTestWithActorTestKit(s"""
-      akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
-      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-      akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
-    """) with AnyWordSpecLike {
-
-  private var counter = 0
-  def newCartId(): String = {
-    counter += 1
-    s"cart-$counter"
-  }
-
-  "The Shopping Cart" should {
-
-    "add item" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)))
-    }
-
-    "reject already added item" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.AddItem("foo", 13, probe.ref)
-      probe.receiveMessage().isError should ===(true)
-    }
-
-    "remove item" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.RemoveItem("foo", probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map.empty, checkedOut = false)))
-    }
-
-    "adjust quantity" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.AdjustItemQuantity("foo", 43, probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false)))
-    }
-
-    "checkout" in {
-      val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.receiveMessage().isSuccess should ===(true)
-      cart ! ShoppingCart.Checkout(probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true)))
-
-      cart ! ShoppingCart.AddItem("bar", 13, probe.ref)
-      probe.receiveMessage().isError should ===(true)
-    }
-
-    "keep its state" in {
-      val cartId = newCartId()
-      val cart = testKit.spawn(ShoppingCart(cartId, Set.empty))
-      val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
-      cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
-      probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)))
-
-      testKit.stop(cart)
-
-      // start again with same cartId
-      val restartedCart = testKit.spawn(ShoppingCart(cartId, Set.empty))
-      val stateProbe = testKit.createTestProbe[ShoppingCart.Summary]
-      restartedCart ! ShoppingCart.Get(stateProbe.ref)
-      stateProbe.expectMessage(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))
-    }
-  }
-
-}

