You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by md...@apache.org on 2023/03/20 21:50:41 UTC
[incubator-pekko-samples] 02/02: format source with scalafmt, #17
This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 99d7bb1091f805c55a5c9fb8d4242d070f5c0895
Author: Auto Format <nobody>
AuthorDate: Mon Mar 20 21:58:36 2023 +0100
format source with scalafmt, #17
---
akka-sample-fsm-scala/build.sbt | 6 +-
.../src/main/scala/sample/DiningHakkers.scala | 66 ++++-----
akka-sample-kafka-to-sharding-scala/build.sbt | 43 +++---
.../sharding/embeddedkafka/KafkaBroker.scala | 2 +-
.../main/scala/sample/sharding/kafka/Main.scala | 51 +++----
.../sample/sharding/kafka/ProcessorSettings.scala | 8 +-
.../scala/sample/sharding/kafka/UserEvents.scala | 10 +-
.../sharding/kafka/UserEventsKafkaProcessor.scala | 32 ++---
.../sample/sharding/kafka/UserGrpcService.scala | 6 +-
.../kafka/producer/UserEventProducer.scala | 2 +-
akka-sample-persistence-dc-scala/build.sbt | 5 +-
.../project/plugins.sbt | 1 -
.../scala/sample/persistence/res/MainApp.scala | 25 ++--
.../sample/persistence/res/auction/Auction.scala | 1 -
.../sample/persistence/res/bank/BankAccount.scala | 51 ++-----
.../persistence/res/counter/ThumbsUpCounter.scala | 59 ++++----
.../persistence/res/counter/ThumbsUpHttp.scala | 21 +--
.../persistence/res/movielist/MovieWatchList.scala | 24 +---
akka-sample-persistence-scala/build.sbt | 3 +-
.../sample/persistence/CborSerializable.scala | 4 +-
.../scala/sample/persistence/ShoppingCart.scala | 8 +-
.../sample/persistence/ShoppingCartSpec.scala | 12 +-
akka-sample-sharding-java/build.sbt | 18 +--
akka-sample-sharding-scala/build.sbt | 21 +--
.../main/scala/sample/killrweather/fog/Fog.scala | 19 ++-
.../sample/killrweather/fog/WeatherStation.scala | 35 +++--
.../sample/killrweather/JacksonSerializers.scala | 5 +-
.../scala/sample/killrweather/JsonFormats.scala | 5 +-
.../sample/killrweather/WeatherHttpServer.scala | 13 +-
.../scala/sample/killrweather/WeatherRoutes.scala | 24 ++--
.../scala/sample/killrweather/WeatherStation.scala | 17 ++-
docs-gen/build.sbt | 60 ++++----
docs-gen/project/AkkaSamplePlugin.scala | 8 +-
pekko-sample-cluster-client-grpc-scala/build.sbt | 7 +-
.../sample/cluster/client/grpc/ClusterClient.scala | 159 ++++++++++-----------
.../grpc/ClusterClientReceptionistGrpcImpl.scala | 72 +++++-----
.../cluster/client/grpc/ClusterClientSpec.scala | 8 +-
pekko-sample-cluster-java/build.sbt | 17 ++-
.../stats/StatsSampleSingleMasterSpec.scala | 10 +-
.../sample/cluster/stats/StatsSampleSpec.scala | 8 +-
pekko-sample-cluster-scala/build.sbt | 17 ++-
.../sample/cluster/simple/ClusterListener.scala | 2 +-
.../src/main/scala/sample/cluster/stats/App.scala | 5 +-
.../scala/sample/cluster/stats/AppOneMaster.scala | 6 +-
.../scala/sample/cluster/stats/StatsClient.scala | 1 -
.../scala/sample/cluster/stats/StatsService.scala | 9 +-
.../scala/sample/cluster/stats/StatsWorker.scala | 2 +-
.../scala/sample/cluster/transformation/App.scala | 2 +-
.../sample/cluster/transformation/Frontend.scala | 8 +-
.../sample/cluster/transformation/Worker.scala | 2 +-
.../stats/StatsSampleSingleMasterSpec.scala | 8 +-
.../sample/cluster/stats/StatsSampleSpec.scala | 5 +-
pekko-sample-distributed-data-java/build.sbt | 5 +-
.../distributeddata/ReplicatedCacheSpec.scala | 1 -
.../distributeddata/ReplicatedMetricsSpec.scala | 1 -
.../sample/distributeddata/STMultiNodeSpec.scala | 2 +-
.../sample/distributeddata/ShoppingCartSpec.scala | 5 +-
.../sample/distributeddata/VotingServiceSpec.scala | 5 +-
pekko-sample-distributed-data-scala/build.sbt | 5 +-
.../project/plugins.sbt | 1 -
.../sample/distributeddata/ReplicatedCache.scala | 11 +-
.../sample/distributeddata/ReplicatedMetrics.scala | 24 ++--
.../sample/distributeddata/ShoppingCart.scala | 29 ++--
.../sample/distributeddata/VotingService.scala | 11 +-
.../distributeddata/ReplicatedCacheSpec.scala | 1 -
.../distributeddata/ReplicatedMetricsSpec.scala | 1 -
.../sample/distributeddata/STMultiNodeSpec.scala | 2 +-
.../sample/distributeddata/ShoppingCartSpec.scala | 1 -
.../sample/distributeddata/VotingServiceSpec.scala | 1 -
.../src/main/scala/worker/CborSerializable.scala | 4 +-
.../src/main/scala/worker/Main.scala | 16 +--
.../src/main/scala/worker/WorkExecutor.scala | 7 +-
.../src/main/scala/worker/WorkManager.scala | 154 ++++++++++----------
.../src/main/scala/worker/Worker.scala | 75 +++++-----
74 files changed, 639 insertions(+), 736 deletions(-)
diff --git a/akka-sample-fsm-scala/build.sbt b/akka-sample-fsm-scala/build.sbt
index d31c88d..f530982 100644
--- a/akka-sample-fsm-scala/build.sbt
+++ b/akka-sample-fsm-scala/build.sbt
@@ -6,9 +6,7 @@ val akkaVersion = "2.6.20"
scalaVersion := "2.13.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.11"
-)
+ "ch.qos.logback" % "logback-classic" % "1.2.11")
licenses := Seq(
- ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
-)
+ ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
diff --git a/akka-sample-fsm-scala/src/main/scala/sample/DiningHakkers.scala b/akka-sample-fsm-scala/src/main/scala/sample/DiningHakkers.scala
index 59e45fa..93d199e 100644
--- a/akka-sample-fsm-scala/src/main/scala/sample/DiningHakkers.scala
+++ b/akka-sample-fsm-scala/src/main/scala/sample/DiningHakkers.scala
@@ -35,12 +35,11 @@ object Chopstick {
def apply(): Behavior[ChopstickMessage] = available()
- //When a Chopstick is taken by a hakker
- //It will refuse to be taken by other hakkers
- //But the owning hakker can put it back
+ // When a Chopstick is taken by a hakker
+ // It will refuse to be taken by other hakkers
+ // But the owning hakker can put it back
private def takenBy(
- hakker: ActorRef[ChopstickAnswer]
- ): Behavior[ChopstickMessage] = {
+ hakker: ActorRef[ChopstickAnswer]): Behavior[ChopstickMessage] = {
Behaviors.receive {
case (ctx, Take(otherHakker)) =>
otherHakker ! Busy(ctx.self)
@@ -54,7 +53,7 @@ object Chopstick {
}
}
- //When a Chopstick is available, it can be taken by a hakker
+ // When a Chopstick is available, it can be taken by a hakker
private def available(): Behavior[ChopstickMessage] = {
Behaviors.receivePartial {
case (ctx, Take(hakker)) =>
@@ -81,9 +80,9 @@ object Hakker {
}
class Hakker(ctx: ActorContext[Command],
- name: String,
- left: ActorRef[ChopstickMessage],
- right: ActorRef[ChopstickMessage]) {
+ name: String,
+ left: ActorRef[ChopstickMessage],
+ right: ActorRef[ChopstickMessage]) {
import Hakker._
@@ -96,8 +95,8 @@ class Hakker(ctx: ActorContext[Command],
startThinking(ctx, 5.seconds)
}
- //When a hakker is thinking it can become hungry
- //and try to pick up its chopsticks and eat
+ // When a hakker is thinking it can become hungry
+ // and try to pick up its chopsticks and eat
private val thinking: Behavior[Command] = {
Behaviors.receiveMessagePartial {
case Eat =>
@@ -107,10 +106,10 @@ class Hakker(ctx: ActorContext[Command],
}
}
- //When a hakker is hungry it tries to pick up its chopsticks and eat
- //When it picks one up, it goes into wait for the other
- //If the hakkers first attempt at grabbing a chopstick fails,
- //it starts to wait for the response of the other grab
+ // When a hakker is hungry it tries to pick up its chopsticks and eat
+ // When it picks one up, it goes into wait for the other
+ // If the hakkers first attempt at grabbing a chopstick fails,
+ // it starts to wait for the response of the other grab
private lazy val hungry: Behavior[Command] =
Behaviors.receiveMessagePartial {
case HandleChopstickAnswer(Taken(`left`)) =>
@@ -123,21 +122,19 @@ class Hakker(ctx: ActorContext[Command],
firstChopstickDenied
}
- //When a hakker is waiting for the last chopstick it can either obtain it
- //and start eating, or the other chopstick was busy, and the hakker goes
- //back to think about how he should obtain his chopsticks :-)
+ // When a hakker is waiting for the last chopstick it can either obtain it
+ // and start eating, or the other chopstick was busy, and the hakker goes
+ // back to think about how he should obtain his chopsticks :-)
private def waitForOtherChopstick(
- chopstickToWaitFor: ActorRef[ChopstickMessage],
- takenChopstick: ActorRef[ChopstickMessage]
- ): Behavior[Command] = {
+ chopstickToWaitFor: ActorRef[ChopstickMessage],
+ takenChopstick: ActorRef[ChopstickMessage]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case HandleChopstickAnswer(Taken(`chopstickToWaitFor`)) =>
ctx.log.info(
"{} has picked up {} and {} and starts to eat",
name,
left.path.name,
- right.path.name
- )
+ right.path.name)
startEating(ctx, 5.seconds)
case HandleChopstickAnswer(Busy(`chopstickToWaitFor`)) =>
@@ -146,8 +143,8 @@ class Hakker(ctx: ActorContext[Command],
}
}
- //When a hakker is eating, he can decide to start to think,
- //then he puts down his chopsticks and starts to think
+ // When a hakker is eating, he can decide to start to think,
+ // then he puts down his chopsticks and starts to think
private lazy val eating: Behavior[Command] = {
Behaviors.receiveMessagePartial {
case Think =>
@@ -158,9 +155,9 @@ class Hakker(ctx: ActorContext[Command],
}
}
- //When the results of the other grab comes back,
- //he needs to put it back if he got the other one.
- //Then go back and think and try to grab the chopsticks again
+ // When the results of the other grab comes back,
+ // he needs to put it back if he got the other one.
+ // Then go back and think and try to grab the chopsticks again
private lazy val firstChopstickDenied: Behavior[Command] = {
Behaviors.receiveMessagePartial {
case HandleChopstickAnswer(Taken(chopstick)) =>
@@ -172,7 +169,7 @@ class Hakker(ctx: ActorContext[Command],
}
private def startThinking(ctx: ActorContext[Command],
- duration: FiniteDuration) = {
+ duration: FiniteDuration) = {
Behaviors.withTimers[Command] { timers =>
timers.startSingleTimer(Eat, Eat, duration)
thinking
@@ -180,7 +177,7 @@ class Hakker(ctx: ActorContext[Command],
}
private def startEating(ctx: ActorContext[Command],
- duration: FiniteDuration) = {
+ duration: FiniteDuration) = {
Behaviors.withTimers[Command] { timers =>
timers.startSingleTimer(Think, Think, duration)
eating
@@ -191,17 +188,16 @@ class Hakker(ctx: ActorContext[Command],
object DiningHakkers {
def apply(): Behavior[NotUsed] = Behaviors.setup { context =>
- //Create 5 chopsticks
+ // Create 5 chopsticks
val chopsticks = for (i <- 1 to 5)
yield context.spawn(Chopstick(), "Chopstick" + i)
- //Create 5 awesome hakkers and assign them their left and right chopstick
+ // Create 5 awesome hakkers and assign them their left and right chopstick
val hakkers = for {
(name, i) <- List("Ghosh", "Boner", "Klang", "Krasser", "Manie").zipWithIndex
- } yield
- context.spawn(Hakker(name, chopsticks(i), chopsticks((i + 1) % 5)), name)
+ } yield context.spawn(Hakker(name, chopsticks(i), chopsticks((i + 1) % 5)), name)
- //Signal all hakkers that they should start thinking, and watch the show
+ // Signal all hakkers that they should start thinking, and watch the show
hakkers.foreach(_ ! Hakker.Think)
Behaviors.empty
diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 97ad3e1..c720c1c 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -7,7 +7,7 @@ val LogbackVersion = "1.2.11"
ThisBuild / scalaVersion := "2.13.8"
ThisBuild / organization := "com.lightbend.akka.samples"
-ThisBuild / Compile/ scalacOptions ++= Seq(
+ThisBuild / Compile / scalacOptions ++= Seq(
"-deprecation",
"-feature",
"-unchecked",
@@ -18,8 +18,7 @@ ThisBuild / Test / testOptions += Tests.Argument("-oDF")
ThisBuild / licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
ThisBuild / resolvers ++= Seq(
"Akka Snapshots" at "https://repo.akka.io/snapshots",
- Resolver.bintrayRepo("akka", "snapshots")
-)
+ Resolver.bintrayRepo("akka", "snapshots"))
Global / cancelable := true // ctrl-c
@@ -39,33 +38,33 @@ lazy val client = project
.enablePlugins(AkkaGrpcPlugin, JavaAgent)
.settings(
libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "com.typesafe.akka" %% "akka-discovery" % AkkaVersion))
+ "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-discovery" % AkkaVersion))
lazy val processor = project
.in(file("processor"))
.enablePlugins(AkkaGrpcPlugin, JavaAgent)
.settings(javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test")
.settings(libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" % AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
- "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
- "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion,
- "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
- "com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
- "com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
- "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
- "ch.qos.logback" % "logback-classic" % LogbackVersion,
- "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
- "org.scalatest" %% "scalatest" % "3.0.8" % Test))
+ "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+ "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" % AlpakkaKafkaVersion,
+ "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
+ "com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
+ "com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
+ "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
+ "ch.qos.logback" % "logback-classic" % LogbackVersion,
+ "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test))
lazy val producer = project
.in(file("producer"))
.settings(Compile / PB.targets := Seq(scalapb.gen() -> (Compile / sourceManaged).value))
.settings(libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
- "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.11",
- "org.scalatest" %% "scalatest" % "3.0.8" % Test))
+ "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+ "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
+ "ch.qos.logback" % "logback-classic" % "1.2.11",
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test))
diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
index 7e5c81c..f1eb3b9 100644
--- a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
@@ -1,6 +1,6 @@
package sample.sharding.embeddedkafka
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import net.manub.embeddedkafka.{ EmbeddedKafka, EmbeddedKafkaConfig }
import org.slf4j.LoggerFactory
object KafkaBroker extends App with EmbeddedKafka {
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 9353e73..2f5c2f3 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -1,17 +1,17 @@
package sample.sharding.kafka
import akka.actor.typed.scaladsl.adapter._
-import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
-import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Terminated}
-import akka.cluster.typed.{Cluster, SelfUp, Subscribe}
+import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
+import akka.cluster.typed.{ Cluster, SelfUp, Subscribe }
import akka.http.scaladsl._
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
+import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.management.scaladsl.AkkaManagement
import akka.stream.Materializer
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
sealed trait Command
case object NodeMemberUp extends Command
@@ -37,20 +37,21 @@ object Main {
def init(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
ActorSystem(Behaviors.setup[Command] {
- ctx =>
- AkkaManagement(ctx.system.toClassic).start()
- val cluster = Cluster(ctx.system)
- val upAdapter = ctx.messageAdapter[SelfUp](_ => NodeMemberUp)
- cluster.subscriptions ! Subscribe(upAdapter, classOf[SelfUp])
- val settings = ProcessorSettings("kafka-to-sharding-processor", ctx.system.toClassic)
- ctx.pipeToSelf(UserEvents.init(ctx.system, settings)) {
- case Success(extractor) => ShardingStarted(extractor)
- case Failure(ex) => throw ex
- }
- starting(ctx, None, joinedCluster = false, settings)
- }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+ ctx =>
+ AkkaManagement(ctx.system.toClassic).start()
+ val cluster = Cluster(ctx.system)
+ val upAdapter = ctx.messageAdapter[SelfUp](_ => NodeMemberUp)
+ cluster.subscriptions ! Subscribe(upAdapter, classOf[SelfUp])
+ val settings = ProcessorSettings("kafka-to-sharding-processor", ctx.system.toClassic)
+ ctx.pipeToSelf(UserEvents.init(ctx.system, settings)) {
+ case Success(extractor) => ShardingStarted(extractor)
+ case Failure(ex) => throw ex
+ }
+ starting(ctx, None, joinedCluster = false, settings)
+ }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
- def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings): Behavior[Command] = {
+ def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings)
+ : Behavior[Command] = {
import ctx.executionContext
ctx.log.info("Sharding started and joined cluster. Starting event processor")
val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor")
@@ -63,7 +64,8 @@ object Main {
running(ctx, binding, eventProcessor)
}
- def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors
+ def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean,
+ settings: ProcessorSettings): Behavior[Command] = Behaviors
.receive[Command] {
case (ctx, ShardingStarted(region)) if joinedCluster =>
ctx.log.info("Sharding has started")
@@ -74,12 +76,13 @@ object Main {
case (ctx, NodeMemberUp) if sharding.isDefined =>
ctx.log.info("Member has joined the cluster")
start(ctx, sharding.get, settings)
- case (_, NodeMemberUp) =>
+ case (_, NodeMemberUp) =>
ctx.log.info("Member has joined the cluster")
starting(ctx, sharding, joinedCluster = true, settings)
}
- def running(ctx: ActorContext[Command], binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] =
+ def running(ctx: ActorContext[Command], binding: Future[Http.ServerBinding], processor: ActorRef[Nothing])
+ : Behavior[Command] =
Behaviors.receiveMessagePartial[Command] {
case BindingFailed(t) =>
ctx.log.error("Failed to bind front end", t)
@@ -91,8 +94,8 @@ object Main {
Behaviors.stopped
}
-
- def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = {
+ def startGrpc(
+ system: ActorSystem[_], frontEndPort: Int, region: ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = {
val mat = Materializer.createMaterializer(system.toClassic)
val service: HttpRequest => Future[HttpResponse] =
UserServiceHandler(new UserGrpcService(system, region))(mat, system.toClassic)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
index a47bc75..e5796b1 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
@@ -5,7 +5,7 @@ import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.kafka.ConsumerSettings
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
+import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer }
import scala.jdk.CollectionConverters._
import scala.concurrent.duration._
@@ -18,12 +18,12 @@ case object ProcessorSettings {
config.getStringList("topics").asScala.toList,
config.getString("group"),
Timeout.create(config.getDuration("ask-timeout")),
- system: ActorSystem
- )
+ system: ActorSystem)
}
}
-final class ProcessorSettings(val bootstrapServers: String, val topics: List[String], val groupId: String, val askTimeout: Timeout, val system: ActorSystem) {
+final class ProcessorSettings(val bootstrapServers: String, val topics: List[String], val groupId: String,
+ val askTimeout: Timeout, val system: ActorSystem) {
def kafkaConsumerSettings(): ConsumerSettings[String, Array[Byte]] = {
ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrapServers)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 2865705..db29080 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -2,9 +2,9 @@ package sample.sharding.kafka
import akka.Done
import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
+import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
+import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }
import akka.kafka.cluster.sharding.KafkaClusterSharding
import scala.concurrent.Future
@@ -17,8 +17,7 @@ object UserEvents {
timeout = 10.seconds,
topic = settings.topics.head,
entityIdExtractor = (msg: Command) => msg.userId,
- settings = settings.kafkaConsumerSettings()
- ).map(messageExtractor => {
+ settings = settings.kafkaConsumerSettings()).map(messageExtractor => {
system.log.info("Message extractor created. Initializing sharding")
ClusterSharding(system).init(
Entity(settings.entityTypeKey)(createBehavior = _ => UserEvents())
@@ -30,7 +29,8 @@ object UserEvents {
sealed trait Command extends CborSerializable {
def userId: String
}
- final case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done]) extends Command
+ final case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long,
+ replyTo: ActorRef[Done]) extends Command
final case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends Command
// state
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index f2efcec..4fe5f48 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -12,7 +12,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
-import akka.actor.typed.{ActorSystem => TypedActorSystem}
+import akka.actor.typed.{ ActorSystem => TypedActorSystem }
import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.kafka.scaladsl.Committer
import akka.kafka.scaladsl.Consumer
@@ -49,9 +49,8 @@ object UserEventsKafkaProcessor {
.narrow
}
-
- private def startConsumingFromTopic(shardRegion: ActorRef[UserEvents.Command], processorSettings: ProcessorSettings)
- (implicit actorSystem: TypedActorSystem[_]): Future[Done] = {
+ private def startConsumingFromTopic(shardRegion: ActorRef[UserEvents.Command], processorSettings: ProcessorSettings)(
+ implicit actorSystem: TypedActorSystem[_]): Future[Done] = {
implicit val ec: ExecutionContext = actorSystem.executionContext
implicit val scheduler: Scheduler = actorSystem.toClassic.scheduler
@@ -63,24 +62,23 @@ object UserEventsKafkaProcessor {
.topics(processorSettings.topics: _*)
.withRebalanceListener(rebalanceListener.toClassic)
-
Consumer.sourceWithOffsetContext(processorSettings.kafkaConsumerSettings(), subscription)
// MapAsync and Retries can be replaced by reliable delivery
.mapAsync(20) { record =>
logger.info(s"user id consumed kafka partition ${record.key()}->${record.partition()}")
- retry(() =>
- shardRegion.ask[Done](replyTo => {
- val purchaseProto = UserPurchaseProto.parseFrom(record.value())
- UserEvents.UserPurchase(
- purchaseProto.userId,
- purchaseProto.product,
- purchaseProto.quantity,
- purchaseProto.price,
- replyTo)
- })(processorSettings.askTimeout, actorSystem.scheduler),
+ retry(
+ () =>
+ shardRegion.ask[Done](replyTo => {
+ val purchaseProto = UserPurchaseProto.parseFrom(record.value())
+ UserEvents.UserPurchase(
+ purchaseProto.userId,
+ purchaseProto.product,
+ purchaseProto.quantity,
+ purchaseProto.price,
+ replyTo)
+ })(processorSettings.askTimeout, actorSystem.scheduler),
attempts = 5,
- delay = 1.second
- )
+ delay = 1.second)
}
.runWith(Committer.sinkWithOffsetContext(CommitterSettings(classic)))
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index 2e947c6..cf96a47 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -1,12 +1,12 @@
package sample.sharding.kafka
import akka.actor.typed.scaladsl.AskPattern._
-import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
+import akka.actor.typed.{ ActorRef, ActorSystem, Scheduler }
import akka.util.Timeout
-import sample.sharding.kafka.UserEvents.{GetRunningTotal, Command, RunningTotal}
+import sample.sharding.kafka.UserEvents.{ Command, GetRunningTotal, RunningTotal }
import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.concurrent.{ ExecutionContextExecutor, Future }
class UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Command]) extends UserService {
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index a20418c..872f28a 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -8,7 +8,7 @@ import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
+import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }
import sample.sharding.kafka.serialization.user_events.UserPurchaseProto
import scala.concurrent.Future
diff --git a/akka-sample-persistence-dc-scala/build.sbt b/akka-sample-persistence-dc-scala/build.sbt
index f987f22..ea2b402 100644
--- a/akka-sample-persistence-dc-scala/build.sbt
+++ b/akka-sample-persistence-dc-scala/build.sbt
@@ -22,13 +22,10 @@ libraryDependencies ++= Seq(
"com.lightbend.akka.management" %% "akka-management" % AkkaClusterManagementVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaClusterManagementVersion,
"com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
-
"ch.qos.logback" % "logback-classic" % "1.2.11",
-
"com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % AkkaPersistenceCassandraVersion,
"com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test,
- "org.scalatest" %% "scalatest" % "3.0.8" % Test
-)
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test)
// transitive dependency of akka 2.5x that is brought in by addons but evicted
dependencyOverrides += "com.typesafe.akka" %% "akka-protobuf" % AkkaVersion
diff --git a/akka-sample-persistence-dc-scala/project/plugins.sbt b/akka-sample-persistence-dc-scala/project/plugins.sbt
index 43732f7..a7f83b3 100644
--- a/akka-sample-persistence-dc-scala/project/plugins.sbt
+++ b/akka-sample-persistence-dc-scala/project/plugins.sbt
@@ -1,2 +1 @@
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")
-
diff --git a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/MainApp.scala b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/MainApp.scala
index 3dba940..391071b 100644
--- a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/MainApp.scala
+++ b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/MainApp.scala
@@ -5,17 +5,17 @@ import java.util.concurrent.CountDownLatch
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.typed.{ReplicatedSharding, ReplicatedShardingExtension}
+import akka.cluster.sharding.typed.{ ReplicatedSharding, ReplicatedShardingExtension }
import akka.http.scaladsl.Http
import akka.management.scaladsl.AkkaManagement
import akka.persistence.cassandra.testkit.CassandraLauncher
import akka.persistence.typed.ReplicaId
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
import sample.persistence.res.bank.BankAccount
-import sample.persistence.res.counter.{ThumbsUpCounter, ThumbsUpHttp}
+import sample.persistence.res.counter.{ ThumbsUpCounter, ThumbsUpHttp }
import scala.concurrent.ExecutionContext
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
object MainApp {
@@ -48,24 +48,27 @@ object MainApp {
}
def startNode(port: Int, dc: String): Unit = {
- implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty[Nothing], "ClusterSystem", config(port, dc))
+ implicit val system: ActorSystem[Nothing] =
+ ActorSystem[Nothing](Behaviors.empty[Nothing], "ClusterSystem", config(port, dc))
implicit val ec: ExecutionContext = system.executionContext
val replicatedSharding = ReplicatedShardingExtension(system)
- val thumbsUpReplicatedSharding: ReplicatedSharding[ThumbsUpCounter.Command] = replicatedSharding .init(ThumbsUpCounter.Provider)
+ val thumbsUpReplicatedSharding: ReplicatedSharding[ThumbsUpCounter.Command] =
+ replicatedSharding.init(ThumbsUpCounter.Provider)
// no HTTP end points for them, just showing that multiple replicated sharding instances can be started
- val bankAccountReplicatedSharding: ReplicatedSharding[BankAccount.Command] = replicatedSharding.init(BankAccount.Provider)
+ val bankAccountReplicatedSharding: ReplicatedSharding[BankAccount.Command] =
+ replicatedSharding.init(BankAccount.Provider)
if (port != 0) {
val httpHost = "0.0.0.0"
- val httpPort = 20000+port
+ val httpPort = 20000 + port
Http().newServerAt(httpHost, httpPort)
.bind(ThumbsUpHttp.route(ReplicaId(dc), thumbsUpReplicatedSharding))
.onComplete {
- case Success(_) => system.log.info("HTTP Server bound to http://{}:{}", httpHost, httpPort)
- case Failure(ex) => system.log.error(s"Failed to bind HTTP Server to http://$httpHost:$httpPort", ex)
- }
+ case Success(_) => system.log.info("HTTP Server bound to http://{}:{}", httpHost, httpPort)
+ case Failure(ex) => system.log.error(s"Failed to bind HTTP Server to http://$httpHost:$httpPort", ex)
+ }
AkkaManagement(system).start()
}
diff --git a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/auction/Auction.scala b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/auction/Auction.scala
index 9681b27..12817e4 100644
--- a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/auction/Auction.scala
+++ b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/auction/Auction.scala
@@ -182,7 +182,6 @@ object Auction {
* When the responsible replicas has seen all `AuctionFinished` events from other DCs
* all other events have also been propagated and it can persist `WinnerDecided` and
* the auction is finally `Closed`.
- *
*/
private sealed trait AuctionPhase
diff --git a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/bank/BankAccount.scala b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/bank/BankAccount.scala
index 68f63a2..e5424f3 100644
--- a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/bank/BankAccount.scala
+++ b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/bank/BankAccount.scala
@@ -1,15 +1,15 @@
package sample.persistence.res.bank
import akka.Done
-import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
-import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import akka.actor.typed.{ ActorRef, Behavior }
import akka.cluster.sharding.typed.ReplicatedEntityProvider
import akka.pattern.StatusReply
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.typed.scaladsl.ReplicationContext
-import akka.persistence.typed.{ReplicaId, ReplicationId}
-import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplicatedEventSourcing}
-import sample.persistence.res.{CborSerializable, MainApp}
+import akka.persistence.typed.{ ReplicaId, ReplicationId }
+import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
+import sample.persistence.res.{ CborSerializable, MainApp }
object BankAccount {
@@ -33,14 +33,12 @@ object BankAccount {
}
}
-
def apply(replicationId: ReplicationId): Behavior[Command] = {
Behaviors.setup[Command] { context =>
ReplicatedEventSourcing.commonJournalConfig(
replicationId,
MainApp.AllReplicas,
- CassandraReadJournal.Identifier,
- )(replicationContext => eventSourcedBehavior(replicationContext, context))
+ CassandraReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, context))
}
}
@@ -48,7 +46,8 @@ object BankAccount {
val Provider: ReplicatedEntityProvider[Command] = ReplicatedEntityProvider
.perDataCenter("account", MainApp.AllReplicas) { replicationId => BankAccount(replicationId) }
- private def eventSourcedBehavior(replicationContext: ReplicationContext, context: ActorContext[Command]): EventSourcedBehavior[Command, Event, State] =
+ private def eventSourcedBehavior(replicationContext: ReplicationContext, context: ActorContext[Command])
+ : EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior[Command, Event, State](
replicationContext.persistenceId,
State(0),
@@ -57,20 +56,19 @@ object BankAccount {
val newState = state.applyOperation(event)
detectOverdrawn(newState, replicationContext, context)
newState
- }
- )
+ })
private def commandHandler(state: State, command: Command): Effect[Event, State] = command match {
case Deposit(amount, ack) =>
Effect.persist(Deposited(amount)).thenRun(_ => ack ! StatusReply.ack())
case Withdraw(amount, ack) =>
if (state.balance - amount >= 0) {
- Effect.persist(Withdrawn(amount)).thenRun(_ => ack ! StatusReply.ack())
+ Effect.persist(Withdrawn(amount)).thenRun(_ => ack ! StatusReply.ack())
} else {
Effect.none.thenRun(_ => ack ! StatusReply.error("insufficient funds"))
}
case GetBalance(replyTo) =>
- Effect.none.thenRun(_ =>replyTo ! state.balance)
+ Effect.none.thenRun(_ => replyTo ! state.balance)
case AlertOverdrawn(amount) =>
Effect.persist(Overdrawn(amount))
}
@@ -78,9 +76,9 @@ object BankAccount {
/**
* Here we trigger events based on replicated events
*/
- def detectOverdrawn(state: BankAccount.State, replicationContext: ReplicationContext, context: ActorContext[Command]): Unit = {
- if (
- replicationContext.concurrent // this event happened concurrently with other events already processed
+ def detectOverdrawn(
+ state: BankAccount.State, replicationContext: ReplicationContext, context: ActorContext[Command]): Unit = {
+ if (replicationContext.concurrent // this event happened concurrently with other events already processed
&& replicationContext.replicaId == ReplicaId("eu-central") // if we only want to do the side effect in a single DC
&& !replicationContext.recoveryRunning // probably want to avoid re-execution of side effects during recovery
) {
@@ -93,24 +91,3 @@ object BankAccount {
}
}
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpCounter.scala b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpCounter.scala
index d29448e..c18c6b4 100644
--- a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpCounter.scala
+++ b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpCounter.scala
@@ -1,12 +1,12 @@
package sample.persistence.res.counter
import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.{ ActorRef, Behavior }
import akka.cluster.sharding.typed.ReplicatedEntityProvider
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplicatedEventSourcing}
-import akka.persistence.typed.{ReplicaId, ReplicationId}
-import sample.persistence.res.{CborSerializable, MainApp}
+import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
+import akka.persistence.typed.{ ReplicaId, ReplicationId }
+import sample.persistence.res.{ CborSerializable, MainApp }
object ThumbsUpCounter {
@@ -28,33 +28,38 @@ object ThumbsUpCounter {
def add(userId: String): State = copy(users + userId)
}
- val Provider: ReplicatedEntityProvider[Command] = ReplicatedEntityProvider.perDataCenter("counter", MainApp.AllReplicas) { replicationId => ThumbsUpCounter(replicationId) }
+ val Provider: ReplicatedEntityProvider[Command] =
+ ReplicatedEntityProvider.perDataCenter("counter", MainApp.AllReplicas) { replicationId =>
+ ThumbsUpCounter(replicationId)
+ }
// we use a shared journal as cassandra typically spans DCs rather than a DB per replica
def apply(replicationId: ReplicationId): Behavior[Command] =
Behaviors.setup { ctx =>
- ReplicatedEventSourcing.commonJournalConfig(replicationId, MainApp.AllReplicas, CassandraReadJournal.Identifier) { replicationContext =>
- EventSourcedBehavior[Command, Event, State](
- persistenceId = replicationId.persistenceId,
- emptyState = State(Set.empty),
- commandHandler = (state, cmd) => cmd match {
- case GiveThumbsUp(_, userId, replyTo) =>
- Effect.persist(GaveThumbsUp(userId)).thenRun { state2 =>
- ctx.log.info("Thumbs-up by {}, total count {}", userId, state2.users.size)
- replyTo ! state2.users.size
- }
- case GetCount(_, replyTo) =>
- replyTo ! state.users.size
- Effect.none
- case GetUsers(_, replyTo) =>
- replyTo ! state
- Effect.none
- },
- eventHandler = (state, event) => event match {
- case GaveThumbsUp(userId) =>
- state.add(userId)
- }
- )
+ ReplicatedEventSourcing.commonJournalConfig(replicationId, MainApp.AllReplicas, CassandraReadJournal.Identifier) {
+ replicationContext =>
+ EventSourcedBehavior[Command, Event, State](
+ persistenceId = replicationId.persistenceId,
+ emptyState = State(Set.empty),
+ commandHandler = (state, cmd) =>
+ cmd match {
+ case GiveThumbsUp(_, userId, replyTo) =>
+ Effect.persist(GaveThumbsUp(userId)).thenRun { state2 =>
+ ctx.log.info("Thumbs-up by {}, total count {}", userId, state2.users.size)
+ replyTo ! state2.users.size
+ }
+ case GetCount(_, replyTo) =>
+ replyTo ! state.users.size
+ Effect.none
+ case GetUsers(_, replyTo) =>
+ replyTo ! state
+ Effect.none
+ },
+ eventHandler = (state, event) =>
+ event match {
+ case GaveThumbsUp(userId) =>
+ state.add(userId)
+ })
}
}
}
diff --git a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpHttp.scala b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpHttp.scala
index 70a4bab..9461ad4 100644
--- a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpHttp.scala
+++ b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/counter/ThumbsUpHttp.scala
@@ -3,20 +3,21 @@ package sample.persistence.res.counter
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ReplicatedSharding
import akka.http.scaladsl.Http
-import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse, StatusCodes }
import akka.http.scaladsl.server.Route
import akka.persistence.typed.ReplicaId
import akka.stream.scaladsl.Source
-import akka.util.{ByteString, Timeout}
+import akka.util.{ ByteString, Timeout }
import sample.persistence.res.counter.ThumbsUpCounter.State
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
object ThumbsUpHttp {
- def route(selfReplica: ReplicaId, res: ReplicatedSharding[ThumbsUpCounter.Command])(implicit system: ActorSystem[_]): Route = {
+ def route(selfReplica: ReplicaId, res: ReplicatedSharding[ThumbsUpCounter.Command])(
+ implicit system: ActorSystem[_]): Route = {
import akka.http.scaladsl.server.Directives._
@@ -28,7 +29,8 @@ object ThumbsUpHttp {
// example: curl http://127.0.0.1:22551/thumbs-up/a
get {
path(Segment) { resourceId =>
- onComplete(res.entityRefsFor(resourceId)(selfReplica).ask[State](replyTo => ThumbsUpCounter.GetUsers(resourceId, replyTo))) {
+ onComplete(res.entityRefsFor(resourceId)(selfReplica).ask[State](replyTo =>
+ ThumbsUpCounter.GetUsers(resourceId, replyTo))) {
case Success(state) =>
val s = Source.fromIterator(() => state.users.iterator)
.intersperse("\n")
@@ -41,15 +43,14 @@ object ThumbsUpHttp {
// example: curl -X POST http://127.0.0.1:22551/thumbs-up/a/u1
post {
path(Segment / Segment) { (resourceId, userId) =>
- onComplete(res.entityRefsFor(resourceId)(selfReplica).ask[Long](replyTo => ThumbsUpCounter.GiveThumbsUp(resourceId, userId, replyTo))) {
- case Success(i) => complete(i.toString)
+ onComplete(res.entityRefsFor(resourceId)(selfReplica).ask[Long](replyTo =>
+ ThumbsUpCounter.GiveThumbsUp(resourceId, userId, replyTo))) {
+ case Success(i) => complete(i.toString)
case Failure(ex) => complete(StatusCodes.BadRequest, ex.toString)
}
}
- }
- )
+ })
}
-
}
}
diff --git a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/movielist/MovieWatchList.scala b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/movielist/MovieWatchList.scala
index 1a3d895..cc6e715 100644
--- a/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/movielist/MovieWatchList.scala
+++ b/akka-sample-persistence-dc-scala/src/main/scala/sample/persistence/res/movielist/MovieWatchList.scala
@@ -1,10 +1,10 @@
package sample.persistence.res.movielist
-import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal
-import akka.persistence.typed.{PersistenceId, ReplicaId, ReplicationId}
+import akka.persistence.typed.{ PersistenceId, ReplicaId, ReplicationId }
import akka.persistence.typed.crdt.ORSet
-import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplicatedEventSourcing}
+import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }
import sample.persistence.res.MainApp
/**
@@ -25,11 +25,12 @@ object MovieWatchList {
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("movies", entityId, replicaId),
MainApp.AllReplicas,
- CassandraReadJournal.Identifier
- )(replicationContext => eventSourcedBehavior(replicaId, replicationContext.persistenceId))
+ CassandraReadJournal.Identifier)(replicationContext =>
+ eventSourcedBehavior(replicaId, replicationContext.persistenceId))
}
- private def eventSourcedBehavior(replicaId: ReplicaId, persistenceId: PersistenceId): EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]] =
+ private def eventSourcedBehavior(
+ replicaId: ReplicaId, persistenceId: PersistenceId): EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]] =
EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]](
persistenceId,
ORSet.empty(replicaId),
@@ -51,14 +52,3 @@ object MovieWatchList {
}
}
-
-
-
-
-
-
-
-
-
-
-
diff --git a/akka-sample-persistence-scala/build.sbt b/akka-sample-persistence-scala/build.sbt
index 962125b..b39bad6 100644
--- a/akka-sample-persistence-scala/build.sbt
+++ b/akka-sample-persistence-scala/build.sbt
@@ -9,8 +9,7 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.2.11",
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
- "org.scalatest" %% "scalatest" % "3.1.0" % Test
-)
+ "org.scalatest" %% "scalatest" % "3.1.0" % Test)
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
diff --git a/akka-sample-persistence-scala/src/main/scala/sample/persistence/CborSerializable.scala b/akka-sample-persistence-scala/src/main/scala/sample/persistence/CborSerializable.scala
index 15b5bb9..17ffd81 100644
--- a/akka-sample-persistence-scala/src/main/scala/sample/persistence/CborSerializable.scala
+++ b/akka-sample-persistence-scala/src/main/scala/sample/persistence/CborSerializable.scala
@@ -1,6 +1,6 @@
package sample.persistence
/**
- * Marker trait for serialization with Jackson CBOR
- */
+ * Marker trait for serialization with Jackson CBOR
+ */
trait CborSerializable
diff --git a/akka-sample-persistence-scala/src/main/scala/sample/persistence/ShoppingCart.scala b/akka-sample-persistence-scala/src/main/scala/sample/persistence/ShoppingCart.scala
index e888152..2e76149 100644
--- a/akka-sample-persistence-scala/src/main/scala/sample/persistence/ShoppingCart.scala
+++ b/akka-sample-persistence-scala/src/main/scala/sample/persistence/ShoppingCart.scala
@@ -83,7 +83,8 @@ object ShoppingCart {
/**
* A command to adjust the quantity of an item in the cart.
*/
- final case class AdjustItemQuantity(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command
+ final case class AdjustItemQuantity(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]])
+ extends Command
/**
* A command to checkout the shopping cart.
@@ -120,7 +121,7 @@ object ShoppingCart {
PersistenceId("ShoppingCart", cartId),
State.empty,
(state, command) =>
- //The shopping cart behavior changes if it's checked out or not.
+ // The shopping cart behavior changes if it's checked out or not.
// The commands are handled differently for each case.
if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
else openShoppingCart(cartId, state, command),
@@ -146,7 +147,8 @@ object ShoppingCart {
case RemoveItem(itemId, replyTo) =>
if (state.hasItem(itemId)) {
- Effect.persist(ItemRemoved(cartId, itemId)).thenRun(updatedCart => replyTo ! StatusReply.Success(updatedCart.toSummary))
+ Effect.persist(ItemRemoved(cartId, itemId)).thenRun(updatedCart =>
+ replyTo ! StatusReply.Success(updatedCart.toSummary))
} else {
replyTo ! StatusReply.Success(state.toSummary) // removing an item is idempotent
Effect.none
diff --git a/akka-sample-persistence-scala/src/test/scala/sample/persistence/ShoppingCartSpec.scala b/akka-sample-persistence-scala/src/test/scala/sample/persistence/ShoppingCartSpec.scala
index b64e614..16789b0 100644
--- a/akka-sample-persistence-scala/src/test/scala/sample/persistence/ShoppingCartSpec.scala
+++ b/akka-sample-persistence-scala/src/test/scala/sample/persistence/ShoppingCartSpec.scala
@@ -31,16 +31,16 @@ class ShoppingCartSpec extends ScalaTestWithActorTestKit(s"""
val cart = testKit.spawn(ShoppingCart(newCartId()))
val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should === (true)
+ probe.receiveMessage().isSuccess should ===(true)
cart ! ShoppingCart.AddItem("foo", 13, probe.ref)
- probe.receiveMessage().isError should === (true)
+ probe.receiveMessage().isError should ===(true)
}
"remove item" in {
val cart = testKit.spawn(ShoppingCart(newCartId()))
val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should === (true)
+ probe.receiveMessage().isSuccess should ===(true)
cart ! ShoppingCart.RemoveItem("foo", probe.ref)
probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map.empty, checkedOut = false)))
}
@@ -49,7 +49,7 @@ class ShoppingCartSpec extends ScalaTestWithActorTestKit(s"""
val cart = testKit.spawn(ShoppingCart(newCartId()))
val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should === (true)
+ probe.receiveMessage().isSuccess should ===(true)
cart ! ShoppingCart.AdjustItemQuantity("foo", 43, probe.ref)
probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false)))
}
@@ -58,12 +58,12 @@ class ShoppingCartSpec extends ScalaTestWithActorTestKit(s"""
val cart = testKit.spawn(ShoppingCart(newCartId()))
val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]]
cart ! ShoppingCart.AddItem("foo", 42, probe.ref)
- probe.receiveMessage().isSuccess should === (true)
+ probe.receiveMessage().isSuccess should ===(true)
cart ! ShoppingCart.Checkout(probe.ref)
probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true)))
cart ! ShoppingCart.AddItem("bar", 13, probe.ref)
- probe.receiveMessage().isError should === (true)
+ probe.receiveMessage().isError should ===(true)
}
"keep its state" in {
diff --git a/akka-sample-sharding-java/build.sbt b/akka-sample-sharding-java/build.sbt
index 918261c..55aaf8b 100644
--- a/akka-sample-sharding-java/build.sbt
+++ b/akka-sample-sharding-java/build.sbt
@@ -1,17 +1,14 @@
-
val AkkaVersion = "2.6.20"
val AkkaHttpVersion = "10.2.10"
val LogbackVersion = "1.2.11"
lazy val buildSettings = Seq(
organization := "com.lightbend.akka.samples",
- scalaVersion := "2.13.8"
-)
+ scalaVersion := "2.13.8")
lazy val commonJavacOptions = Seq(
"-Xlint:unchecked",
- "-Xlint:deprecation"
-)
+ "-Xlint:deprecation")
lazy val commonSettings = Seq(
Compile / javacOptions ++= commonJavacOptions,
@@ -19,9 +16,7 @@ lazy val commonSettings = Seq(
run / fork := false,
Global / cancelable := false,
licenses := Seq(
- ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
- )
-)
+ ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))))
lazy val killrweather = project
.in(file("killrweather"))
@@ -35,8 +30,7 @@ lazy val killrweather = project
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-jackson" % AkkaHttpVersion,
- "ch.qos.logback" % "logback-classic" % LogbackVersion)
- )
+ "ch.qos.logback" % "logback-classic" % LogbackVersion))
lazy val `killrweather-fog` = project
.in(file("killrweather-fog"))
@@ -49,9 +43,7 @@ lazy val `killrweather-fog` = project
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
- "ch.qos.logback" % "logback-classic" % LogbackVersion
- )
- )
+ "ch.qos.logback" % "logback-classic" % LogbackVersion))
// Startup aliases for the first two seed nodes and a third, more can be started.
addCommandAlias("sharding1", "runMain sample.killrweather.KillrWeather 2551")
diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt
index 1ef37d0..fa35f40 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -1,12 +1,10 @@
-
val AkkaVersion = "2.6.20"
val AkkaHttpVersion = "10.1.11"
val LogbackVersion = "1.2.11"
lazy val buildSettings = Seq(
organization := "com.lightbend.akka.samples",
- scalaVersion := "2.13.8"
-)
+ scalaVersion := "2.13.8")
lazy val commonScalacOptions = Seq(
"-deprecation",
@@ -14,13 +12,11 @@ lazy val commonScalacOptions = Seq(
"-unchecked",
"-Xlint",
"-Ywarn-unused:imports",
- "-encoding", "UTF-8"
-)
+ "-encoding", "UTF-8")
lazy val commonJavacOptions = Seq(
"-Xlint:unchecked",
- "-Xlint:deprecation"
-)
+ "-Xlint:deprecation")
lazy val commonSettings = Seq(
Compile / scalacOptions ++= commonScalacOptions,
@@ -29,9 +25,7 @@ lazy val commonSettings = Seq(
run / fork := false,
Global / cancelable := false,
licenses := Seq(
- ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
- )
-)
+ ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))))
lazy val killrweather = project
.in(file("killrweather"))
@@ -45,8 +39,7 @@ lazy val killrweather = project
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
- "ch.qos.logback" % "logback-classic" % LogbackVersion)
- )
+ "ch.qos.logback" % "logback-classic" % LogbackVersion))
lazy val `killrweather-fog` = project
.in(file("killrweather-fog"))
@@ -60,9 +53,7 @@ lazy val `killrweather-fog` = project
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
- "ch.qos.logback" % "logback-classic" % LogbackVersion
- )
- )
+ "ch.qos.logback" % "logback-classic" % LogbackVersion))
// Startup aliases for the first two seed nodes and a third, more can be started.
addCommandAlias("sharding1", "runMain sample.killrweather.KillrWeather 2551")
diff --git a/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/Fog.scala b/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/Fog.scala
index 4eb3917..9d5aab8 100644
--- a/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/Fog.scala
+++ b/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/Fog.scala
@@ -9,10 +9,10 @@ import com.typesafe.config.Config
import scala.concurrent.duration._
/**
- * In another terminal start the `Fog` (see Fog computing https://en.wikipedia.org/wiki/Fog_computing).
- * Starts the fog network, simulating devices and stations.
- * In the wild, each station would run its own system and be location-aware.
- */
+ * In another terminal start the `Fog` (see Fog computing https://en.wikipedia.org/wiki/Fog_computing).
+ * Starts the fog network, simulating devices and stations.
+ * In the wild, each station would run its own system and be location-aware.
+ */
object Fog {
def main(args: Array[String]): Unit = {
@@ -34,9 +34,9 @@ object Guardian {
context.spawn(
Behaviors.supervise(
- WeatherStation(wsid, settings, weatherPort)
- ).onFailure[RuntimeException](SupervisorStrategy.restartWithBackoff(1.second, 5.seconds, 0.5)),
- s"weather-station-$wsid")
+ WeatherStation(wsid, settings, weatherPort)).onFailure[RuntimeException](
+ SupervisorStrategy.restartWithBackoff(1.second, 5.seconds, 0.5)),
+ s"weather-station-$wsid")
}
Behaviors.empty
}
@@ -57,10 +57,9 @@ object FogSettings {
.requiring(_ > Duration.Zero, s"'$durationKey' must be > 0")
FogSettings(
- weatherStations = config.getInt("initial-weather-stations"),
+ weatherStations = config.getInt("initial-weather-stations"),
host = config.getString("weather-station.hostname"),
- sampleInterval = millis("weather-station.sample-interval")
- )
+ sampleInterval = millis("weather-station.sample-interval"))
}
}
diff --git a/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/WeatherStation.scala b/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/WeatherStation.scala
index e7906bc..4012b41 100644
--- a/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/WeatherStation.scala
+++ b/akka-sample-sharding-scala/killrweather-fog/src/main/scala/sample/killrweather/fog/WeatherStation.scala
@@ -15,16 +15,16 @@ import scala.util.Random
import scala.util.Success
/**
- * How many weather stations there are? Currently:
- * "well over 10,000 manned and automatic surface weather stations,
- * 1,000 upper-air stations, 7,000 ships, 100 moored and 1,000 drifting buoys,
- * hundreds of weather radars and 3,000 specially equipped commercial aircraft
- * measure key parameters of the atmosphere, land and ocean surface every day.
- * Add to these some 16 meteorological and 50 research satellites to get an idea
- * of the size of the global network for meteorological, hydrological and other
- * geophysical observations."
- * - https://public.wmo.int/en/our-mandate/what-we-do/observations
- */
+ * How many weather stations there are? Currently:
+ * "well over 10,000 manned and automatic surface weather stations,
+ * 1,000 upper-air stations, 7,000 ships, 100 moored and 1,000 drifting buoys,
+ * hundreds of weather radars and 3,000 specially equipped commercial aircraft
+ * measure key parameters of the atmosphere, land and ocean surface every day.
+ * Add to these some 16 meteorological and 50 research satellites to get an idea
+ * of the size of the global network for meteorological, hydrological and other
+ * geophysical observations."
+ * - https://public.wmo.int/en/our-mandate/what-we-do/observations
+ */
private[fog] object WeatherStation {
type WeatherStationId = String
@@ -37,12 +37,12 @@ private[fog] object WeatherStation {
/** Starts a device and it's task to initiate reading data at a scheduled rate. */
def apply(wsid: WeatherStationId, settings: FogSettings, httpPort: Int): Behavior[Command] =
Behaviors.setup(ctx =>
- new WeatherStation(ctx, wsid, settings, httpPort).running(httpPort)
- )
+ new WeatherStation(ctx, wsid, settings, httpPort).running(httpPort))
}
/** Starts a device and it's task to initiate reading data at a scheduled rate. */
-private class WeatherStation(context: ActorContext[WeatherStation.Command], wsid: WeatherStation.WeatherStationId, settings: FogSettings, httpPort: Int) {
+private class WeatherStation(context: ActorContext[WeatherStation.Command], wsid: WeatherStation.WeatherStationId,
+ settings: FogSettings, httpPort: Int) {
import WeatherStation._
private val random = new Random()
@@ -95,16 +95,13 @@ private class WeatherStation(context: ActorContext[WeatherStation.Command], wsid
val json = JsObject(
"eventTime" -> JsNumber(eventTime),
"dataType" -> JsString("temperature"),
- "value" -> JsNumber(temperature)
- )
+ "value" -> JsNumber(temperature))
val futureResponseBody: Future[String] = http.singleRequest(Post(stationUrl, json))
- .flatMap (res =>
+ .flatMap(res =>
Unmarshal(res).to[String].map(body =>
if (res.status.isSuccess()) body
- else throw new RuntimeException(s"Failed to register data: $body")
- )
- )
+ else throw new RuntimeException(s"Failed to register data: $body")))
context.pipeToSelf(futureResponseBody) {
case Success(s) => ProcessSuccess(s)
case Failure(e) => ProcessFailure(e)
diff --git a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JacksonSerializers.scala b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JacksonSerializers.scala
index 369bd1e..eb5b524 100644
--- a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JacksonSerializers.scala
+++ b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JacksonSerializers.scala
@@ -16,9 +16,9 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer
class DataTypeJsonSerializer extends StdSerializer[WeatherStation.DataType](classOf[WeatherStation.DataType]) {
override def serialize(value: WeatherStation.DataType, gen: JsonGenerator, provider: SerializerProvider): Unit = {
val strValue = value match {
- case WeatherStation.DataType.Dewpoint => "d"
+ case WeatherStation.DataType.Dewpoint => "d"
case WeatherStation.DataType.Temperature => "t"
- case WeatherStation.DataType.Pressure => "p"
+ case WeatherStation.DataType.Pressure => "p"
}
gen.writeString(strValue)
}
@@ -52,4 +52,3 @@ class FunctionJsonDeserializer extends StdDeserializer[WeatherStation.Function](
case "h" => WeatherStation.Function.HighLow
}
}
-
diff --git a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JsonFormats.scala b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JsonFormats.scala
index 31a7389..aedb890 100644
--- a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JsonFormats.scala
+++ b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/JsonFormats.scala
@@ -25,7 +25,7 @@ object JsonFormats {
case JsString(text) =>
stringToValue.get(text.toLowerCase) match {
case Some(t) => t
- case None => deserializationError(s"Possible values are ${stringToValue.keySet}, [$text] is not among them")
+ case None => deserializationError(s"Possible values are ${stringToValue.keySet}, [$text] is not among them")
}
case surprise =>
deserializationError(s"Expected a string value, got $surprise")
@@ -39,7 +39,8 @@ object JsonFormats {
implicit val dataTypeFormat: JsonFormat[WeatherStation.DataType] = SimpleEnumFormat(WeatherStation.DataType.All)
implicit val dataFormat: RootJsonFormat[WeatherStation.Data] = jsonFormat3(WeatherStation.Data)
- implicit val dataIngestedFormat: RootJsonFormat[WeatherStation.DataRecorded] = jsonFormat1(WeatherStation.DataRecorded)
+ implicit val dataIngestedFormat: RootJsonFormat[WeatherStation.DataRecorded] =
+ jsonFormat1(WeatherStation.DataRecorded)
implicit val queryWindowFormat: RootJsonFormat[WeatherStation.TimeWindow] = jsonFormat3(WeatherStation.TimeWindow)
implicit val queryStatusFormat: RootJsonFormat[WeatherStation.QueryResult] = jsonFormat5(WeatherStation.QueryResult)
diff --git a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherHttpServer.scala b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherHttpServer.scala
index 605d3d5..d4ae8d5 100644
--- a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherHttpServer.scala
+++ b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherHttpServer.scala
@@ -1,11 +1,11 @@
package sample.killrweather
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
import scala.concurrent.duration._
import akka.actor.typed.ActorSystem
import akka.actor.CoordinatedShutdown
-import akka.{Done, actor => classic}
+import akka.{ actor => classic, Done }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
@@ -27,19 +27,16 @@ private[killrweather] object WeatherHttpServer {
system.log.info(
"WeatherServer online at http://{}:{}/",
address.getHostString,
- address.getPort
- )
+ address.getPort)
shutdown.addTask(
CoordinatedShutdown.PhaseServiceRequestsDone,
- "http-graceful-terminate"
- ) { () =>
+ "http-graceful-terminate") { () =>
binding.terminate(10.seconds).map { _ =>
system.log.info(
"WeatherServer http://{}:{}/ graceful shutdown completed",
address.getHostString,
- address.getPort
- )
+ address.getPort)
Done
}
}
diff --git a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherRoutes.scala b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherRoutes.scala
index 3b04551..457b1d4 100644
--- a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherRoutes.scala
+++ b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherRoutes.scala
@@ -27,18 +27,22 @@ private[killrweather] final class WeatherRoutes(system: ActorSystem[_]) {
ref.ask(WeatherStation.Record(data, System.currentTimeMillis, _))
}
- private def query(wsid: Long, dataType: WeatherStation.DataType, function: WeatherStation.Function): Future[WeatherStation.QueryResult] = {
+ private def query(wsid: Long, dataType: WeatherStation.DataType, function: WeatherStation.Function)
+ : Future[WeatherStation.QueryResult] = {
val ref = sharding.entityRefFor(WeatherStation.TypeKey, wsid.toString)
ref.ask(WeatherStation.Query(dataType, function, _))
}
// unmarshallers for the query parameters
- private val funcsFromName = WeatherStation.Function.All.map(function => function.toString.toLowerCase -> function).toMap
- private implicit val functionTypeUnmarshaller = Unmarshaller.strict[String, WeatherStation.Function](text => funcsFromName(text.toLowerCase))
-
- private val dataTypesFromNames = WeatherStation.DataType.All.map(dataType => dataType.toString.toLowerCase -> dataType).toMap
- private implicit val dataTypeUnmarshaller = Unmarshaller.strict[String, WeatherStation.DataType](text => dataTypesFromNames(text.toLowerCase))
+ private val funcsFromName =
+ WeatherStation.Function.All.map(function => function.toString.toLowerCase -> function).toMap
+ private implicit val functionTypeUnmarshaller =
+ Unmarshaller.strict[String, WeatherStation.Function](text => funcsFromName(text.toLowerCase))
+ private val dataTypesFromNames =
+ WeatherStation.DataType.All.map(dataType => dataType.toString.toLowerCase -> dataType).toMap
+ private implicit val dataTypeUnmarshaller =
+ Unmarshaller.strict[String, WeatherStation.DataType](text => dataTypesFromNames(text.toLowerCase))
// imports needed for the routes and entity json marshalling
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
@@ -49,8 +53,9 @@ private[killrweather] final class WeatherRoutes(system: ActorSystem[_]) {
path("weather" / LongNumber) { wsid =>
concat(
get {
- parameters(("type".as[WeatherStation.DataType], "function".as[WeatherStation.Function])) { (dataType, function) =>
- complete(query(wsid, dataType, function))
+ parameters(("type".as[WeatherStation.DataType], "function".as[WeatherStation.Function])) {
+ (dataType, function) =>
+ complete(query(wsid, dataType, function))
}
},
post {
@@ -59,8 +64,7 @@ private[killrweather] final class WeatherRoutes(system: ActorSystem[_]) {
complete(StatusCodes.Accepted -> s"$performed from event time: ${data.eventTime}")
}
}
- }
- )
+ })
}
}
diff --git a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherStation.scala b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherStation.scala
index c15f39b..bd27e76 100644
--- a/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherStation.scala
+++ b/akka-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherStation.scala
@@ -36,7 +36,6 @@ private[killrweather] object WeatherStation {
WeatherStation(entityContext.entityId)
})
-
// actor commands and responses
sealed trait Command extends CborSerializable
@@ -44,8 +43,8 @@ private[killrweather] object WeatherStation {
final case class DataRecorded(wsid: String) extends CborSerializable
final case class Query(dataType: DataType, func: Function, replyTo: ActorRef[QueryResult]) extends Command
- final case class QueryResult(wsid: String, dataType: DataType, func: Function, readings: Int, value: Vector[TimeWindow]) extends CborSerializable
-
+ final case class QueryResult(wsid: String, dataType: DataType, func: Function, readings: Int,
+ value: Vector[TimeWindow]) extends CborSerializable
// small domain model for querying and storing weather data
@@ -54,6 +53,7 @@ private[killrweather] object WeatherStation {
@JsonDeserialize(using = classOf[DataTypeJsonDeserializer])
sealed trait DataType
object DataType {
+
/** Temperature in celcius */
case object Temperature extends DataType
case object Dewpoint extends DataType
@@ -82,7 +82,6 @@ private[killrweather] object WeatherStation {
final case class Data(eventTime: Long, dataType: DataType, value: Double)
final case class TimeWindow(start: Long, end: Long, value: Double)
-
def apply(wsid: String): Behavior[Command] = Behaviors.setup { context =>
context.log.info("Starting weather station {}", wsid)
@@ -99,13 +98,13 @@ private[killrweather] object WeatherStation {
val updated = values :+ data
if (context.log.isDebugEnabled) {
val averageForSameType = average(updated.filter(_.dataType == data.dataType).map(_.value))
- context.log.debugN("{} total readings from station {}, type {}, average {}, diff: processingTime - eventTime: {} ms",
+ context.log.debugN(
+ "{} total readings from station {}, type {}, average {}, diff: processingTime - eventTime: {} ms",
updated.size,
wsid,
data.dataType,
averageForSameType,
- received - data.eventTime
- )
+ received - data.eventTime)
}
replyTo ! DataRecorded(wsid)
running(context, wsid, updated) // store
@@ -132,7 +131,7 @@ private[killrweather] object WeatherStation {
.map(e => TimeWindow(e.eventTime, e.eventTime, e.value))
.get)
- }
+ }
replyTo ! QueryResult(wsid, dataType, func, valuesForType.size, queryResult)
Behaviors.same
@@ -141,4 +140,4 @@ private[killrweather] object WeatherStation {
context.log.info("Stopping, losing all recorded state for station {}", wsid)
Behaviors.same
}
-}
\ No newline at end of file
+}
diff --git a/docs-gen/build.sbt b/docs-gen/build.sbt
index f0e336c..8d33679 100644
--- a/docs-gen/build.sbt
+++ b/docs-gen/build.sbt
@@ -1,83 +1,71 @@
lazy val `akka-sample-cluster-java` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Cluster with Java",
- baseProject := "pekko-sample-cluster-java"
- )
+ name := "Akka Cluster with Java",
+ baseProject := "pekko-sample-cluster-java")
lazy val `akka-sample-cluster-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Cluster with Scala",
- baseProject := "pekko-sample-cluster-scala"
- )
+ name := "Akka Cluster with Scala",
+ baseProject := "pekko-sample-cluster-scala")
lazy val `pekko-sample-distributed-data-java` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Distributed Data with Java",
- baseProject := "pekko-sample-distributed-data-java"
- )
+ name := "Akka Distributed Data with Java",
+ baseProject := "pekko-sample-distributed-data-java")
lazy val `pekko-sample-distributed-data-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Distributed Data with Scala",
- baseProject := "pekko-sample-distributed-data-scala"
- )
+ name := "Akka Distributed Data with Scala",
+ baseProject := "pekko-sample-distributed-data-scala")
lazy val `pekko-sample-distributed-workers-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Distributed Workers with Scala",
- baseProject := "pekko-sample-distributed-workers-scala"
- )
+ name := "Akka Distributed Workers with Scala",
+ baseProject := "pekko-sample-distributed-workers-scala")
lazy val `akka-sample-fsm-java` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka FSM with Java",
- baseProject := "akka-sample-fsm-java"
- )
+ name := "Akka FSM with Java",
+ baseProject := "akka-sample-fsm-java")
lazy val `akka-sample-fsm-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka FSM with Scala",
- baseProject := "akka-sample-fsm-scala"
- )
+ name := "Akka FSM with Scala",
+ baseProject := "akka-sample-fsm-scala")
lazy val `akka-sample-persistence-java` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Persistence with Java",
- baseProject := "akka-sample-persistence-java"
- )
+ name := "Akka Persistence with Java",
+ baseProject := "akka-sample-persistence-java")
lazy val `akka-sample-persistence-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Persistence with Scala",
- baseProject := "akka-sample-persistence-scala"
- )
+ name := "Akka Persistence with Scala",
+ baseProject := "akka-sample-persistence-scala")
lazy val `akka-sample-sharding-java` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Cluster Sharding with Java",
- baseProject := "akka-sample-sharding-java"
- )
+ name := "Akka Cluster Sharding with Java",
+ baseProject := "akka-sample-sharding-java")
lazy val `akka-sample-sharding-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Cluster Sharding with Scala",
- baseProject := "akka-sample-sharding-scala"
- )
+ name := "Akka Cluster Sharding with Scala",
+ baseProject := "akka-sample-sharding-scala")
lazy val `akka-sample-kafka-to-sharding-scala` = project
.enablePlugins(AkkaSamplePlugin)
.settings(
- name := "Akka Kafka to Sharding with Scala",
- baseProject := "akka-sample-kafka-to-sharding-scala"
- )
+ name := "Akka Kafka to Sharding with Scala",
+ baseProject := "akka-sample-kafka-to-sharding-scala")
diff --git a/docs-gen/project/AkkaSamplePlugin.scala b/docs-gen/project/AkkaSamplePlugin.scala
index 106997a..2ae7ceb 100644
--- a/docs-gen/project/AkkaSamplePlugin.scala
+++ b/docs-gen/project/AkkaSamplePlugin.scala
@@ -39,10 +39,8 @@ object AkkaSamplePlugin extends sbt.AutoPlugin {
r.replaceAllIn(body,
_ match {
case r(lbl, uri) if !uri.contains("http") => s"""[$lbl](${baseUrl.value}/${baseProject.value}/$uri)"""
- case r(lbl, uri) => s"[$lbl]($uri)"
- }
- )
+ case r(lbl, uri) => s"[$lbl]($uri)"
+ })
},
- templateName := baseProject.value.replaceAll("-sample-", "-samples-")
- )
+ templateName := baseProject.value.replaceAll("-sample-", "-samples-"))
}
diff --git a/pekko-sample-cluster-client-grpc-scala/build.sbt b/pekko-sample-cluster-client-grpc-scala/build.sbt
index 1fc0e1b..534c8fe 100644
--- a/pekko-sample-cluster-client-grpc-scala/build.sbt
+++ b/pekko-sample-cluster-client-grpc-scala/build.sbt
@@ -19,8 +19,7 @@ lazy val `pekko-sample-cluster-client-grpc-scala` = project
"-feature",
"-unchecked",
"-Xlog-reflective-calls",
- "-Xlint"
- ),
+ "-Xlint"),
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
// javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime",
libraryDependencies ++= Seq(
@@ -29,7 +28,5 @@ lazy val `pekko-sample-cluster-client-grpc-scala` = project
"org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,
"org.apache.pekko" %% "pekko-discovery" % pekkoVersion,
"org.apache.pekko" %% "pekko-multi-node-testkit" % pekkoVersion % Test,
- "org.scalatest" %% "scalatest" % "3.1.1" % Test
- )
- )
+ "org.scalatest" %% "scalatest" % "3.1.1" % Test))
.configs(MultiJvm)
diff --git a/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClient.scala b/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClient.scala
index 0772009..e93df58 100644
--- a/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClient.scala
+++ b/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClient.scala
@@ -1,23 +1,22 @@
package sample.cluster.client.grpc
import org.apache.pekko.NotUsed
-import org.apache.pekko.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
+import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated }
import org.apache.pekko.event.LoggingAdapter
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ ExecutionContext, Future, Promise }
object ClusterClient {
/**
- * Factory method for `ClusterClient` [[org.apache.pekko.actor.Props]].
- */
+ * Factory method for `ClusterClient` [[org.apache.pekko.actor.Props]].
+ */
def props(
- settings: ClusterClientSettings
- )(implicit materializer: Materializer): Props =
+ settings: ClusterClientSettings)(implicit materializer: Materializer): Props =
Props(new ClusterClient(settings))
sealed trait Command
@@ -26,20 +25,20 @@ object ClusterClient {
extends Command {
/**
- * Convenience constructor with `localAffinity` false
- */
+ * Convenience constructor with `localAffinity` false
+ */
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
/**
- * More efficient than `Send` for single request-reply interaction
- */
+ * More efficient than `Send` for single request-reply interaction
+ */
final case class SendAsk(path: String, msg: Any, localAffinity: Boolean)
extends Command {
/**
- * Convenience constructor with `localAffinity` false
- */
+ * Convenience constructor with `localAffinity` false
+ */
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
@@ -48,19 +47,17 @@ object ClusterClient {
final case class Publish(topic: String, msg: Any) extends Command
private def createClientStub(
- settings: ClusterClientSettings
- )(implicit sys: ActorSystem): ClusterClientReceptionistServiceClient = {
+ settings: ClusterClientSettings)(implicit sys: ActorSystem): ClusterClientReceptionistServiceClient = {
ClusterClientReceptionistServiceClient(settings.grpcClientSettings)
}
private def newSession(
- settings: ClusterClientSettings,
- receptionistServiceClient: ClusterClientReceptionistServiceClient,
- sender: ActorRef,
- killSwitch: SharedKillSwitch,
- log: LoggingAdapter,
- serialization: ClusterClientSerialization
- )(implicit mat: Materializer): Future[ActorRef] = {
+ settings: ClusterClientSettings,
+ receptionistServiceClient: ClusterClientReceptionistServiceClient,
+ sender: ActorRef,
+ killSwitch: SharedKillSwitch,
+ log: LoggingAdapter,
+ serialization: ClusterClientSerialization)(implicit mat: Materializer): Future[ActorRef] = {
val sessionReqRefPromise = Promise[ActorRef]()
log.info("New session for {}", sender)
receptionistServiceClient
@@ -72,16 +69,14 @@ object ClusterClient {
// never complete from stream element
completionMatcher = PartialFunction.empty,
// never fail from stream element
- failureMatcher = PartialFunction.empty
- )
+ failureMatcher = PartialFunction.empty)
// .actorRef[Any](bufferSize = settings.bufferSize, overflowStrategy = OverflowStrategy.dropNew)
.via(killSwitch.flow)
.map {
case send: Send =>
val payload = serialization.serializePayload(send.msg)
Req().withSend(
- SendReq(send.path, send.localAffinity, Some(payload))
- )
+ SendReq(send.path, send.localAffinity, Some(payload)))
case sendToAll: SendToAll =>
val payload = serialization.serializePayload(sendToAll.msg)
Req().withSendToAll(SendToAllReq(sendToAll.path, Some(payload)))
@@ -92,12 +87,12 @@ object ClusterClient {
.mapMaterializedValue(sessionReqRef => {
sessionReqRefPromise.success(sessionReqRef)
NotUsed
- })
- )
+ }))
.watch(sender) // end session when original sender terminates
- .recoverWithRetries(-1, {
- case _: WatchedActorTerminatedException => Source.empty
- })
+ .recoverWithRetries(-1,
+ {
+ case _: WatchedActorTerminatedException => Source.empty
+ })
.map { rsp =>
serialization.deserializePayload(rsp.payload.get)
}
@@ -110,10 +105,9 @@ object ClusterClient {
}
private def askSend(
- receptionistServiceClient: ClusterClientReceptionistServiceClient,
- send: SendAsk,
- serialization: ClusterClientSerialization
- )(implicit ec: ExecutionContext): Future[Any] = {
+ receptionistServiceClient: ClusterClientReceptionistServiceClient,
+ send: SendAsk,
+ serialization: ClusterClientSerialization)(implicit ec: ExecutionContext): Future[Any] = {
val payload = serialization.serializePayload(send.msg)
val sendReq = SendReq(send.path, send.localAffinity, Some(payload))
receptionistServiceClient.askSend(sendReq).map { rsp =>
@@ -123,54 +117,53 @@ object ClusterClient {
}
/**
- * This actor is intended to be used on an external node that is not member
- * of the cluster. It acts like a gateway for sending messages to actors
- * somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish
- * a connection to a [[ClusterClientReceptionist]] somewhere in the cluster.
- *
- * You can send messages via the `ClusterClient` to any actor in the cluster
- * that is registered in the [[ClusterClientReceptionist]].
- * Messages are wrapped in [[ClusterClient#Send]], [[ClusterClient#SendToAll]]
- * or [[ClusterClient#Publish]].
- *
- * 1. [[ClusterClient#Send]] -
- * The message will be delivered to one recipient with a matching path, if any such
- * exists. If several entries match the path the message will be delivered
- * to one random destination. The sender of the message can specify that local
- * affinity is preferred, i.e. the message is sent to an actor in the same local actor
- * system as the used receptionist actor, if any such exists, otherwise random to any other
- * matching entry.
- *
- * 2. [[ClusterClient#SendToAll]] -
- * The message will be delivered to all recipients with a matching path.
- *
- * 3. [[ClusterClient#Publish]] -
- * The message will be delivered to all recipients Actors that have been registered as subscribers to
- * to the named topic.
- *
- * Use the factory method [[ClusterClient#props]]) to create the
- * [[org.apache.pekko.actor.Props]] for the actor.
- *
- * If the receptionist is not currently available, the client will buffer the messages
- * and then deliver them when the connection to the receptionist has been established.
- * The size of the buffer is configurable and it can be disabled by using a buffer size
- * of 0. When the buffer is full old messages will be dropped when new messages are sent
- * via the client.
- *
- * Note that this is a best effort implementation: messages can always be lost due to the distributed
- * nature of the actors involved.
- */
+ * This actor is intended to be used on an external node that is not member
+ * of the cluster. It acts like a gateway for sending messages to actors
+ * somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish
+ * a connection to a [[ClusterClientReceptionist]] somewhere in the cluster.
+ *
+ * You can send messages via the `ClusterClient` to any actor in the cluster
+ * that is registered in the [[ClusterClientReceptionist]].
+ * Messages are wrapped in [[ClusterClient#Send]], [[ClusterClient#SendToAll]]
+ * or [[ClusterClient#Publish]].
+ *
+ * 1. [[ClusterClient#Send]] -
+ * The message will be delivered to one recipient with a matching path, if any such
+ * exists. If several entries match the path the message will be delivered
+ * to one random destination. The sender of the message can specify that local
+ * affinity is preferred, i.e. the message is sent to an actor in the same local actor
+ * system as the used receptionist actor, if any such exists, otherwise random to any other
+ * matching entry.
+ *
+ * 2. [[ClusterClient#SendToAll]] -
+ * The message will be delivered to all recipients with a matching path.
+ *
+ * 3. [[ClusterClient#Publish]] -
+ * The message will be delivered to all recipients Actors that have been registered as subscribers to
+ * to the named topic.
+ *
+ * Use the factory method [[ClusterClient#props]]) to create the
+ * [[org.apache.pekko.actor.Props]] for the actor.
+ *
+ * If the receptionist is not currently available, the client will buffer the messages
+ * and then deliver them when the connection to the receptionist has been established.
+ * The size of the buffer is configurable and it can be disabled by using a buffer size
+ * of 0. When the buffer is full old messages will be dropped when new messages are sent
+ * via the client.
+ *
+ * Note that this is a best effort implementation: messages can always be lost due to the distributed
+ * nature of the actors involved.
+ */
final class ClusterClient(settings: ClusterClientSettings)(
- implicit materializer: Materializer
-) extends Actor
+ implicit materializer: Materializer) extends Actor
with ActorLogging {
import ClusterClient._
val serialization = new ClusterClientSerialization(context.system)
- private val receptionistServiceClient
- : ClusterClientReceptionistServiceClient = createClientStub(settings)(context.system)
+ private val receptionistServiceClient: ClusterClientReceptionistServiceClient =
+ createClientStub(settings)(context.system)
// Original sender -> stream Source.actorRef of the session
private var sessionRef: Map[ActorRef, Future[ActorRef]] = Map.empty
@@ -200,8 +193,7 @@ final class ClusterClient(settings: ClusterClientSettings)(
originalSender,
killSwitch,
log,
- serialization
- )
+ serialization)
sessionRef = sessionRef.updated(originalSender, ses)
ses
}
@@ -220,24 +212,23 @@ final class ClusterClient(settings: ClusterClientSettings)(
object ClusterClientSettings {
/**
- * Create settings from the default configuration
- * `sample.cluster.client.grpc`.
- */
+ * Create settings from the default configuration
+ * `sample.cluster.client.grpc`.
+ */
def apply(system: ActorSystem): ClusterClientSettings = {
val config = system.settings.config.getConfig("sample.cluster.client.grpc")
val grpcClientSettings = GrpcClientSettings
- // FIXME service discovery
+ // FIXME service discovery
.connectToServiceAt("127.0.0.1", 50051)(system)
.withDeadline(3.second) // FIXME config
.withTls(false)
new ClusterClientSettings(
bufferSize = config.getInt("buffer-size"),
- grpcClientSettings
- )
+ grpcClientSettings)
}
}
final case class ClusterClientSettings(bufferSize: Int,
- grpcClientSettings: GrpcClientSettings)
+ grpcClientSettings: GrpcClientSettings)
diff --git a/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.scala b/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.scala
index 5775da2..c2e7a41 100644
--- a/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.scala
+++ b/pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.scala
@@ -6,7 +6,7 @@ import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorRef
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator
import org.apache.pekko.event.LoggingAdapter
-import org.apache.pekko.stream.{Materializer, OverflowStrategy}
+import org.apache.pekko.stream.{ Materializer, OverflowStrategy }
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.Timeout
@@ -14,12 +14,11 @@ import scala.concurrent.Future
import scala.util.control.NonFatal
class ClusterClientReceptionistGrpcImpl(
- settings: ClusterReceptionistSettings,
- pubSubMediator: ActorRef,
- serialization: ClusterClientSerialization
-)(implicit
- mat: Materializer,
- log: LoggingAdapter)
+ settings: ClusterReceptionistSettings,
+ pubSubMediator: ActorRef,
+ serialization: ClusterClientSerialization)(implicit
+ mat: Materializer,
+ log: LoggingAdapter)
extends ClusterClientReceptionistService {
override def newSession(in: Source[Req, NotUsed]): Source[Rsp, NotUsed] = {
@@ -32,43 +31,39 @@ class ClusterClientReceptionistGrpcImpl(
// never complete from stream element
completionMatcher = PartialFunction.empty,
// never fail from stream element
- failureMatcher = PartialFunction.empty
- )
+ failureMatcher = PartialFunction.empty)
.map { rsp =>
val payload = serialization.serializePayload(rsp)
Rsp(Some(payload))
}
.mapMaterializedValue { sessionRspRef =>
in.runForeach { req =>
- if (req.req.isSend) {
- val sendReq = req.getSend
- val msg = serialization.deserializePayload(sendReq.payload.get)
- // using sessionRspRef as sender so that replies are emitted to the response stream back to the client
- pubSubMediator.tell(
- DistributedPubSubMediator
- .Send(sendReq.path, msg, sendReq.localAffinity),
- sessionRspRef
- )
- } else if (req.req.isSendToAll) {
- val sendToAllReq = req.getSendToAll
- val msg =
- serialization.deserializePayload(sendToAllReq.payload.get)
- pubSubMediator.tell(
- DistributedPubSubMediator.SendToAll(sendToAllReq.path, msg),
- sessionRspRef
- )
- } else if (req.req.isPublish) {
- val publishReq = req.getPublish
- val msg = serialization.deserializePayload(publishReq.payload.get)
- pubSubMediator.tell(
- DistributedPubSubMediator.Publish(publishReq.topic, msg),
- sessionRspRef
- )
- } else {
- throw new IllegalArgumentException("Unknown request type")
- }
-
+ if (req.req.isSend) {
+ val sendReq = req.getSend
+ val msg = serialization.deserializePayload(sendReq.payload.get)
+ // using sessionRspRef as sender so that replies are emitted to the response stream back to the client
+ pubSubMediator.tell(
+ DistributedPubSubMediator
+ .Send(sendReq.path, msg, sendReq.localAffinity),
+ sessionRspRef)
+ } else if (req.req.isSendToAll) {
+ val sendToAllReq = req.getSendToAll
+ val msg =
+ serialization.deserializePayload(sendToAllReq.payload.get)
+ pubSubMediator.tell(
+ DistributedPubSubMediator.SendToAll(sendToAllReq.path, msg),
+ sessionRspRef)
+ } else if (req.req.isPublish) {
+ val publishReq = req.getPublish
+ val msg = serialization.deserializePayload(publishReq.payload.get)
+ pubSubMediator.tell(
+ DistributedPubSubMediator.Publish(publishReq.topic, msg),
+ sessionRspRef)
+ } else {
+ throw new IllegalArgumentException("Unknown request type")
}
+
+ }
.onComplete { result =>
log.info("Session [{}] completed: {}", sessionId, result)
}(mat.executionContext)
@@ -85,8 +80,7 @@ class ClusterClientReceptionistGrpcImpl(
(pubSubMediator ? DistributedPubSubMediator.Send(
sendReq.path,
msg,
- sendReq.localAffinity
- )).map { rsp =>
+ sendReq.localAffinity)).map { rsp =>
val payload = serialization.serializePayload(rsp)
Rsp(Some(payload))
}
diff --git a/pekko-sample-cluster-client-grpc-scala/src/multi-jvm/scala/sample/cluster/client/grpc/ClusterClientSpec.scala b/pekko-sample-cluster-client-grpc-scala/src/multi-jvm/scala/sample/cluster/client/grpc/ClusterClientSpec.scala
index 342bdd9..8942924 100644
--- a/pekko-sample-cluster-client-grpc-scala/src/multi-jvm/scala/sample/cluster/client/grpc/ClusterClientSpec.scala
+++ b/pekko-sample-cluster-client-grpc-scala/src/multi-jvm/scala/sample/cluster/client/grpc/ClusterClientSpec.scala
@@ -211,7 +211,7 @@ class ClusterClientSpec
def host2 = second
def host3 = third
- //#server
+ // #server
runOn(host1) {
val serviceA = system.actorOf(Props[Service], "serviceA")
ClusterClientReceptionist(system).registerService(serviceA)
@@ -221,20 +221,20 @@ class ClusterClientSpec
val serviceB = system.actorOf(Props[Service], "serviceB")
ClusterClientReceptionist(system).registerService(serviceB)
}
- //#server
+ // #server
runOn(host1, host2, host3, fourth) {
awaitCount(4)
}
enterBarrier("services-replicated")
- //#client
+ // #client
runOn(client) {
val c = system.actorOf(ClusterClient.props(ClusterClientSettings(system)), "client")
c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
c ! ClusterClient.SendToAll("/user/serviceB", "hi")
}
- //#client
+ // #client
runOn(client) {
// note that "hi" was sent to 2 "serviceB"
diff --git a/pekko-sample-cluster-java/build.sbt b/pekko-sample-cluster-java/build.sbt
index f47c7cf..a1c8f8b 100644
--- a/pekko-sample-cluster-java/build.sbt
+++ b/pekko-sample-cluster-java/build.sbt
@@ -16,17 +16,16 @@ lazy val `pekko-sample-cluster-java` = project
Compile / javacOptions ++= Seq("-parameters", "-Xlint:unchecked", "-Xlint:deprecation"),
run / javaOptions ++= Seq("-Xms128m", "-Xmx1024m", "-Djava.library.path=./target/native"),
libraryDependencies ++= Seq(
- "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
- "org.apache.pekko" %% "pekko-cluster-typed" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-cluster-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.11",
- "org.apache.pekko" %% "pekko-multi-node-testkit" % pekkoVersion % Test,
- "org.scalatest" %% "scalatest" % "3.0.8" % Test,
- "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test),
+ "ch.qos.logback" % "logback-classic" % "1.2.11",
+ "org.apache.pekko" %% "pekko-multi-node-testkit" % pekkoVersion % Test,
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test,
+ "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test),
run / fork := false,
Global / cancelable := false,
// disable parallel tests
Test / parallelExecution := false,
- licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
- )
- .configs (MultiJvm)
+ licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))))
+ .configs(MultiJvm)
diff --git a/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
index 86540bb..e9bd38f 100644
--- a/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
+++ b/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
@@ -43,7 +43,7 @@ class StatsSampleSingleMasterSpecMultiJvmNode2 extends StatsSampleSingleMasterSp
class StatsSampleSingleMasterSpecMultiJvmNode3 extends StatsSampleSingleMasterSpec
abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSingleMasterSpecConfig)
- with WordSpecLike with Matchers with BeforeAndAfterAll {
+ with WordSpecLike with Matchers with BeforeAndAfterAll {
import StatsSampleSingleMasterSpecConfig._
@@ -66,7 +66,7 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
val secondAddress = node(second).address
val thirdAddress = node(third).address
- Cluster(system) join firstAddress
+ Cluster(system).join(firstAddress)
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
@@ -81,9 +81,7 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
val workersRouter = ctx.spawn(Routers.pool(2)(StatsWorker.create()), "WorkersRouter")
StatsService.create(workersRouter)
},
- "StatsService",
- ).withSettings(singletonSettings)
- )
+ "StatsService").withSettings(singletonSettings))
testConductor.enter("all-up")
}
@@ -103,4 +101,4 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
}
}
-}
\ No newline at end of file
+}
diff --git a/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
index e716e32..0ac7623 100644
--- a/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
+++ b/pekko-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
@@ -11,7 +11,6 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
-
object StatsSampleSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
// note that this is not the same thing as cluster node roles
@@ -39,8 +38,8 @@ import org.scalatest.Matchers
import org.scalatest.WordSpecLike
abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
- with WordSpecLike with Matchers with BeforeAndAfterAll
- with ImplicitSender {
+ with WordSpecLike with Matchers with BeforeAndAfterAll
+ with ImplicitSender {
import StatsSampleSpecConfig._
@@ -64,7 +63,6 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
Cluster(system).join(firstAddress)
-
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
@@ -108,4 +106,4 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
}
-}
\ No newline at end of file
+}
diff --git a/pekko-sample-cluster-scala/build.sbt b/pekko-sample-cluster-scala/build.sbt
index ceb442b..2c057c1 100644
--- a/pekko-sample-cluster-scala/build.sbt
+++ b/pekko-sample-cluster-scala/build.sbt
@@ -16,17 +16,16 @@ lazy val `pekko-sample-cluster-scala` = project
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
run / javaOptions ++= Seq("-Xms128m", "-Xmx1024m", "-Djava.library.path=./target/native"),
libraryDependencies ++= Seq(
- "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
- "org.apache.pekko" %% "pekko-cluster-typed" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
+ "org.apache.pekko" %% "pekko-cluster-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,
- "ch.qos.logback" % "logback-classic" % "1.2.11",
- "org.apache.pekko" %% "pekko-multi-node-testkit" % pekkoVersion % Test,
- "org.scalatest" %% "scalatest" % "3.0.8" % Test,
- "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test),
+ "ch.qos.logback" % "logback-classic" % "1.2.11",
+ "org.apache.pekko" %% "pekko-multi-node-testkit" % pekkoVersion % Test,
+ "org.scalatest" %% "scalatest" % "3.0.8" % Test,
+ "org.apache.pekko" %% "pekko-actor-testkit-typed" % pekkoVersion % Test),
run / fork := false,
Global / cancelable := false,
// disable parallel tests
Test / parallelExecution := false,
- licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
- )
- .configs (MultiJvm)
+ licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))))
+ .configs(MultiJvm)
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/simple/ClusterListener.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/simple/ClusterListener.scala
index 3ed3e22..76c910b 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/simple/ClusterListener.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/simple/ClusterListener.scala
@@ -49,4 +49,4 @@ object ClusterListener {
Behaviors.same
}
}
-}
\ No newline at end of file
+}
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/App.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/App.scala
index 01f817e..5d4640b 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/App.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/App.scala
@@ -26,15 +26,14 @@ object App {
.pool(numberOfWorkers)(StatsWorker().narrow[StatsWorker.Process])
// the worker has a per word cache, so send the same word to the same local worker child
.withConsistentHashingRouting(1, _.word),
- "WorkerRouter"
- )
+ "WorkerRouter")
val service = ctx.spawn(StatsService(workers), "StatsService")
// published through the receptionist to the other nodes in the cluster
ctx.system.receptionist ! Receptionist
.Register(StatsServiceKey, service)
}
- if (cluster.selfMember.hasRole(("client"))) {
+ if (cluster.selfMember.hasRole("client")) {
val serviceRouter =
ctx.spawn(Routers.group(App.StatsServiceKey), "ServiceRouter")
ctx.spawn(StatsClient(serviceRouter), "Client")
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/AppOneMaster.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/AppOneMaster.scala
index 6bb1c7e..904fb45 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/AppOneMaster.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/AppOneMaster.scala
@@ -31,13 +31,11 @@ object AppOneMaster {
.group(WorkerServiceKey)
// the worker has a per word cache, so send the same word to the same worker
.withConsistentHashingRouting(1, _.word),
- "WorkersRouter"
- )
+ "WorkersRouter")
StatsService(workersRouter)
},
- "StatsService"
- ).withStopMessage(StatsService.Stop)
+ "StatsService").withStopMessage(StatsService.Stop)
.withSettings(singletonSettings)
val serviceProxy = ClusterSingleton(ctx.system).init(serviceSingleton)
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsClient.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsClient.scala
index 6a9cf1d..2c3456f 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsClient.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsClient.scala
@@ -34,4 +34,3 @@ object StatsClient {
}
}
-
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsService.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsService.scala
index b6bd5e3..f416ad7 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsService.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsService.scala
@@ -44,12 +44,12 @@ object StatsAggregator {
private case object Timeout extends Event
private case class CalculationComplete(length: Int) extends Event
- def apply(words: Seq[String], workers: ActorRef[StatsWorker.Process], replyTo: ActorRef[StatsService.Response]): Behavior[Event] =
+ def apply(words: Seq[String], workers: ActorRef[StatsWorker.Process], replyTo: ActorRef[StatsService.Response])
+ : Behavior[Event] =
Behaviors.setup { ctx =>
ctx.setReceiveTimeout(3.seconds, Timeout)
val responseAdapter = ctx.messageAdapter[StatsWorker.Processed](processed =>
- CalculationComplete(processed.length)
- )
+ CalculationComplete(processed.length))
words.foreach { word =>
workers ! StatsWorker.Process(word, responseAdapter)
@@ -57,7 +57,8 @@ object StatsAggregator {
waiting(replyTo, words.size, Nil)
}
- private def waiting(replyTo: ActorRef[StatsService.Response], expectedResponses: Int, results: List[Int]): Behavior[Event] =
+ private def waiting(
+ replyTo: ActorRef[StatsService.Response], expectedResponses: Int, results: List[Int]): Behavior[Event] =
Behaviors.receiveMessage {
case CalculationComplete(length) =>
val newResults = results :+ length
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsWorker.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsWorker.scala
index 2a7e651..b105d9d 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsWorker.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsWorker.scala
@@ -29,7 +29,7 @@ object StatsWorker {
}
private def withCache(ctx: ActorContext[Command],
- cache: Map[String, Int]): Behavior[Command] =
+ cache: Map[String, Int]): Behavior[Command] =
Behaviors.receiveMessage {
case Process(word, replyTo) =>
ctx.log.info("Worker processing request [{}]", word)
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/App.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/App.scala
index ab0410d..06da5a1 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/App.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/App.scala
@@ -1,6 +1,6 @@
package sample.cluster.transformation
-import org.apache.pekko.actor.typed.{ActorSystem, Behavior}
+import org.apache.pekko.actor.typed.{ ActorSystem, Behavior }
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.cluster.typed.Cluster
import com.typesafe.config.ConfigFactory
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Frontend.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Frontend.scala
index 916b8fa..ae2801c 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Frontend.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Frontend.scala
@@ -20,7 +20,6 @@ object Frontend {
private final case class TransformCompleted(originalText: String, transformedText: String) extends Event
private final case class JobFailed(why: String, text: String) extends Event
-
def apply(): Behavior[Event] = Behaviors.setup { ctx =>
Behaviors.withTimers { timers =>
// subscribe to available workers
@@ -36,7 +35,8 @@ object Frontend {
}
}
- private def running(ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
+ private def running(
+ ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
Behaviors.receiveMessage {
case WorkersUpdated(newWorkers) =>
ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
@@ -53,7 +53,7 @@ object Frontend {
val text = s"hello-$jobCounter"
ctx.ask(selectedWorker, Worker.TransformText(text, _)) {
case Success(transformedText) => TransformCompleted(transformedText.text, text)
- case Failure(ex) => JobFailed("Processing timed out", text)
+ case Failure(ex) => JobFailed("Processing timed out", text)
}
running(ctx, workers, jobCounter + 1)
}
@@ -67,4 +67,4 @@ object Frontend {
}
}
-//#frontend
\ No newline at end of file
+//#frontend
diff --git a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Worker.scala b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Worker.scala
index 1ceec60..fa24c75 100644
--- a/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Worker.scala
+++ b/pekko-sample-cluster-scala/src/main/scala/sample/cluster/transformation/Worker.scala
@@ -29,4 +29,4 @@ object Worker {
}
}
}
-//#worker
\ No newline at end of file
+//#worker
diff --git a/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
index 63e37c4..369e3cf 100644
--- a/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
+++ b/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
@@ -51,7 +51,7 @@ class StatsSampleSingleMasterSpecMultiJvmNode2 extends StatsSampleSingleMasterSp
class StatsSampleSingleMasterSpecMultiJvmNode3 extends StatsSampleSingleMasterSpec
abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSingleMasterSpecConfig)
- with WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {
+ with WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {
import StatsSampleSingleMasterSpecConfig._
@@ -74,7 +74,7 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
val secondAddress = node(second).address
val thirdAddress = node(third).address
- Cluster(system) join firstAddress
+ Cluster(system).join(firstAddress)
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
@@ -89,9 +89,7 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
val workersRouter = ctx.spawn(Routers.pool(2)(StatsWorker()), "WorkersRouter")
StatsService(workersRouter)
},
- "StatsService",
- ).withSettings(singletonSettings)
- )
+ "StatsService").withSettings(singletonSettings))
testConductor.enter("all-up")
}
diff --git a/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
index 56e4282..a9db46c 100644
--- a/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
+++ b/pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
@@ -46,8 +46,8 @@ import org.scalatest.Matchers
import org.scalatest.WordSpecLike
abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
- with WordSpecLike with Matchers with BeforeAndAfterAll
- with ImplicitSender {
+ with WordSpecLike with Matchers with BeforeAndAfterAll
+ with ImplicitSender {
import StatsSampleSpecConfig._
@@ -71,7 +71,6 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
Cluster(system).join(firstAddress)
-
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
diff --git a/pekko-sample-distributed-data-java/build.sbt b/pekko-sample-distributed-data-java/build.sbt
index c1f1bb6..9ac60d9 100644
--- a/pekko-sample-distributed-data-java/build.sbt
+++ b/pekko-sample-distributed-data-java/build.sbt
@@ -30,6 +30,5 @@ val `pekko-sample-distributed-data-java` = project
// show full stack traces and test case durations
Test / testOptions += Tests.Argument("-oDF"),
Test / logBuffered := false,
- licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
- )
- .configs (MultiJvm)
+ licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))))
+ .configs(MultiJvm)
diff --git a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala
index 09915bd..e398343 100644
--- a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala
+++ b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala
@@ -136,4 +136,3 @@ class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMult
}
}
-
diff --git a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala
index c4a4e91..50cc6e4 100644
--- a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala
+++ b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala
@@ -96,4 +96,3 @@ class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with ST
}
}
-
diff --git a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala
index 60f9eb4..35e0f83 100644
--- a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala
+++ b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala
@@ -9,7 +9,7 @@ import org.scalatest.Matchers
* Hooks up MultiNodeSpec with ScalaTest
*/
trait STMultiNodeSpec extends MultiNodeSpecCallbacks
- with WordSpecLike with Matchers with BeforeAndAfterAll {
+ with WordSpecLike with Matchers with BeforeAndAfterAll {
override def beforeAll() = multiNodeSpecBeforeAll()
diff --git a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala
index 829e71c..7eac75c 100644
--- a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala
+++ b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala
@@ -78,7 +78,7 @@ class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeS
shoppingCart ! new ShoppingCart.GetCart(probe.ref)
val cart = probe.expectMessageType[Cart]
cart.items.asScala.toSet should be(Set(
- new LineItem("1", "Apples", 2), new LineItem("2", "Oranges", 3)))
+ new LineItem("1", "Apples", 2), new LineItem("2", "Oranges", 3)))
}
enterBarrier("after-2")
@@ -99,7 +99,7 @@ class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeS
shoppingCart ! new ShoppingCart.GetCart(probe.ref)
val cart = probe.expectMessageType[Cart]
cart.items.asScala.toSet should be(
- Set(new LineItem("1", "Apples", 7), new LineItem("3", "Bananas", 4)))
+ Set(new LineItem("1", "Apples", 7), new LineItem("3", "Bananas", 4)))
}
enterBarrier("after-3")
@@ -108,4 +108,3 @@ class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeS
}
}
-
diff --git a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala
index 331c027..ee321e2 100644
--- a/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala
+++ b/pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala
@@ -92,9 +92,9 @@ class VotingServiceSpec extends MultiNodeSpec(VotingServiceSpec) with STMultiNod
val p = TestProbe[Votes]()
votingService ! new GetVotes(p.ref)
val votes = p.expectMessageType[Votes](3.seconds)
- votes.open should be (false)
+ votes.open should be(false)
import scala.collection.JavaConverters._
- votes.result.asScala.toMap should be (expected)
+ votes.result.asScala.toMap should be(expected)
}
enterBarrier("after-2")
@@ -102,4 +102,3 @@ class VotingServiceSpec extends MultiNodeSpec(VotingServiceSpec) with STMultiNod
}
}
-
diff --git a/pekko-sample-distributed-data-scala/build.sbt b/pekko-sample-distributed-data-scala/build.sbt
index 271b256..21eee9d 100644
--- a/pekko-sample-distributed-data-scala/build.sbt
+++ b/pekko-sample-distributed-data-scala/build.sbt
@@ -30,6 +30,5 @@ val `pekko-sample-distributed-data-scala` = project
// show full stack traces and test case durations
Test / testOptions += Tests.Argument("-oDF"),
Test / logBuffered := false,
- licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
- )
- .configs (MultiJvm)
+ licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))))
+ .configs(MultiJvm)
diff --git a/pekko-sample-distributed-data-scala/project/plugins.sbt b/pekko-sample-distributed-data-scala/project/plugins.sbt
index 3f85c76..6c6cdc7 100644
--- a/pekko-sample-distributed-data-scala/project/plugins.sbt
+++ b/pekko-sample-distributed-data-scala/project/plugins.sbt
@@ -1,2 +1 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
-
diff --git a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala
index 80926ea..c3c54e8 100644
--- a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala
+++ b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala
@@ -8,7 +8,7 @@ import org.apache.pekko.cluster.ddata.LWWMapKey
import org.apache.pekko.cluster.ddata.Replicator._
import org.apache.pekko.cluster.ddata.SelfUniqueAddress
import org.apache.pekko.cluster.ddata.typed.scaladsl.DistributedData
-import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ Update, Get }
+import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ Get, Update }
object ReplicatedCache {
sealed trait Command
@@ -17,7 +17,8 @@ object ReplicatedCache {
final case class Cached(key: String, value: Option[String])
final case class Evict(key: String) extends Command
private sealed trait InternalCommand extends Command
- private case class InternalGetResponse(key: String, replyTo: ActorRef[Cached], rsp: GetResponse[LWWMap[String, String]])
+ private case class InternalGetResponse(key: String, replyTo: ActorRef[Cached],
+ rsp: GetResponse[LWWMap[String, String]])
extends InternalCommand
private case class InternalUpdateResponse(rsp: UpdateResponse[LWWMap[String, String]]) extends InternalCommand
@@ -31,14 +32,16 @@ object ReplicatedCache {
Behaviors.receiveMessage[Command] {
case PutInCache(key, value) =>
replicator.askUpdate(
- askReplyTo => Update(dataKey(key), LWWMap.empty[String, String], WriteLocal, askReplyTo)(_ :+ (key -> value)),
+ askReplyTo =>
+ Update(dataKey(key), LWWMap.empty[String, String], WriteLocal, askReplyTo)(_ :+ (key -> value)),
InternalUpdateResponse.apply)
Behaviors.same
case Evict(key) =>
replicator.askUpdate(
- askReplyTo => Update(dataKey(key), LWWMap.empty[String, String], WriteLocal, askReplyTo)(_.remove(node, key)),
+ askReplyTo =>
+ Update(dataKey(key), LWWMap.empty[String, String], WriteLocal, askReplyTo)(_.remove(node, key)),
InternalUpdateResponse.apply)
Behaviors.same
diff --git a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala
index c8b09ad..7959da0 100644
--- a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala
+++ b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedMetrics.scala
@@ -61,7 +61,7 @@ object ReplicatedMetrics {
replicator.subscribe(UsedHeapKey, InternalSubscribeResponse.apply)
replicator.subscribe(MaxHeapKey, InternalSubscribeResponse.apply)
- val memberUpRef = context.messageAdapter(InternalClusterMemberUp.apply)
+ val memberUpRef = context.messageAdapter(InternalClusterMemberUp.apply)
val memberRemovedRef = context.messageAdapter(InternalClusterMemberRemoved.apply)
cluster.subscriptions ! Subscribe(memberUpRef, classOf[ClusterEvent.MemberUp])
cluster.subscriptions ! Subscribe(memberRemovedRef, classOf[ClusterEvent.MemberRemoved])
@@ -76,16 +76,18 @@ object ReplicatedMetrics {
val max = heap.getMax
replicator.askUpdate(
- askReplyTo => Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(_ :+ (node -> used)),
+ askReplyTo =>
+ Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(_ :+ (node -> used)),
InternalUpdateResponse.apply)
replicator.askUpdate(
- askReplyTo => Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo) { data =>
- data.get(node) match {
- case Some(`max`) => data // unchanged
- case _ => data :+ (node -> max)
- }
- },
+ askReplyTo =>
+ Update(MaxHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo) { data =>
+ data.get(node) match {
+ case Some(`max`) => data // unchanged
+ case _ => data :+ (node -> max)
+ }
+ },
InternalUpdateResponse.apply)
Behaviors.same
@@ -97,7 +99,7 @@ object ReplicatedMetrics {
case InternalSubscribeResponse(c @ Changed(UsedHeapKey)) =>
val usedHeapPercent = UsedHeap(c.get(UsedHeapKey).entries.collect {
case (key, value) if maxHeap.contains(key) =>
- (key -> (value.toDouble / maxHeap(key)) * 100.0)
+ key -> (value.toDouble / maxHeap(key)) * 100.0
})
context.log.debug2("Node {} observed:\n{}", node, usedHeapPercent)
context.system.eventStream ! EventStream.Publish(usedHeapPercent)
@@ -119,7 +121,9 @@ object ReplicatedMetrics {
case Cleanup =>
def cleanupRemoved(data: LWWMap[String, Long]): LWWMap[String, Long] =
- (data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) => d.remove(selfUniqueAddress, key) }
+ (data.entries.keySet -- nodesInCluster).foldLeft(data) { case (d, key) =>
+ d.remove(selfUniqueAddress, key)
+ }
replicator.askUpdate(
askReplyTo => Update(UsedHeapKey, LWWMap.empty[String, Long], WriteLocal, askReplyTo)(cleanupRemoved),
diff --git a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala
index ce7a6aa..f58b85e 100644
--- a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala
+++ b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala
@@ -10,7 +10,7 @@ import org.apache.pekko.cluster.ddata.ReplicatedData
import org.apache.pekko.cluster.ddata.Replicator._
import org.apache.pekko.cluster.ddata.SelfUniqueAddress
import org.apache.pekko.cluster.ddata.typed.scaladsl.DistributedData
-import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ Update, Get }
+import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ Get, Update }
object ShoppingCart {
sealed trait Command
@@ -22,9 +22,11 @@ object ShoppingCart {
final case class LineItem(productId: String, title: String, quantity: Int)
private sealed trait InternalCommand extends Command
- private case class InternalGetResponse(replyTo: ActorRef[Cart], rsp: GetResponse[LWWMap[String, LineItem]]) extends InternalCommand
+ private case class InternalGetResponse(replyTo: ActorRef[Cart], rsp: GetResponse[LWWMap[String, LineItem]])
+ extends InternalCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: UpdateResponse[A]) extends InternalCommand
- private case class InternalRemoveItem(productId: String, getResponse: GetResponse[LWWMap[String, LineItem]]) extends InternalCommand
+ private case class InternalRemoveItem(productId: String, getResponse: GetResponse[LWWMap[String, LineItem]])
+ extends InternalCommand
private val timeout = 3.seconds
private val readMajority = ReadMajority(timeout)
@@ -38,10 +40,9 @@ object ShoppingCart {
def behavior = Behaviors.receiveMessagePartial(
receiveGetCart
- .orElse(receiveAddItem)
- .orElse(receiveRemoveItem)
- .orElse(receiveOther)
- )
+ .orElse(receiveAddItem)
+ .orElse(receiveRemoveItem)
+ .orElse(receiveOther))
def receiveGetCart: PartialFunction[Command, Behavior[Command]] = {
case GetCart(replyTo) =>
@@ -73,9 +74,10 @@ object ShoppingCart {
def receiveAddItem: PartialFunction[Command, Behavior[Command]] = {
case AddItem(item) =>
replicator.askUpdate(
- askReplyTo => Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, askReplyTo) {
- cart => updateCart(cart, item)
- },
+ askReplyTo =>
+ Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, askReplyTo) {
+ cart => updateCart(cart, item)
+ },
InternalUpdateResponse.apply)
Behaviors.same
@@ -115,9 +117,10 @@ object ShoppingCart {
def removeItem(productId: String): Unit = {
replicator.askUpdate(
- askReplyTo => Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, askReplyTo) {
- _.remove(node, productId)
- },
+ askReplyTo =>
+ Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, askReplyTo) {
+ _.remove(node, productId)
+ },
InternalUpdateResponse.apply)
}
diff --git a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala
index f230fd6..c89dc72 100644
--- a/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala
+++ b/pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala
@@ -12,7 +12,7 @@ import org.apache.pekko.cluster.ddata.ReplicatedData
import org.apache.pekko.cluster.ddata.Replicator._
import org.apache.pekko.cluster.ddata.SelfUniqueAddress
import org.apache.pekko.cluster.ddata.typed.scaladsl.DistributedData
-import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ Update, Get }
+import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ Get, Update }
object VotingService {
sealed trait Command
@@ -26,12 +26,12 @@ object VotingService {
private sealed trait InternalCommand extends Command
private case class InternalSubscribeResponse(chg: SubscribeResponse[Flag]) extends InternalCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: UpdateResponse[A]) extends InternalCommand
- private case class InternalGetResponse(replyTo: ActorRef[Votes], rsp: GetResponse[PNCounterMap[String]]) extends InternalCommand
+ private case class InternalGetResponse(replyTo: ActorRef[Votes], rsp: GetResponse[PNCounterMap[String]])
+ extends InternalCommand
def apply(): Behavior[Command] = Behaviors.setup { context =>
DistributedData.withReplicatorMessageAdapter[Command, Flag] { replicatorFlag =>
DistributedData.withReplicatorMessageAdapter[Command, PNCounterMap[String]] { replicatorCounters =>
-
implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress
val OpenedKey = FlagKey("contestOpened")
@@ -59,13 +59,14 @@ object VotingService {
def becomeOpen() = {
replicatorFlag.unsubscribe(OpenedKey)
replicatorFlag.subscribe(ClosedKey, InternalSubscribeResponse.apply)
- Behaviors.receiveMessagePartial(open orElse getVotes(open = true))
+ Behaviors.receiveMessagePartial(open.orElse(getVotes(open = true)))
}
def open: PartialFunction[Command, Behavior[Command]] = {
case Vote(participant) =>
replicatorCounters.askUpdate(
- askReplyTo => Update(CountersKey, PNCounterMap[String](), WriteLocal, askReplyTo)(_.incrementBy(participant, 1)),
+ askReplyTo =>
+ Update(CountersKey, PNCounterMap[String](), WriteLocal, askReplyTo)(_.incrementBy(participant, 1)),
InternalUpdateResponse.apply)
Behaviors.same
diff --git a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala
index 251651f..a08d096 100644
--- a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala
+++ b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedCacheSpec.scala
@@ -136,4 +136,3 @@ class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMult
}
}
-
diff --git a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala
index 7889a48..006bbbd 100644
--- a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala
+++ b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ReplicatedMetricsSpec.scala
@@ -94,4 +94,3 @@ class ReplicatedMetricsSpec extends MultiNodeSpec(ReplicatedMetricsSpec) with ST
}
}
-
diff --git a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala
index 60f9eb4..35e0f83 100644
--- a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala
+++ b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/STMultiNodeSpec.scala
@@ -9,7 +9,7 @@ import org.scalatest.Matchers
* Hooks up MultiNodeSpec with ScalaTest
*/
trait STMultiNodeSpec extends MultiNodeSpecCallbacks
- with WordSpecLike with Matchers with BeforeAndAfterAll {
+ with WordSpecLike with Matchers with BeforeAndAfterAll {
override def beforeAll() = multiNodeSpecBeforeAll()
diff --git a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala
index fd8c008..c2b1a3b 100644
--- a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala
+++ b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala
@@ -105,4 +105,3 @@ class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeS
}
}
-
diff --git a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala
index 2b34072..5d8da2e 100644
--- a/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala
+++ b/pekko-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/VotingServiceSpec.scala
@@ -99,4 +99,3 @@ class VotingServiceSpec extends MultiNodeSpec(VotingServiceSpec) with STMultiNod
}
}
-
diff --git a/pekko-sample-distributed-workers-scala/src/main/scala/worker/CborSerializable.scala b/pekko-sample-distributed-workers-scala/src/main/scala/worker/CborSerializable.scala
index cc8f888..2008b7c 100644
--- a/pekko-sample-distributed-workers-scala/src/main/scala/worker/CborSerializable.scala
+++ b/pekko-sample-distributed-workers-scala/src/main/scala/worker/CborSerializable.scala
@@ -1,6 +1,6 @@
package worker
/**
- * Marker trait for serialization with Jackson CBOR
- */
+ * Marker trait for serialization with Jackson CBOR
+ */
trait CborSerializable
diff --git a/pekko-sample-distributed-workers-scala/src/main/scala/worker/Main.scala b/pekko-sample-distributed-workers-scala/src/main/scala/worker/Main.scala
index c136bb4..42a9558 100644
--- a/pekko-sample-distributed-workers-scala/src/main/scala/worker/Main.scala
+++ b/pekko-sample-distributed-workers-scala/src/main/scala/worker/Main.scala
@@ -7,7 +7,7 @@ import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.cluster.typed.Cluster
import org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko.cluster.typed.SelfUp
import org.apache.pekko.cluster.typed.Subscribe
@@ -74,8 +74,7 @@ object Main {
}
}),
"ClusterSystem",
- config(port, role)
- )
+ config(port, role))
}
def config(port: Int, role: String): Config =
@@ -85,18 +84,17 @@ object Main {
""").withFallback(ConfigFactory.load())
/**
- * To make the sample easier to run we kickstart a Apache Cassandra instance to
- * act as the journal. Apache Cassandra is a great choice of backend for Apache Pekko Persistence but
- * in a real application a pre-existing Apache Cassandra cluster should be used.
- */
+ * To make the sample easier to run we kickstart an Apache Cassandra instance to
+ * act as the journal. Apache Cassandra is a great choice of backend for Apache Pekko Persistence but
+ * in a real application a pre-existing Apache Cassandra cluster should be used.
+ */
def startCassandraDatabase(): Unit = {
val databaseDirectory = new File("target/cassandra-db")
CassandraLauncher.start(
databaseDirectory,
CassandraLauncher.DefaultTestConfigResource,
clean = false,
- port = 9042
- )
+ port = 9042)
// shut the cassandra instance down when the JVM stops
sys.addShutdownHook {
diff --git a/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkExecutor.scala b/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkExecutor.scala
index 0612820..da4168c 100644
--- a/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkExecutor.scala
+++ b/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkExecutor.scala
@@ -10,8 +10,8 @@ import worker.Worker.WorkComplete
import scala.concurrent.duration._
/**
- * Work executor is the actor actually performing the work.
- */
+ * Work executor is the actor actually performing the work.
+ */
object WorkExecutor {
case class ExecuteWork(n: Int, replyTo: ActorRef[WorkComplete])
@@ -29,8 +29,7 @@ object WorkExecutor {
ctx.scheduleOnce(
randomProcessingTime,
doWork.replyTo,
- WorkComplete(result)
- )
+ WorkComplete(result))
Behaviors.same
}
diff --git a/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkManager.scala b/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkManager.scala
index a1d1cc1..a2a07f0 100644
--- a/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkManager.scala
+++ b/pekko-sample-distributed-workers-scala/src/main/scala/worker/WorkManager.scala
@@ -1,22 +1,22 @@
package worker
import org.apache.pekko.Done
-import org.apache.pekko.actor.typed.delivery.WorkPullingProducerController.{MessageWithConfirmation, RequestNext}
-import org.apache.pekko.actor.typed.delivery.{ConsumerController, WorkPullingProducerController}
+import org.apache.pekko.actor.typed.delivery.WorkPullingProducerController.{ MessageWithConfirmation, RequestNext }
+import org.apache.pekko.actor.typed.delivery.{ ConsumerController, WorkPullingProducerController }
import org.apache.pekko.actor.typed.receptionist.ServiceKey
import org.apache.pekko.actor.typed.scaladsl.Behaviors
-import org.apache.pekko.actor.typed.{ActorRef, Behavior}
-import org.apache.pekko.persistence.typed.{PersistenceId, RecoveryCompleted}
-import org.apache.pekko.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}
+import org.apache.pekko.actor.typed.{ ActorRef, Behavior }
+import org.apache.pekko.persistence.typed.{ PersistenceId, RecoveryCompleted }
+import org.apache.pekko.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import org.apache.pekko.util.Timeout
-import worker.WorkState.{WorkAccepted, WorkCompleted, WorkDomainEvent, WorkStarted, WorkInProgressReset}
+import worker.WorkState.{ WorkAccepted, WorkCompleted, WorkDomainEvent, WorkInProgressReset, WorkStarted }
-import scala.concurrent.duration.{FiniteDuration, _}
-import scala.util.{Failure, Success}
+import scala.concurrent.duration.{ FiniteDuration, _ }
+import scala.util.{ Failure, Success }
/**
- * The work manager actor keep tracks of all available workers, and all scheduled and ongoing work items
- */
+ * The work manager actor keeps track of all available workers, and all scheduled and ongoing work items
+ */
object WorkManager {
val ManagerServiceKey = ServiceKey[ConsumerController.Command[WorkerCommand]]("worker-service-key")
@@ -42,81 +42,83 @@ object WorkManager {
def apply(workTimeout: FiniteDuration): Behavior[Command] =
Behaviors.setup { ctx =>
- implicit val timeout = Timeout(5.seconds)
- val producerController = ctx.spawn(WorkPullingProducerController[WorkerCommand]("work-manager", ManagerServiceKey, None), "producer-controller")
- val requestNextAdapter = ctx.messageAdapter(RequestNextWrapper)
- producerController ! WorkPullingProducerController.Start(requestNextAdapter)
+ implicit val timeout = Timeout(5.seconds)
+ val producerController =
+ ctx.spawn(WorkPullingProducerController[WorkerCommand]("work-manager", ManagerServiceKey, None),
+ "producer-controller")
+ val requestNextAdapter = ctx.messageAdapter(RequestNextWrapper)
+ producerController ! WorkPullingProducerController.Start(requestNextAdapter)
- var requestNext: Option[RequestNext[WorkerCommand]] = None
+ var requestNext: Option[RequestNext[WorkerCommand]] = None
- def tryStartWork(workState: WorkState): Effect[WorkDomainEvent, WorkState] = {
+ def tryStartWork(workState: WorkState): Effect[WorkDomainEvent, WorkState] = {
- if (workState.hasWork) {
- requestNext match {
- case Some(next) =>
- val work = workState.nextWork
- ctx.ask[MessageWithConfirmation[WorkerCommand], Done](next.askNextTo, done => MessageWithConfirmation(DoWork(work), done)) {
- case Success(Done) =>
- WorkIsDone(work.workId)
- case Failure(t) =>
- ctx.log.error("Work failed", t)
- WorkFailed(work.workId, t)
- }
- requestNext = None
- Effect.persist(WorkStarted(work.workId))
- case _ =>
- Effect.none
- }
- } else {
- Effect.none
+ if (workState.hasWork) {
+ requestNext match {
+ case Some(next) =>
+ val work = workState.nextWork
+ ctx.ask[MessageWithConfirmation[WorkerCommand], Done](next.askNextTo,
+ done => MessageWithConfirmation(DoWork(work), done)) {
+ case Success(Done) =>
+ WorkIsDone(work.workId)
+ case Failure(t) =>
+ ctx.log.error("Work failed", t)
+ WorkFailed(work.workId, t)
+ }
+ requestNext = None
+ Effect.persist(WorkStarted(work.workId))
+ case _ =>
+ Effect.none
}
+ } else {
+ Effect.none
}
+ }
- EventSourcedBehavior[Command, WorkDomainEvent, WorkState](
- persistenceId = PersistenceId.ofUniqueId("master"),
- emptyState = WorkState.empty,
- commandHandler = (workState, command) => {
- command match {
- case RequestNextWrapper(rn) =>
- ctx.log.info("work request: {}")
- if (requestNext.isDefined) {
- throw new IllegalStateException(s"Request next when there is already demand ${rn}, ${requestNext}")
- }
- requestNext = Some(rn)
- tryStartWork(workState)
- case TryStartWork =>
- tryStartWork(workState)
- case ResetWorkInProgress =>
- Effect.persist(WorkInProgressReset)
- case WorkIsDone(workId) =>
- Effect.persist[WorkDomainEvent, WorkState](WorkCompleted(workId)).thenRun { newState =>
- ctx.log.info("Work is done {}. New state {}", workId, newState)
- }
+ EventSourcedBehavior[Command, WorkDomainEvent, WorkState](
+ persistenceId = PersistenceId.ofUniqueId("master"),
+ emptyState = WorkState.empty,
+ commandHandler = (workState, command) => {
+ command match {
+ case RequestNextWrapper(rn) =>
+ ctx.log.info("work request: {}")
+ if (requestNext.isDefined) {
+ throw new IllegalStateException(s"Request next when there is already demand ${rn}, ${requestNext}")
+ }
+ requestNext = Some(rn)
+ tryStartWork(workState)
+ case TryStartWork =>
+ tryStartWork(workState)
+ case ResetWorkInProgress =>
+ Effect.persist(WorkInProgressReset)
+ case WorkIsDone(workId) =>
+ Effect.persist[WorkDomainEvent, WorkState](WorkCompleted(workId)).thenRun { newState =>
+ ctx.log.info("Work is done {}. New state {}", workId, newState)
+ }
- case WorkFailed(id, reason) =>
- ctx.log.info("Work failed {} {}", id, reason)
- tryStartWork(workState)
- case work: SubmitWork =>
- // idempotent
- if (workState.isAccepted(work.work.workId)) {
+ case WorkFailed(id, reason) =>
+ ctx.log.info("Work failed {} {}", id, reason)
+ tryStartWork(workState)
+ case work: SubmitWork =>
+ // idempotent
+ if (workState.isAccepted(work.work.workId)) {
+ work.replyTo ! WorkManager.Ack(work.work.workId)
+ Effect.none
+ } else {
+ ctx.log.info("Accepted work: {}", work.work.workId)
+ Effect.persist(WorkAccepted(work.work)).thenRun { _ =>
+ // Ack back to original sender
work.replyTo ! WorkManager.Ack(work.work.workId)
- Effect.none
- } else {
- ctx.log.info("Accepted work: {}", work.work.workId)
- Effect.persist(WorkAccepted(work.work)).thenRun { _ =>
- // Ack back to original sender
- work.replyTo ! WorkManager.Ack(work.work.workId)
- ctx.self ! TryStartWork
- }
+ ctx.self ! TryStartWork
}
- }
- },
- eventHandler = (workState, event) => workState.updated(event)
- ).receiveSignal {
- case (state, RecoveryCompleted) =>
- // Any in progress work from the previous incarnation is retried
- ctx.self ! ResetWorkInProgress
- }
+ }
+ }
+ },
+ eventHandler = (workState, event) => workState.updated(event)).receiveSignal {
+ case (state, RecoveryCompleted) =>
+ // Any in progress work from the previous incarnation is retried
+ ctx.self ! ResetWorkInProgress
}
+ }
}
diff --git a/pekko-sample-distributed-workers-scala/src/main/scala/worker/Worker.scala b/pekko-sample-distributed-workers-scala/src/main/scala/worker/Worker.scala
index 970bbb4..7c099cb 100644
--- a/pekko-sample-distributed-workers-scala/src/main/scala/worker/Worker.scala
+++ b/pekko-sample-distributed-workers-scala/src/main/scala/worker/Worker.scala
@@ -1,6 +1,5 @@
package worker
-
import org.apache.pekko.actor.typed._
import org.apache.pekko.actor.typed.scaladsl._
import worker.WorkExecutor.ExecuteWork
@@ -14,19 +13,22 @@ import org.apache.pekko.actor.typed.delivery.ConsumerController
object Worker {
sealed trait Message
- case class DeliveredMessage(confirmTo: ActorRef[ConsumerController.Confirmed], message: WorkManager.WorkerCommand, seqNr: Long) extends Message
+ case class DeliveredMessage(confirmTo: ActorRef[ConsumerController.Confirmed], message: WorkManager.WorkerCommand,
+ seqNr: Long) extends Message
case class WorkComplete(result: String) extends Message
case class WorkTimeout() extends Message
def apply(
workExecutorFactory: () => Behavior[ExecuteWork] = () => WorkExecutor()): Behavior[Message] =
Behaviors.setup[Message] { ctx =>
- val consumerController = ctx.spawn(ConsumerController[WorkManager.WorkerCommand](WorkManager.ManagerServiceKey), "consumer-controller")
- val deliverAdapter = ctx.messageAdapter[ConsumerController.Delivery[WorkManager.WorkerCommand]](d => DeliveredMessage(d.confirmTo, d.message, d.seqNr))
- consumerController ! ConsumerController.Start(deliverAdapter)
- Behaviors
- .supervise(new Worker(ctx, workExecutorFactory).idle())
- .onFailure[Exception](SupervisorStrategy.restart)
+ val consumerController =
+ ctx.spawn(ConsumerController[WorkManager.WorkerCommand](WorkManager.ManagerServiceKey), "consumer-controller")
+ val deliverAdapter = ctx.messageAdapter[ConsumerController.Delivery[WorkManager.WorkerCommand]](d =>
+ DeliveredMessage(d.confirmTo, d.message, d.seqNr))
+ consumerController ! ConsumerController.Start(deliverAdapter)
+ Behaviors
+ .supervise(new Worker(ctx, workExecutorFactory).idle())
+ .onFailure[Exception](SupervisorStrategy.restart)
}
}
@@ -45,35 +47,36 @@ class Worker private (
}
def idle(workExecutor: ActorRef[ExecuteWork] = createWorkExecutor()): Behavior[Worker.Message] =
- Behaviors.receiveMessagePartial[Worker.Message] {
- case DeliveredMessage(confirmTo, message, _) =>
- message match {
- case WorkManager.DoWork(w@Work(workId, job)) =>
- ctx.log.info("Got work: {}", w)
- workExecutor ! WorkExecutor.ExecuteWork(job, ctx.self)
- working(workExecutor, confirmTo)
- }
- }
+ Behaviors.receiveMessagePartial[Worker.Message] {
+ case DeliveredMessage(confirmTo, message, _) =>
+ message match {
+ case WorkManager.DoWork(w @ Work(workId, job)) =>
+ ctx.log.info("Got work: {}", w)
+ workExecutor ! WorkExecutor.ExecuteWork(job, ctx.self)
+ working(workExecutor, confirmTo)
+ }
+ }
- def working(workExecutor: ActorRef[ExecuteWork], confirmTo: ActorRef[ConsumerController.Confirmed]): Behavior[Worker.Message] =
- Behaviors
- .receiveMessagePartial[Worker.Message] {
- case Worker.WorkComplete(result) =>
- ctx.log.info("Work is complete. Result {}", result)
- confirmTo ! ConsumerController.Confirmed
- idle(workExecutor)
- case _: DeliveredMessage =>
- ctx.log.warn("Yikes. Reliable delivery told me to do work, while I'm already working.")
- Behaviors.unhandled
+ def working(workExecutor: ActorRef[ExecuteWork], confirmTo: ActorRef[ConsumerController.Confirmed])
+ : Behavior[Worker.Message] =
+ Behaviors
+ .receiveMessagePartial[Worker.Message] {
+ case Worker.WorkComplete(result) =>
+ ctx.log.info("Work is complete. Result {}", result)
+ confirmTo ! ConsumerController.Confirmed
+ idle(workExecutor)
+ case _: DeliveredMessage =>
+ ctx.log.warn("Yikes. Reliable delivery told me to do work, while I'm already working.")
+ Behaviors.unhandled
- }
- .receiveSignal {
- case (_, Terminated(_)) =>
- ctx.log.info("Work executor terminated")
- // The work is confirmed meaning it won't be re-delivered. Sending back a failure would need
- // to be done explicitly
- confirmTo ! ConsumerController.Confirmed
- idle(createWorkExecutor())
- }
+ }
+ .receiveSignal {
+ case (_, Terminated(_)) =>
+ ctx.log.info("Work executor terminated")
+ // The work is confirmed meaning it won't be re-delivered. Sending back a failure would need
+ // to be done explicitly
+ confirmTo ! ConsumerController.Confirmed
+ idle(createWorkExecutor())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org