You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:37:01 UTC
[incubator-pekko-samples] 01/02: WIP
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch minor-kafka-updaes
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 131b8bb56e98fc2ce66e47d5410c8f24d501a8a4
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Mon Mar 16 15:36:26 2020 +0000
WIP
---
...ble.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 0 -> 18702 bytes
...ble.Vector.nullSlotAndCopy(Object[][],_int).cfg | 5 +++
...ctorPointer.gotoPosWritable1(int,_int,_int).bgv | Bin 0 -> 9978 bytes
...ctorPointer.gotoPosWritable1(int,_int,_int).cfg | 5 +++
...ollection.immutable.Vector.appended(Object).bgv | Bin 0 -> 251627 bytes
...ollection.immutable.Vector.appended(Object).cfg | 5 +++
...ble.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 0 -> 18700 bytes
...ble.Vector.nullSlotAndCopy(Object[][],_int).cfg | 5 +++
.../processor/src/main/resources/application.conf | 5 ++-
.../main/scala/sample/sharding/kafka/Main.scala | 14 ++++---
.../sample/sharding/kafka/ProcessorSettings.scala | 2 +-
.../scala/sample/sharding/kafka/UserEvents.scala | 45 ++++++++-------------
.../sharding/kafka/UserEventsKafkaProcessor.scala | 8 ++--
.../sample/sharding/kafka/UserGrpcService.scala | 4 +-
14 files changed, 53 insertions(+), 45 deletions(-)
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
new file mode 100644
index 0000000..317ce7a
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
new file mode 100644
index 0000000..9a4840a
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+ name " HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+ method "HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+ date 1583857594397
+end_compilation
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv
new file mode 100644
index 0000000..190c24c
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
new file mode 100644
index 0000000..b8126a4
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+ name " HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]"
+ method "HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]"
+ date 1583859506333
+end_compilation
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv
new file mode 100644
index 0000000..0bc4add
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
new file mode 100644
index 0000000..67a37d2
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+ name " HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]"
+ method "HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]"
+ date 1583857658632
+end_compilation
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv
new file mode 100644
index 0000000..259b2be
Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv differ
diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
new file mode 100644
index 0000000..9c7e37a
--- /dev/null
+++ b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg
@@ -0,0 +1,5 @@
+begin_compilation
+ name " HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+ method "HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]"
+ date 1583857594379
+end_compilation
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index 306e957..6f5594d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -26,8 +26,9 @@ akka {
cluster {
seed-nodes = [
- "akka://KafkaToSharding@127.0.0.1:2551",
- "akka://KafkaToSharding@127.0.0.1:2552"]
+ "akka://KafkaToSharding@127.0.0.1:2551"
+ "akka://KafkaToSharding@127.0.0.1:2552"
+ ]
sharding {
retry-interval = 200ms
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 301bd8a..283c9d1 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -9,14 +9,13 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.management.scaladsl.AkkaManagement
import akka.stream.Materializer
import com.typesafe.config.{Config, ConfigFactory}
-import sample.sharding.kafka.UserEvents.Message
import scala.concurrent.Future
import scala.util.{Failure, Success}
sealed trait Command
case object NodeMemberUp extends Command
-final case class ShardingStarted(region: ActorRef[Message]) extends Command
+final case class ShardingStarted(region: ActorRef[UserEvents.Command]) extends Command
object Main {
def main(args: Array[String]): Unit = {
@@ -24,6 +23,9 @@ object Main {
def isInt(s: String): Boolean = s.matches("""\d+""")
args.toList match {
+ case single :: Nil if isInt(single) =>
+ val nr = single.toInt
+ init(2550 + nr, 8550 + nr, 8080 + nr)
case portString :: managementPort :: frontEndPort :: Nil
if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) =>
init(portString.toInt, managementPort.toInt, frontEndPort.toInt)
@@ -47,14 +49,14 @@ object Main {
starting(ctx, None, joinedCluster = false, settings)
}, "KafkaToSharding", config(remotingPort, akkaManagementPort))
- def start(ctx: ActorContext[Command], region: ActorRef[Message], settings: ProcessorSettings): Behavior[Command] = {
- ctx.log.info("Sharding started and joine cluster. Starting event processor")
+ def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings): Behavior[Command] = {
+ ctx.log.info("Sharding started and joined cluster. Starting event processor")
val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor")
val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, region)
running(binding, eventProcessor)
}
- def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[Message]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors
+ def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors
.receive[Command] {
case (ctx, ShardingStarted(region)) if joinedCluster =>
ctx.log.info("Sharding has started")
@@ -79,7 +81,7 @@ object Main {
}
- def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[Message]): Future[Http.ServerBinding] = {
+ def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = {
val mat = Materializer.createMaterializer(system.toClassic)
val service: HttpRequest => Future[HttpResponse] =
UserServiceHandler(new UserGrpcService(system, region))(mat, system.toClassic)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
index af569c1..a47bc75 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala
@@ -36,5 +36,5 @@ final class ProcessorSettings(val bootstrapServers: String, val topics: List[Str
* By using the same consumer group id as our entity type key name we can setup multiple consumer groups and connect
* each with a different sharded entity coordinator.
*/
- val entityTypeKey: EntityTypeKey[UserEvents.Message] = EntityTypeKey(groupId)
+ val entityTypeKey: EntityTypeKey[UserEvents.Command] = EntityTypeKey(groupId)
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 9c94230..2865705 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -4,7 +4,6 @@ import akka.Done
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
import akka.kafka.cluster.sharding.KafkaClusterSharding
@@ -12,39 +11,36 @@ import scala.concurrent.Future
import scala.concurrent.duration._
object UserEvents {
- def init(system: ActorSystem[_], settings: ProcessorSettings): Future[ActorRef[Message]] = {
+ def init(system: ActorSystem[_], settings: ProcessorSettings): Future[ActorRef[Command]] = {
import system.executionContext
- messageExtractor(settings).map(messageExtractor => {
+ KafkaClusterSharding(settings.system).messageExtractorNoEnvelope(
+ timeout = 10.seconds,
+ topic = settings.topics.head,
+ entityIdExtractor = (msg: Command) => msg.userId,
+ settings = settings.kafkaConsumerSettings()
+ ).map(messageExtractor => {
system.log.info("Message extractor created. Initializing sharding")
ClusterSharding(system).init(
Entity(settings.entityTypeKey)(createBehavior = _ => UserEvents())
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, settings.entityTypeKey.name))
- .withMessageExtractor(messageExtractor)
- .withSettings(ClusterShardingSettings(system)))
+ .withMessageExtractor(messageExtractor))
})
}
- sealed trait Message extends CborSerializable {
+ sealed trait Command extends CborSerializable {
def userId: String
}
- sealed trait UserEvent extends Message
- case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent
- case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done])
- extends UserEvent
+ final case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done]) extends Command
+ final case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends Command
- sealed trait UserQuery extends Message
- case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery
- case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
+ // state
+ final case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
- def apply(): Behavior[Message] = running(RunningTotal(0, 0))
+ def apply(): Behavior[Command] = running(RunningTotal(0, 0))
- private def running(runningTotal: RunningTotal): Behavior[Message] = {
+ private def running(runningTotal: RunningTotal): Behavior[Command] = {
Behaviors.setup { ctx =>
- Behaviors.receiveMessage[Message] {
- case UserAction(_, desc, ack) =>
- ctx.log.info("user event {}", desc)
- ack.tell(Done)
- Behaviors.same
+ Behaviors.receiveMessage[Command] {
case UserPurchase(id, product, quantity, price, ack) =>
ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
ack.tell(Done)
@@ -59,13 +55,4 @@ object UserEvents {
}
}
}
-
- private def messageExtractor(settings: ProcessorSettings): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
- KafkaClusterSharding(settings.system).messageExtractorNoEnvelope(
- timeout = 10.seconds,
- topic = settings.topics.head,
- entityIdExtractor = (msg: Message) => msg.userId,
- settings = settings.kafkaConsumerSettings()
- )
- }
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 39c1c5b..9ad8305 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -10,7 +10,6 @@ import akka.kafka.cluster.sharding.KafkaClusterSharding
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.pattern.retry
-import sample.sharding.kafka.UserEvents.Message
import sample.sharding.kafka.serialization.UserPurchaseProto
import scala.concurrent.duration._
@@ -23,22 +22,21 @@ object UserEventsKafkaProcessor {
private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
- def apply(shardRegion: ActorRef[Message], processorSettings: ProcessorSettings): Behavior[Nothing] = {
+ def apply(shardRegion: ActorRef[UserEvents.Command], processorSettings: ProcessorSettings): Behavior[Nothing] = {
Behaviors
.setup[Command] { ctx =>
implicit val classic: ActorSystem = ctx.system.toClassic
implicit val ec: ExecutionContextExecutor = ctx.executionContext
implicit val scheduler: Scheduler = classic.scheduler
- val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(ctx.system, processorSettings.entityTypeKey)
+ val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(processorSettings.entityTypeKey)
val subscription = Subscriptions
.topics(processorSettings.topics: _*)
- // convert rebalance listener to classic ActorRef
.withRebalanceListener(rebalanceListener.toClassic)
val stream: Future[Done] = Consumer.sourceWithOffsetContext(processorSettings.kafkaConsumerSettings(), subscription)
- // MapAsync and Retries will be replaced by reliable delivery in the next 2.6 version
+ // MapAsync and Retries can be replaced by reliable delivery
.mapAsync(20) { record =>
ctx.log.info(s"user id consumed kafka partition ${record.key()}->${record.partition()}")
retry(() =>
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index ebce25d..2e947c6 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -3,12 +3,12 @@ package sample.sharding.kafka
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
import akka.util.Timeout
-import sample.sharding.kafka.UserEvents.{GetRunningTotal, Message, RunningTotal}
+import sample.sharding.kafka.UserEvents.{GetRunningTotal, Command, RunningTotal}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
-class UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Message]) extends UserService {
+class UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Command]) extends UserService {
implicit val timeout: Timeout = Timeout(5.seconds)
implicit val sched: Scheduler = system.scheduler
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org