Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:48 UTC

[incubator-pekko-samples] 03/09: WIP- async message extractor

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit fb0f3f28a5a8b5eb71658342d03dc0f77c5d78c9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Fri Feb 14 15:58:05 2020 -0500

    WIP- async message extractor
---
 akka-sample-kafka-to-sharding-scala/README.md      |  8 +--
 .../sharding/embeddedkafka/KafkaBroker.scala       |  8 +--
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 78 ++++++++++++++++++++
 .../akka/kafka/KafkaShardingMessageExtractor.scala | 75 -------------------
 .../main/scala/sample/sharding/kafka/Main.scala    | 84 ++++++++++++++--------
 .../sample/sharding/kafka/TopicListener.scala      |  3 +
 .../scala/sample/sharding/kafka/UserEvents.scala   | 48 ++++++-------
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  5 +-
 .../sample/sharding/kafka/UserGrpcService.scala    |  5 +-
 .../producer/src/main/resources/application.conf   | 11 +--
 .../kafka/producer/UserEventProducer.scala         |  7 +-
 11 files changed, 174 insertions(+), 158 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 7194fe2..4913792 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -154,16 +154,16 @@ Using Akka management we can see the shard allocations and the number of entitie
 ```
 
 // Node 1:
- curl -v localhost:8551/cluster/shards/user-processing  | jq
+ curl -v localhost:8551/cluster/shards/user-processing | jq
 
 // Node 2:
- curl -v localhost:8552/cluster/shards/user-processing  | jq
+ curl -v localhost:8552/cluster/shards/user-processing | jq
 ```
 
 We can count the number of shards on each:
 
 ```
-curl -v localhost:8551/cluster/shards/user-processing  | jq -r "." | grep shardId  | wc
+curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
 ```
 
 The number of shards will depend on which entities have received messages.
@@ -181,7 +181,7 @@ the correct node even if that moves due to a kafka rebalance.
 A gRPC client is included which can be started with...
 
 ```
- sbt "client/run"
+sbt "client/run"
 ```
 
 It assumes there is one of the nodes running its front end port on 8081. 
diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
index 90663a3..7e5c81c 100644
--- a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
@@ -10,13 +10,13 @@ object KafkaBroker extends App with EmbeddedKafka {
   val topic = "user-events"
   val partitions = 128
 
-  val config = EmbeddedKafkaConfig(kafkaPort = port)
-  val server = EmbeddedKafka.start()(config)
+  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = port)
+  val server = EmbeddedKafka.start()
 
   createCustomTopic(topic = topic, partitions = partitions)
 
-  log.info(s"Kafka running on port '$port'")
-  log.info(s"Topic '$topic' with '$partitions' partitions created")
+  log.info(s"Kafka running: localhost:$port")
+  log.info(s"Topic '$topic' with $partitions partitions created")
 
   server.broker.awaitShutdown()
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
new file mode 100644
index 0000000..5562c42
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -0,0 +1,78 @@
+package akka.kafka
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.scaladsl.MetadataClient
+import akka.util.Timeout._
+import org.apache.kafka.common.utils.Utils
+
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.concurrent.duration._
+
+object KafkaClusterSharding {
+  private val metadataActorCounter = new AtomicInteger
+
+  def messageExtractor[M](system: ActorSystem,
+                          groupId: String,
+                          topic: String,
+                          timeout: FiniteDuration,
+                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
+    getPartitionCount(system, topic, timeout, settings)
+      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher)
+
+  def messageExtractorNoEnvelope[M](system: ActorSystem,
+                                    groupId: String,
+                                    topic: String,
+                                    timeout: FiniteDuration,
+                                    entityIdExtractor: M => String,
+                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
+    getPartitionCount(system, topic, timeout, settings)
+      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher)
+
+  private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
+    implicit val ec: ExecutionContextExecutor = system.dispatcher
+    val actorNum = metadataActorCounter.getAndIncrement()
+    val consumerActor = system
+      .asInstanceOf[ExtendedActorSystem]
+      .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
+    val metadataClient = MetadataClient.create(consumerActor, timeout)
+    val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
+    numPartitions.map { count =>
+      system.log.info("Retrieved {} partitions for topic '{}'", count, topic)
+      count
+    }
+  }
+}
+
+trait KafkaClusterSharding {
+  def groupId: String
+  def kafkaPartitions: Int
+
+  def shardId(entityId: String): String = {
+    // simplified version of Kafka's `DefaultPartitioner` implementation
+    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+    s"$groupId-$partition"
+  }
+}
+
+class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int)
+  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
+  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+}
+
+/**
+ * Caveats
+ * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
+ * - Values are passed as `null` to the partitioner.
+ * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
+ *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
+ *   partitioner doesn't make use of any other Kafka Cluster metadata.
+ */
+class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String)
+  extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
+  override def entityId(message: M): String = entityIdExtractor(message)
+  override def unwrapMessage(message: M): M = message
+}
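For reference, a minimal, self-contained sketch (not part of this commit) of the entity-id to shard-id mapping that the KafkaClusterSharding trait above relies on. The group id and partition count below are illustrative only; in the sample they come from configuration. The point is that the consumer-side shardId lands on the same number that Kafka's default partitioner picks for a record keyed by the same entity id, which is what keeps shards co-located with their partitions.

import org.apache.kafka.common.utils.Utils

object ShardIdMappingSketch extends App {
  // Illustrative values only; the real group id and partition count come from config.
  val groupId = "user-processing"
  val kafkaPartitions = 128

  // Same computation as KafkaClusterSharding.shardId above.
  def shardId(entityId: String): String = {
    val partition = Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
    s"$groupId-$partition"
  }

  // A producer that keys records by the entity id gets the same murmur2 % partitions
  // placement from Kafka's DefaultPartitioner, so entity "user-1" and its Kafka
  // records end up on the shard/partition pair with the same number.
  println(shardId("user-1")) // e.g. "user-processing-<n>" where n < 128
}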
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
deleted file mode 100644
index 14353ab..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package akka.kafka
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
-import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy
-import akka.kafka.scaladsl.MetadataClient
-import akka.util.Timeout._
-import org.apache.kafka.common.utils.Utils
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
-
-object DefaultKafkaShardingMessageExtractor {
-  sealed trait PartitionCountStrategy {
-    def groupId: String
-    def partitions: Int
-  }
-  final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy
-  final case class RetrieveFromKafka(
-                                      system: ActorSystem,
-                                      timeout: FiniteDuration,
-                                      groupId: String,
-                                      topic: String,
-                                      settings: ConsumerSettings[_,_])
-                                    extends PartitionCountStrategy {
-    import RetrieveFromKafka._
-    private implicit val ec: ExecutionContext = system.dispatcher
-    lazy val partitions: Int = {
-      val actorNum = metadataActorCounter.getAndIncrement()
-      val consumerActor = system
-        .asInstanceOf[ExtendedActorSystem]
-        .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
-      val metadataClient = MetadataClient.create(consumerActor, timeout)
-      val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
-      numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId))
-      Await.result(numPartitions, timeout)
-    }
-  }
-  object RetrieveFromKafka {
-    private val metadataActorCounter = new AtomicInteger
-  }
-}
-
-private[kafka] trait DefaultKafkaShardingMessageExtractor {
-  val strategy: PartitionCountStrategy
-  private val groupId: String = strategy.groupId
-  private val kafkaPartitions: Int = strategy.partitions
-
-  def shardId(entityId: String): String = {
-    // simplified version of Kafka's `DefaultPartitioner` implementation
-    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-    s"$groupId-$partition"
-  }
-}
-
-final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy)
-  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
-  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-}
-
-/**
- * Caveats
- * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
- * - Values are passed as `null` to the partitioner.
- * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
- *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
- *   partitioner doesn't make use of any other Kafka Cluster metadata.
- */
-abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy)
-  extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
-  override def unwrapMessage(message: M): M = message
-}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..a695853 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -1,21 +1,24 @@
 package sample.sharding.kafka
 
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Terminated
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Terminated}
 import akka.cluster.ClusterEvent.MemberUp
-import akka.cluster.typed.Cluster
-import akka.cluster.typed.Subscribe
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.cluster.typed.{Cluster, Subscribe}
 import akka.http.scaladsl._
-import akka.http.scaladsl.model.HttpRequest
-import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
 import akka.management.scaladsl.AkkaManagement
 import akka.stream.Materializer
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
 
 import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+sealed trait Command
+case object NodeMemberUp extends Command
+case object StartProcessor extends Command
+final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
 
 object Main {
   def main(args: Array[String]): Unit = {
@@ -25,45 +28,69 @@ object Main {
     args.toList match {
       case portString :: managementPort :: frontEndPort :: Nil
           if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) =>
-        startNode(portString.toInt, managementPort.toInt, frontEndPort.toInt)
+        init(portString.toInt, managementPort.toInt, frontEndPort.toInt)
       case _ =>
         throw new IllegalArgumentException("usage: <remotingPort> <managementPort> <frontEndPort>")
     }
   }
 
-  def startNode(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
-    ActorSystem(Behaviors.setup[MemberUp] {
+  def init(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
+    ActorSystem(Behaviors.setup[Command] {
       ctx =>
-        implicit val mat = Materializer.createMaterializer(ctx.system.toClassic)
-        implicit val ec = ctx.executionContext
         AkkaManagement(ctx.system.toClassic).start()
-        // maybe don't start until part of the cluster, or add health check
-        val binding = startGrpc(ctx.system, mat, frontEndPort)
+
         val cluster = Cluster(ctx.system)
-        cluster.subscriptions.tell(Subscribe(ctx.self, classOf[MemberUp]))
+        val subscriber = ctx.spawn(clusterUpSubscriber(cluster, ctx.self), "cluster-subscriber")
+        cluster.subscriptions.tell(Subscribe(subscriber, classOf[MemberUp]))
+
+        ctx.pipeToSelf(UserEvents.messageExtractor(ctx.system)) {
+          case Success(extractor) => MessageExtractor(extractor)
+          case Failure(ex) => throw new Exception(ex)
+        }
+
+        starting()
+    }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+
+    def starting(extractor: Option[ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]] = None): Behavior[Command] = Behaviors
+      .receive[Command] {
+        case (ctx, MessageExtractor(extractor)) =>
+          ctx.self.tell(StartProcessor)
+          starting(Some(extractor))
+        case (ctx, StartProcessor) if extractor.isDefined =>
+          UserEvents.init(ctx.system, extractor.get)
+          val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor")
+          ctx.watch(eventProcessor)
+          ctx.log.info("Processor started.")
+          val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get)
+          running(binding, eventProcessor)
+      }
+
+    def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
+      .receiveSignal {
+        case (ctx, Terminated(`processor`)) =>
+          ctx.log.warn("Kafka event processor stopped. Shutting down")
+          binding.map(_.unbind())(ctx.executionContext)
+          Behaviors.stopped
+      }
+
+    def clusterUpSubscriber(cluster: Cluster, parent: ActorRef[Command]): Behavior[MemberUp] = Behaviors.setup[MemberUp] {
+      ctx =>
         Behaviors
           .receiveMessage[MemberUp] {
             case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
               ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
-              UserEvents.init(ctx.system)
-              val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
-              ctx.watch(eventProcessor)
+              parent.tell(StartProcessor)
               Behaviors.same
             case MemberUp(member) =>
               ctx.log.info("Member up {}", member)
               Behaviors.same
           }
-          .receiveSignal {
-            case (ctx, Terminated(_)) =>
-              ctx.log.warn("Kafka event processor stopped. Shutting down")
-              binding.map(_.unbind())
-              Behaviors.stopped
-          }
-    }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+    }
 
-    def startGrpc(system: ActorSystem[_], mat: Materializer, frontEndPort: Int): Future[Http.ServerBinding] = {
+    def startGrpc(system: ActorSystem[_], frontEndPort: Int, extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Future[Http.ServerBinding] = {
+      val mat = Materializer.createMaterializer(system.toClassic)
       val service: HttpRequest => Future[HttpResponse] =
-        UserServiceHandler(new UserGrpcService(system))(mat, system.toClassic)
+        UserServiceHandler(new UserGrpcService(system, extractor))(mat, system.toClassic)
       Http()(system.toClassic).bindAndHandleAsync(
         service,
         interface = "127.0.0.1",
@@ -79,5 +106,4 @@ object Main {
        """).withFallback(ConfigFactory.load())
 
   }
-
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
index 79be5e3..3b59790 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
@@ -29,6 +29,9 @@ object TopicListener {
       val address = Cluster(ctx.system).selfMember.address
       Behaviors.receiveMessage[ConsumerRebalanceEvent] {
         case TopicPartitionsAssigned(sub, partitions) =>
+          // TODO
+          // - log all partitions assigned in one log line
+          // - block for shard allocation to complete, add configurable timeout
           partitions.foreach(tp => {
             val shardId = s"$groupId-${tp.partition()}"
             ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 5cf3321..69d815d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -5,12 +5,12 @@ import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.ClusterShardingSettings
+import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor}
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
-import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka}
-import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor}
 import org.apache.kafka.common.serialization.StringDeserializer
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
 object UserEvents {
@@ -47,7 +47,8 @@ object UserEvents {
             runningTotal.copy(
               totalPurchases = runningTotal.totalPurchases + 1,
               amountSpent = runningTotal.amountSpent + (quantity * price)))
-        case GetRunningTotal(_, replyTo) =>
+        case GetRunningTotal(id, replyTo) =>
+          ctx.log.info("user {} running total queried", id)
           replyTo ! runningTotal
           Behaviors.same
       }
@@ -55,35 +56,30 @@ object UserEvents {
   }
 
   /**
-   * Passing in a [[RetrieveFromKafka]] strategy will automatically retrieve the number of partitions of a topic for
-   * use with the same hashing algorithm used by the Apache Kafka [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
-   * (murmur2) with Akka Cluster Sharding.
+   * Asynchronously get the Akka Cluster Sharding [[ShardingMessageExtractor]]. Given a topic we can automatically
+   * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding.
    */
-  class UserIdMessageExtractor(strategy: PartitionCountStrategy)
-      extends KafkaShardingNoEnvelopeExtractor[Message](strategy) {
-    def entityId(message: Message): String = message.userId
-  }
-
-  def init(system: ActorSystem[_]): ActorRef[Message] = {
+  def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
-    val messageExtractor = new UserIdMessageExtractor(
-      strategy = RetrieveFromKafka(
-        system = system.toClassic,
-        timeout = 10.seconds,
-        groupId = processorConfig.groupId,
-        topic = processorConfig.topics.head,
-        settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
-          .withBootstrapServers(processorConfig.bootstrapServers)
-      )
+    KafkaClusterSharding.messageExtractorNoEnvelope(
+      system = system.toClassic,
+      timeout = 10.seconds,
+      groupId = processorConfig.groupId,
+      topic = processorConfig.topics.head,
+      entityIdExtractor = (msg: Message) => msg.userId,
+      settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
+        .withBootstrapServers(processorConfig.bootstrapServers)
     )
+  }
+
+  def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] =
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
         .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))
-  }
 
-  def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
-    init(system).narrow[UserQuery]
-  }
+  def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] =
+    init(system, messageExtractor).narrow[UserQuery]
 }
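A minimal usage sketch (not part of this commit) of the asynchronous extractor: resolve the Future returned by UserEvents.messageExtractor and only then initialise sharding, which Main.scala does without blocking via pipeToSelf. Blocking with Await here is purely for illustration.

import akka.actor.typed.{ActorRef, ActorSystem}
import sample.sharding.kafka.UserEvents

import scala.concurrent.Await
import scala.concurrent.duration._

object ShardingBootstrapSketch {
  // Resolve the partition count from Kafka, then start sharding with the extractor.
  def start(system: ActorSystem[_]): ActorRef[UserEvents.Message] = {
    val extractor = Await.result(UserEvents.messageExtractor(system), 10.seconds)
    UserEvents.init(system, extractor)
  }
}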
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 511c3bc..155a51c 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -8,6 +8,7 @@ import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
+import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.kafka.ConsumerSettings
 import akka.kafka.Subscriptions
 import akka.kafka.scaladsl.Consumer
@@ -30,7 +31,7 @@ object UserEventsKafkaProcessor {
   sealed trait Command
   private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
 
-  def apply(): Behavior[Nothing] = {
+  def apply(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Behavior[Nothing] = {
     Behaviors
       .setup[Command] { ctx =>
         val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor"))
@@ -40,7 +41,7 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
-        val shardRegion = UserEvents.init(ctx.system)
+        val shardRegion = UserEvents.init(ctx.system, extractor)
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..f3acb7c 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -3,6 +3,7 @@ package sample.sharding.kafka
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.AskPattern._
+import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.util.Timeout
 import sample.sharding.kafka.UserEvents.GetRunningTotal
 import sample.sharding.kafka.UserEvents.RunningTotal
@@ -10,13 +11,13 @@ import sample.sharding.kafka.UserEvents.RunningTotal
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
-class UserGrpcService(system: ActorSystem[_]) extends UserService {
+class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends UserService {
 
   implicit val timeout = Timeout(5.seconds)
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
+  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor)
 
   override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
     shardRegion
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
index 38d51c0..0c08ab1 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
@@ -1,13 +1,4 @@
 kafka-to-sharding-producer {
-  bootstrap-servers = "localhost:9094"
+  bootstrap-servers = "localhost:9092"
   topic = "user-events"
-
-  # can be one of:
-  # - default: to put the entity id in the key and use the default kafka partitioner
-  # - explicit: to specify the partition explicitly, based on the entity id
-  # if you have control of both the producer and consumer it is better to be explicit to make sure
-  # that both sides align
-  partitioning = "explicit"
-
-  nr-partitions = 128
 }
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index 846dee7..a20418c 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -1,7 +1,5 @@
 package sharding.kafka.producer
 
-import java.nio.charset.StandardCharsets
-
 import akka.Done
 import akka.actor.ActorSystem
 import akka.event.Logging
@@ -10,9 +8,7 @@ import akka.kafka.scaladsl.Producer
 import akka.stream.scaladsl.Source
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import sample.sharding.kafka.serialization.user_events.UserPurchaseProto
 
 import scala.concurrent.Future
@@ -55,7 +51,6 @@ object UserEventProducer extends App {
         // rely on the default kafka partitioner to hash the key and distribute among shards
         // the logic of the default partitioner must be replicated in MessageExtractor entityId -> shardId function
         new ProducerRecord[String, Array[Byte]](producerConfig.topic, randomEntityId, message)
-
       })
       .runWith(Producer.plainSink(producerSettings))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org