You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:37:01 UTC

[incubator-pekko-samples] 01/02: WIP

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch minor-kafka-updaes
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 131b8bb56e98fc2ce66e47d5410c8f24d501a8a4
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Mon Mar 16 15:36:26 2020 +0000

    WIP
---
 ...ble.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 0 -> 18702 bytes
 ...ble.Vector.nullSlotAndCopy(Object[][],_int).cfg |   5 +++
 ...ctorPointer.gotoPosWritable1(int,_int,_int).bgv | Bin 0 -> 9978 bytes
 ...ctorPointer.gotoPosWritable1(int,_int,_int).cfg |   5 +++
 ...ollection.immutable.Vector.appended(Object).bgv | Bin 0 -> 251627 bytes
 ...ollection.immutable.Vector.appended(Object).cfg |   5 +++
 ...ble.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 0 -> 18700 bytes
 ...ble.Vector.nullSlotAndCopy(Object[][],_int).cfg |   5 +++
 .../processor/src/main/resources/application.conf  |   5 ++-
 .../main/scala/sample/sharding/kafka/Main.scala    |  14 ++++---
 .../sample/sharding/kafka/ProcessorSettings.scala  |   2 +-
 .../scala/sample/sharding/kafka/UserEvents.scala   |  45 ++++++++-------------
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |   8 ++--
 .../sample/sharding/kafka/UserGrpcService.scala    |   4 +-
 14 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
new file mode 100644
index 0000000..317ce7a
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
new file mode 100644
index 0000000..9a4840a
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+  name " HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+  method "HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+  date 1583857594397
+end_compilation
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv
new file mode 100644
index 0000000..190c24c
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
new file mode 100644
index 0000000..b8126a4
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+  name " HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]"
+  method "HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]"
+  date 1583859506333
+end_compilation
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv
new file mode 100644
index 0000000..0bc4add
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
new file mode 100644
index 0000000..67a37d2
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+  name " HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]"
+  method "HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]"
+  date 1583857658632
+end_compilation
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
new file mode 100644
index 0000000..259b2be
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
new file mode 100644
index 0000000..9c7e37a
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+  name " HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+  method "HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+  date 1583857594379
+end_compilation
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index 306e957..6f5594d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -26,8 +26,9 @@ akka {
 
   cluster {
     seed-nodes = [
-      "akka://KafkaToSharding@127.0.0.1:2551",
-      "akka://KafkaToSharding@127.0.0.1:2552"]
+      "akka://KafkaToSharding@127.0.0.1:2551"
+      "akka://KafkaToSharding@127.0.0.1:2552"
+    ]
 
     sharding {
       retry-interval = 200ms
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 301bd8a..283c9d1 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -9,14 +9,13 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
 import akka.management.scaladsl.AkkaManagement
 import akka.stream.Materializer
 import com.typesafe.config.{Config, ConfigFactory}
-import sample.sharding.kafka.UserEvents.Message
 
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 
 sealed trait Command
 case object NodeMemberUp extends Command
-final case class ShardingStarted(region: ActorRef[Message]) extends Command
+final case class ShardingStarted(region: ActorRef[UserEvents.Command]) extends Command
 
 object Main {
   def main(args: Array[String]): Unit = {
@@ -24,6 +23,9 @@ object Main {
     def isInt(s: String): Boolean = s.matches("""\d+""")
 
     args.toList match {
+      case single :: Nil if isInt(single) =>
+        val nr = single.toInt
+        init(2550 + nr, 8550 + nr, 8080 + nr)
       case portString :: managementPort :: frontEndPort :: Nil
           if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) =>
         init(portString.toInt, managementPort.toInt, frontEndPort.toInt)
@@ -47,14 +49,14 @@ object Main {
         starting(ctx, None, joinedCluster = false, settings)
     }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
 
-    def start(ctx: ActorContext[Command], region: ActorRef[Message],  settings: ProcessorSettings): Behavior[Command] = {
-      ctx.log.info("Sharding started and joine cluster. Starting event processor")
+    def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings): Behavior[Command] = {
+      ctx.log.info("Sharding started and joined cluster. Starting event processor")
       val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor")
       val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, region)
       running(binding, eventProcessor)
     }
 
-    def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[Message]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors
+    def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors
       .receive[Command] {
         case (ctx, ShardingStarted(region)) if joinedCluster =>
           ctx.log.info("Sharding has started")
@@ -79,7 +81,7 @@ object Main {
       }
 
 
-    def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[Message]): Future[Http.ServerBinding] = {
+    def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = {
       val mat = Materializer.createMaterializer(system.toClassic)
       val service: HttpRequest => Future[HttpResponse] =
         UserServiceHandler(new UserGrpcService(system, region))(mat, system.toClassic)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
index af569c1..a47bc75 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
@@ -36,5 +36,5 @@ final class ProcessorSettings(val bootstrapServers: String, val topics: List[Str
    * By using the same consumer group id as our entity type key name we can setup multiple consumer groups and connect
    * each with a different sharded entity coordinator.
    */
-  val entityTypeKey: EntityTypeKey[UserEvents.Message] = EntityTypeKey(groupId)
+  val entityTypeKey: EntityTypeKey[UserEvents.Command] = EntityTypeKey(groupId)
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 9c94230..2865705 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -4,7 +4,6 @@ import akka.Done
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.ClusterShardingSettings
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
 import akka.kafka.cluster.sharding.KafkaClusterSharding
 
@@ -12,39 +11,36 @@ import scala.concurrent.Future
 import scala.concurrent.duration._
 
 object UserEvents {
-  def init(system: ActorSystem[_], settings: ProcessorSettings): Future[ActorRef[Message]] = {
+  def init(system: ActorSystem[_], settings: ProcessorSettings): Future[ActorRef[Command]] = {
     import system.executionContext
-    messageExtractor(settings).map(messageExtractor => {
+    KafkaClusterSharding(settings.system).messageExtractorNoEnvelope(
+      timeout = 10.seconds,
+      topic = settings.topics.head,
+      entityIdExtractor = (msg: Command) => msg.userId,
+      settings = settings.kafkaConsumerSettings()
+    ).map(messageExtractor => {
       system.log.info("Message extractor created. Initializing sharding")
       ClusterSharding(system).init(
         Entity(settings.entityTypeKey)(createBehavior = _ => UserEvents())
           .withAllocationStrategy(new ExternalShardAllocationStrategy(system, settings.entityTypeKey.name))
-          .withMessageExtractor(messageExtractor)
-          .withSettings(ClusterShardingSettings(system)))
+          .withMessageExtractor(messageExtractor))
     })
   }
 
-  sealed trait Message extends CborSerializable {
+  sealed trait Command extends CborSerializable {
     def userId: String
   }
-  sealed trait UserEvent extends Message
-  case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent
-  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done])
-      extends UserEvent
+  final case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done]) extends Command
+  final case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends Command
 
-  sealed trait UserQuery extends Message
-  case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery
-  case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
+  // state
+  final case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
 
-  def apply(): Behavior[Message] = running(RunningTotal(0, 0))
+  def apply(): Behavior[Command] = running(RunningTotal(0, 0))
 
-  private def running(runningTotal: RunningTotal): Behavior[Message] = {
+  private def running(runningTotal: RunningTotal): Behavior[Command] = {
     Behaviors.setup { ctx =>
-      Behaviors.receiveMessage[Message] {
-        case UserAction(_, desc, ack) =>
-          ctx.log.info("user event {}", desc)
-          ack.tell(Done)
-          Behaviors.same
+      Behaviors.receiveMessage[Command] {
         case UserPurchase(id, product, quantity, price, ack) =>
           ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
           ack.tell(Done)
@@ -59,13 +55,4 @@ object UserEvents {
       }
     }
   }
-
-  private def messageExtractor(settings: ProcessorSettings): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
-    KafkaClusterSharding(settings.system).messageExtractorNoEnvelope(
-      timeout = 10.seconds,
-      topic = settings.topics.head,
-      entityIdExtractor = (msg: Message) => msg.userId,
-      settings = settings.kafkaConsumerSettings()
-    )
-  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 39c1c5b..9ad8305 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -10,7 +10,6 @@ import akka.kafka.cluster.sharding.KafkaClusterSharding
 import akka.kafka.scaladsl.{Committer, Consumer}
 import akka.kafka.{CommitterSettings, Subscriptions}
 import akka.pattern.retry
-import sample.sharding.kafka.UserEvents.Message
 import sample.sharding.kafka.serialization.UserPurchaseProto
 
 import scala.concurrent.duration._
@@ -23,22 +22,21 @@ object UserEventsKafkaProcessor {
 
   private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
 
-  def apply(shardRegion: ActorRef[Message], processorSettings: ProcessorSettings): Behavior[Nothing] = {
+  def apply(shardRegion: ActorRef[UserEvents.Command], processorSettings: ProcessorSettings): Behavior[Nothing] = {
     Behaviors
       .setup[Command] { ctx =>
         implicit val classic: ActorSystem = ctx.system.toClassic
         implicit val ec: ExecutionContextExecutor = ctx.executionContext
         implicit val scheduler: Scheduler = classic.scheduler
 
-        val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(ctx.system, processorSettings.entityTypeKey)
+        val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(processorSettings.entityTypeKey)
 
         val subscription = Subscriptions
           .topics(processorSettings.topics: _*)
-          // convert rebalance listener to classic ActorRef
           .withRebalanceListener(rebalanceListener.toClassic)
 
         val stream: Future[Done] = Consumer.sourceWithOffsetContext(processorSettings.kafkaConsumerSettings(), subscription)
-          // MapAsync and Retries will be replaced by reliable delivery in the next 2.6 version
+          // MapAsync and Retries can be replaced by reliable delivery
           .mapAsync(20) { record =>
             ctx.log.info(s"user id consumed kafka partition ${record.key()}->${record.partition()}")
             retry(() =>
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index ebce25d..2e947c6 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -3,12 +3,12 @@ package sample.sharding.kafka
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
 import akka.util.Timeout
-import sample.sharding.kafka.UserEvents.{GetRunningTotal, Message, RunningTotal}
+import sample.sharding.kafka.UserEvents.{GetRunningTotal, Command, RunningTotal}
 
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContextExecutor, Future}
 
-class UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Message]) extends UserService {
+class UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Command]) extends UserService {
 
   implicit val timeout: Timeout = Timeout(5.seconds)
   implicit val sched: Scheduler = system.scheduler


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org