You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:45 UTC

[incubator-pekko-samples] branch wip-seglo-kafka-sharding created (now 1f23cf1)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


      at 1f23cf1  Reference alpakka-kafka-cluster-sharding module

This branch includes the following new commits:

     new 34595d0  WIP
     new 5c118c6  WIP- use systemActorOf to create metadata client
     new fb0f3f2  WIP- async message extractor
     new f53f3b0  RebalanceListener
     new 687eb63  Typos
     new 47869c4  Return rebalance listener as classic ActorRef
     new 7aa68a2  Create as extension
     new 27b6c80  Use Alpakka Kafka snapshot
     new 1f23cf1  Reference alpakka-kafka-cluster-sharding module

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 06/09: Return rebalance listener as classic ActorRef

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 47869c4c264a033049fd9833ea57ffceb5ba1bf0
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Feb 19 14:58:44 2020 -0500

    Return rebalance listener as classic ActorRef
---
 akka-sample-kafka-to-sharding-scala/README.md      |  12 ++-
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 106 ++++++++++++---------
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  27 +++---
 3 files changed, 81 insertions(+), 64 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index fbbaf7d..20ccf8b 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -152,18 +152,20 @@ Each forwarding messaging is followed by log for the same entity on the current
 Using Akka management we can see the shard allocations and the number of entities per shard (uses `curl` and `jq`):
 
 ```
+# Node 1:
+curl -v localhost:8551/cluster/shards/user-processing | jq
 
-// Node 1:
- curl -v localhost:8551/cluster/shards/user-processing | jq
-
-// Node 2:
- curl -v localhost:8552/cluster/shards/user-processing | jq
+# Node 2:
+curl -v localhost:8552/cluster/shards/user-processing | jq
 ```
 
 We can count the number of shards on each:
 
 ```
+# Node 1:
 curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
+# Node 2:
+curl -s localhost:8552/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
 ```
 
 The number of shards will depend on which entities have received messages.
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 5bbd885..2fbaa7f 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -3,8 +3,10 @@ package akka.kafka
 import java.util.concurrent.atomic.AtomicInteger
 
 import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.adapter._
 import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.annotation.{ApiMayChange, InternalApi}
 import akka.cluster.sharding.external.ExternalShardAllocation
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
@@ -24,6 +26,8 @@ object KafkaClusterSharding {
   private val metadataActorCounter = new AtomicInteger
 
   /**
+   * API MAY CHANGE
+   *
    * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
    * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
    *
@@ -31,9 +35,10 @@ object KafkaClusterSharding {
    * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
    * the Kafka Consumer connection required to retrieve the number of partitions.
    *
-   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
    * that entities are routed to the same Entity type.
    */
+  @ApiMayChange
   def messageExtractor[M](system: ActorSystem,
                           topic: String,
                           timeout: FiniteDuration,
@@ -42,6 +47,8 @@ object KafkaClusterSharding {
       .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
 
   /**
+   * API MAY CHANGE
+   *
    * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
    * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
    *
@@ -50,9 +57,10 @@ object KafkaClusterSharding {
    * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
    * a field from the Entity to use as the entity id for the hashing strategy.
    *
-   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
    * that entities are routed to the same Entity type.
    */
+  @ApiMayChange
   def messageExtractorNoEnvelope[M](system: ActorSystem,
                                     topic: String,
                                     timeout: FiniteDuration,
@@ -75,6 +83,7 @@ object KafkaClusterSharding {
     }
   }
 
+  @InternalApi
   sealed trait KafkaClusterSharding {
     def kafkaPartitions: Int
     def shardId(entityId: String): String = {
@@ -84,62 +93,69 @@ object KafkaClusterSharding {
     }
   }
 
-  final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int)
+  @InternalApi
+  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
     extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
     override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
     override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
   }
 
-  final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String)
+  @InternalApi
+  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
     extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
     override def entityId(message: M): String = entityIdExtractor(message)
     override def unwrapMessage(message: M): M = message
   }
 
-  // TODO:
-  // - will require `akka-actors-typed` as another provided dep, or should we just return a classic actor?
-  // - returning a typed actor is more flexible for the user so that they can easy create it under the `user` guardian
-  //   when running akka typed. an alternative would be to create the actor ourself as a system actor, like is done with
-  //   the KafkaConsumerActor for the metadata client.
+  // TODO: will require `akka-actors-typed` as a provided dep
   /**
-   * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance
-   * listener. As partitions are assigned to this consumer group member we update the External Sharding strategy
+   * API MAY CHANGE
+   *
+   * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is used
+   * to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member the
+   * rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
    * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+   *
+   * Returns an Akka classic [[ActorRef]] that can be passed to an Alpakka Kafka [[ConsumerSettings]].
    */
-  object RebalanceListener {
-    def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
-      Behaviors.setup { ctx =>
-        val typeKeyName = typeKey.name
-        val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
-        val address = Cluster(ctx.system).selfMember.address
-        Behaviors.receive[ConsumerRebalanceEvent] {
-          case (ctx, TopicPartitionsAssigned(_, partitions)) =>
-            import ctx.executionContext
-            val partitionsList = partitions.mkString(",")
-            ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
-              typeKeyName, address, partitionsList)
-            val updates = partitions.map { tp =>
-              val shardId = tp.partition().toString
-              // Kafka partition number becomes the akka shard id
-              // TODO: support assigning more than 1 shard id at once?
-              shardAllocationClient.updateShardLocation(shardId, address)
+  @ApiMayChange
+  def rebalanceListener(system: ActorSystem, typeKey: EntityTypeKey[_]): ActorRef = {
+    val actor: Behavior[ConsumerRebalanceEvent] = Behaviors.setup { ctx =>
+      val typeKeyName = typeKey.name
+      val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
+      val address = Cluster(ctx.system).selfMember.address
+      Behaviors.receive[ConsumerRebalanceEvent] {
+        case (ctx, TopicPartitionsAssigned(_, partitions)) =>
+          import ctx.executionContext
+          val partitionsList = partitions.mkString(",")
+          ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
+            typeKeyName, address, partitionsList)
+          val updates = partitions.map { tp =>
+            val shardId = tp.partition().toString
+            // Kafka partition number becomes the akka shard id
+            // TODO: support assigning more than 1 shard id at once?
+            shardAllocationClient.updateShardLocation(shardId, address)
+          }
+          Future
+            .sequence(updates)
+            // Each Future returns successfully once a majority of cluster nodes receive the update. There's no point
+            // blocking here because the rebalance listener is triggered asynchronously. If we want to block during
+            // rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead
+            .onComplete {
+              case Success(_) =>
+                ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
+                  typeKeyName, address, partitionsList)
+              case Failure(ex) =>
+                ctx.log.error("A failure occurred while updating cluster shards", ex)
             }
-            // TODO: pipeToSelf since we're closing over local state?
-            Future
-              .sequence(updates)
-              // each Future returns successfully once a majority of cluster nodes receive the update.
-              // there's no point blocking here because the rebalance listener is triggered asynchronously.  if we want
-              // to block rebalances then we should provide an implementing using the `PartitionAssignmentHandler` instead
-              .onComplete {
-                case Success(_) =>
-                  ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
-                    typeKeyName, address, partitionsList)
-                case Failure(ex) =>
-                  ctx.log.error("A failure occurred while updating cluster shards", ex)
-              }
-            Behaviors.same
-          case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
-        }
+          Behaviors.same
+        case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
       }
+    }
+
+    system
+      .toTyped
+      .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener")
+      .toClassic
   }
 }
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 941f177..43ad4bb 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -10,10 +10,9 @@ import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions}
-import akka.kafka.scaladsl.Consumer
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
+import akka.kafka.{CommitterSettings, ConsumerMessage, ConsumerSettings, KafkaClusterSharding, Subscriptions}
+import akka.kafka.scaladsl.{Committer, Consumer}
+import akka.stream.scaladsl.SourceWithContext
 import akka.util.Timeout
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -41,10 +40,10 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
-        val rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener")
+        val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey)
         val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
         val consumerSettings =
-          ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
+          ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
             .withGroupId(processorSettings.groupId)
             .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -52,12 +51,11 @@ object UserEventsKafkaProcessor {
 
         val subscription = Subscriptions
           .topics(processorSettings.topics: _*)
-          .withRebalanceListener(rebalanceListener.toClassic)
+          .withRebalanceListener(rebalanceListener)
 
-        val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
-          Consumer.plainSource(consumerSettings, subscription)
+        val kafkaConsumer: SourceWithContext[ConsumerRecord[String, Array[Byte]], ConsumerMessage.CommittableOffset, Consumer.Control] =
+          Consumer.sourceWithOffsetContext(consumerSettings, subscription)
 
-        // TODO use committable source and reliable delivery (once released)?
         val stream: Future[Done] = kafkaConsumer
           .log("kafka-consumer")
           .filter(_.key() != null) // no entity id
@@ -65,7 +63,7 @@ object UserEventsKafkaProcessor {
             // alternatively the user id could be in the message rather than use the kafka key
             ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}")
             ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key())
-            // idempotency?
+            // TODO idempotency? reliable delivery (once released)?
             retry(
               () =>
                 shardRegion.ask[Done](replyTo => {
@@ -77,10 +75,11 @@ object UserEventsKafkaProcessor {
                     purchaseProto.price,
                     replyTo)
                 })(timeout, ctx.system.scheduler),
-              3,
-              1.second)
+              attempts = 3,
+              delay = 1.second
+            )
           }
-          .runWith(Sink.ignore)
+          .runWith(Committer.sinkWithOffsetContext(CommitterSettings(classic)))
 
         stream.onComplete { result =>
           ctx.self ! KafkaConsumerStopped(result)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 04/09: RebalanceListener

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit f53f3b0a0adbc72aec268fc456f741676afa45f9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Tue Feb 18 17:15:27 2020 -0500

    RebalanceListener
---
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 131 ++++++++++++++++-----
 .../main/scala/sample/sharding/kafka/Main.scala    |  12 +-
 .../sample/sharding/kafka/TopicListener.scala      |  54 ---------
 .../scala/sample/sharding/kafka/UserEvents.scala   |  23 ++--
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  11 +-
 .../sample/sharding/kafka/UserGrpcService.scala    |   4 +-
 6 files changed, 127 insertions(+), 108 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 5562c42..5bbd885 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -2,34 +2,64 @@ package akka.kafka
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.external.ExternalShardAllocation
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.cluster.typed.Cluster
 import akka.kafka.scaladsl.MetadataClient
 import akka.util.Timeout._
 import org.apache.kafka.common.utils.Utils
 
-import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success}
 
+/**
+ * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka.
+ */
 object KafkaClusterSharding {
   private val metadataActorCounter = new AtomicInteger
 
+  /**
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
+   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
+   * the Kafka Consumer connection required to retrieve the number of partitions.
+   *
+   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
   def messageExtractor[M](system: ActorSystem,
-                          groupId: String,
                           topic: String,
                           timeout: FiniteDuration,
                           settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
     getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher)
+      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
 
+  /**
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
+   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
+   * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
+   * a field from the Entity to use as the entity id for the hashing strategy.
+   *
+   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
   def messageExtractorNoEnvelope[M](system: ActorSystem,
-                                    groupId: String,
                                     topic: String,
                                     timeout: FiniteDuration,
                                     entityIdExtractor: M => String,
                                     settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
     getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher)
+      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher)
 
   private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
     implicit val ec: ExecutionContextExecutor = system.dispatcher
@@ -44,35 +74,72 @@ object KafkaClusterSharding {
       count
     }
   }
-}
 
-trait KafkaClusterSharding {
-  def groupId: String
-  def kafkaPartitions: Int
+  sealed trait KafkaClusterSharding {
+    def kafkaPartitions: Int
+    def shardId(entityId: String): String = {
+      // simplified version of Kafka's `DefaultPartitioner` implementation
+      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+      partition.toString
+    }
+  }
 
-  def shardId(entityId: String): String = {
-    // simplified version of Kafka's `DefaultPartitioner` implementation
-    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-    s"$groupId-$partition"
+  final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int)
+    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
+    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
   }
-}
 
-class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int)
-  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
-  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-}
+  final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String)
+    extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
+    override def entityId(message: M): String = entityIdExtractor(message)
+    override def unwrapMessage(message: M): M = message
+  }
 
-/**
- * Caveats
- * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
- * - Values are passed as `null` to the partitioner.
- * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
- *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
- *   partitioner doesn't make use of any other Kafka Cluster metadata.
- */
-class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String)
-  extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
-  override def entityId(message: M): String = entityIdExtractor(message)
-  override def unwrapMessage(message: M): M = message
-}
+  // TODO:
+  // - will require `akka-actors-typed` as another provided dep, or should we just return a classic actor?
+  // - returning a typed actor is more flexible for the user so that they can easy create it under the `user` guardian
+  //   when running akka typed. an alternative would be to create the actor ourself as a system actor, like is done with
+  //   the KafkaConsumerActor for the metadata client.
+  /**
+   * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance
+   * listener. As partitions are assigned to this consumer group member we update the External Sharding strategy
+   * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+   */
+  object RebalanceListener {
+    def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
+      Behaviors.setup { ctx =>
+        val typeKeyName = typeKey.name
+        val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
+        val address = Cluster(ctx.system).selfMember.address
+        Behaviors.receive[ConsumerRebalanceEvent] {
+          case (ctx, TopicPartitionsAssigned(_, partitions)) =>
+            import ctx.executionContext
+            val partitionsList = partitions.mkString(",")
+            ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
+              typeKeyName, address, partitionsList)
+            val updates = partitions.map { tp =>
+              val shardId = tp.partition().toString
+              // Kafka partition number becomes the akka shard id
+              // TODO: support assigning more than 1 shard id at once?
+              shardAllocationClient.updateShardLocation(shardId, address)
+            }
+            // TODO: pipeToSelf since we're closing over local state?
+            Future
+              .sequence(updates)
+              // each Future returns successfully once a majority of cluster nodes receive the update.
+              // there's no point blocking here because the rebalance listener is triggered asynchronously.  if we want
+              // to block rebalances then we should provide an implementing using the `PartitionAssignmentHandler` instead
+              .onComplete {
+                case Success(_) =>
+                  ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
+                    typeKeyName, address, partitionsList)
+                case Failure(ex) =>
+                  ctx.log.error("A failure occurred while updating cluster shards", ex)
+              }
+            Behaviors.same
+          case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
+        }
+      }
+  }
+}
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index a695853..53e9d4d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -18,7 +18,7 @@ import scala.util.{Failure, Success}
 sealed trait Command
 case object NodeMemberUp extends Command
 case object StartProcessor extends Command
-final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
+final case class MessageExtractor(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
 
 object Main {
   def main(args: Array[String]): Unit = {
@@ -57,12 +57,16 @@ object Main {
           ctx.self.tell(StartProcessor)
           starting(Some(extractor))
         case (ctx, StartProcessor) if extractor.isDefined =>
-          UserEvents.init(ctx.system, extractor.get)
-          val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor")
+          val messageExtractor = extractor.get
+          val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor"))
+          UserEvents.init(ctx.system, messageExtractor, processorSettings.groupId)
+          val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(messageExtractor), "kafka-event-processor")
           ctx.watch(eventProcessor)
           ctx.log.info("Processor started.")
-          val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get)
+          val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, messageExtractor)
           running(binding, eventProcessor)
+        case (ctx, StartProcessor) =>
+          Behaviors.same
       }
 
     def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
deleted file mode 100644
index 3b59790..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package sample.sharding.kafka
-
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.external._
-import akka.cluster.typed.Cluster
-import akka.kafka.ConsumerRebalanceEvent
-import akka.kafka.TopicPartitionsAssigned
-import akka.kafka.TopicPartitionsRevoked
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-
-object TopicListener {
-  def apply(groupId: String, typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
-    Behaviors.setup { ctx =>
-      import ctx.executionContext
-      val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
-      ctx.system.scheduler.scheduleAtFixedRate(10.seconds, 20.seconds) { () =>
-        shardAllocationClient.shardLocations().onComplete {
-          case Success(shardLocations) =>
-            ctx.log.info("Current shard locations {}", shardLocations.locations)
-          case Failure(t) =>
-            ctx.log.error("failed to get shard locations", t)
-        }
-      }
-      val address = Cluster(ctx.system).selfMember.address
-      Behaviors.receiveMessage[ConsumerRebalanceEvent] {
-        case TopicPartitionsAssigned(sub, partitions) =>
-          // TODO
-          // - log all partitions assigned in one log line
-          // - block for shard allocation to complete, add configurable timeout
-          partitions.foreach(tp => {
-            val shardId = s"$groupId-${tp.partition()}"
-            ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
-            // kafka partition becomes the akka shard
-            val done = shardAllocationClient.updateShardLocation(shardId, address)
-            done.onComplete { result =>
-              ctx.log.info("Result for updating shard {}: {}", shardId, result)
-            }
-
-          })
-          Behaviors.same
-        case TopicPartitionsRevoked(_, topicPartitions) =>
-          ctx.log.info(
-            "Partitions [{}] of group [{}] revoked from current node. New location will update shard allocation",
-            topicPartitions.mkString(","),
-            groupId)
-          Behaviors.same
-      }
-    }
-}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 69d815d..1cfa62d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -7,17 +7,13 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
 import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor}
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
-import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor}
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding}
 import org.apache.kafka.common.serialization.StringDeserializer
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
 object UserEvents {
-
-  val TypeKey: EntityTypeKey[UserEvents.Message] =
-    EntityTypeKey[UserEvents.Message]("user-processing")
-
   sealed trait Message extends CborSerializable {
     def userId: String
   }
@@ -60,12 +56,11 @@ object UserEvents {
    * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka
    * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding.
    */
-  def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = {
+  def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
     KafkaClusterSharding.messageExtractorNoEnvelope(
       system = system.toClassic,
       timeout = 10.seconds,
-      groupId = processorConfig.groupId,
       topic = processorConfig.topics.head,
       entityIdExtractor = (msg: Message) => msg.userId,
       settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
@@ -73,13 +68,17 @@ object UserEvents {
     )
   }
 
-  def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] =
+  def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[Message] = {
+    // create an Akka Cluster Sharding `EntityTypeKey` for `UserEvents` for this Kafka consumer group
+    val typeKey = EntityTypeKey[UserEvents.Message](groupId)
     ClusterSharding(system).init(
-      Entity(TypeKey)(createBehavior = _ => UserEvents())
-        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+      Entity(typeKey)(createBehavior = _ => UserEvents())
+        // NOTE: why does `ExternalShardAllocationStrategy` not accept the type key type itself?
+        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
         .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))
+  }
 
-  def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] =
-    init(system, messageExtractor).narrow[UserQuery]
+  def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[UserQuery] =
+    init(system, messageExtractor, groupId).narrow[UserQuery]
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 155a51c..941f177 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -9,8 +9,8 @@ import akka.actor.typed.scaladsl.AskPattern._
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.cluster.sharding.typed.ShardingMessageExtractor
-import akka.kafka.ConsumerSettings
-import akka.kafka.Subscriptions
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions}
 import akka.kafka.scaladsl.Consumer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
@@ -40,8 +40,9 @@ object UserEventsKafkaProcessor {
         implicit val scheduler: Scheduler = classic.scheduler
         // TODO config
         val timeout = Timeout(3.seconds)
-        val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
-        val shardRegion = UserEvents.init(ctx.system, extractor)
+        val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
+        val rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener")
+        val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
@@ -51,7 +52,7 @@ object UserEventsKafkaProcessor {
 
         val subscription = Subscriptions
           .topics(processorSettings.topics: _*)
-          .withRebalanceListener(rebalancerRef.toClassic)
+          .withRebalanceListener(rebalanceListener.toClassic)
 
         val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
           Consumer.plainSource(consumerSettings, subscription)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index f3acb7c..3a3e808 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -5,6 +5,7 @@ import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
 import sample.sharding.kafka.UserEvents.GetRunningTotal
 import sample.sharding.kafka.UserEvents.RunningTotal
 
@@ -17,7 +18,8 @@ class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtracto
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor)
+  private val processorSettings = ProcessorConfig(ConfigFactory.load().getConfig("kafka-to-sharding-processor"))
+  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor, processorSettings.groupId)
 
   override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
     shardRegion


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 02/09: WIP- use systemActorOf to create metadata client

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 5c118c69be77f2ceac59f4c04c4e561eaa59738d
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Fri Feb 14 13:14:22 2020 -0500

    WIP- use systemActorOf to create metadata client
---
 .../akka/kafka/KafkaShardingMessageExtractor.scala | 78 ++++++++++++----------
 .../scala/sample/sharding/kafka/UserEvents.scala   | 29 ++++----
 2 files changed, 60 insertions(+), 47 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
index f02eb41..14353ab 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
@@ -1,52 +1,61 @@
 package akka.kafka
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import akka.actor.{ActorSystem, ExtendedActorSystem}
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy
 import akka.kafka.scaladsl.MetadataClient
 import akka.util.Timeout._
-import org.apache.kafka.clients.producer.Partitioner
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner
-import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster}
+import org.apache.kafka.common.utils.Utils
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContextExecutor}
-import scala.jdk.CollectionConverters._
-
-private[kafka] trait DefaultKafkaShardingMessageExtractor {
-  implicit val actorSystem: ActorSystem
-  implicit val timeout: FiniteDuration
-  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+import scala.concurrent.{Await, ExecutionContext}
 
-  val clientSettings: ConsumerSettings[_, _]
-  val groupId: String
-  val topic: String
+object DefaultKafkaShardingMessageExtractor {
+  sealed trait PartitionCountStrategy {
+    def groupId: String
+    def partitions: Int
+  }
+  final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy
+  final case class RetrieveFromKafka(
+                                      system: ActorSystem,
+                                      timeout: FiniteDuration,
+                                      groupId: String,
+                                      topic: String,
+                                      settings: ConsumerSettings[_,_])
+                                    extends PartitionCountStrategy {
+    import RetrieveFromKafka._
+    private implicit val ec: ExecutionContext = system.dispatcher
+    lazy val partitions: Int = {
+      val actorNum = metadataActorCounter.getAndIncrement()
+      val consumerActor = system
+        .asInstanceOf[ExtendedActorSystem]
+        .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
+      val metadataClient = MetadataClient.create(consumerActor, timeout)
+      val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
+      numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId))
+      Await.result(numPartitions, timeout)
+    }
+  }
+  object RetrieveFromKafka {
+    private val metadataActorCounter = new AtomicInteger
+  }
+}
 
-  private val CLUSTER_ID = "cluster-id"
-  private val kafkaPartitioner = partitioner()
-  private val kafkaCluster = cluster(partitions())
+private[kafka] trait DefaultKafkaShardingMessageExtractor {
+  val strategy: PartitionCountStrategy
+  private val groupId: String = strategy.groupId
+  private val kafkaPartitions: Int = strategy.partitions
 
   def shardId(entityId: String): String = {
-    val partition = kafkaPartitioner
-      .partition(topic, entityId, entityId.getBytes(), null, null, kafkaCluster)
+    // simplified version of Kafka's `DefaultPartitioner` implementation
+    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
     s"$groupId-$partition"
   }
-
-  def partitions(): List[PartitionInfo] = {
-    val consumerActor = actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), "metadata-consumer-actor")
-    val metadataClient = MetadataClient.create(consumerActor, timeout)
-    val partitions = metadataClient.getPartitionsFor(topic)
-    partitions.foreach(p => actorSystem.log.info("Retrieved %s partitions for topic %s for group %s", p.length, topic, groupId))
-    Await.result(partitions, timeout)
-  }
-
-  def cluster(partitions: List[PartitionInfo]): KafkaCluster =
-    new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava)
-
-  def partitioner(): Partitioner = new DefaultPartitioner()
 }
 
-final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
-                                            (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy)
   extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
   override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
   override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
@@ -60,8 +69,7 @@ final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSetting
  *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
  *   partitioner doesn't make use of any other Kafka Cluster metadata.
  */
-abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
-                                                  (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy)
   extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
   override def unwrapMessage(message: M): M = message
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f0f4f90..5cf3321 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -7,6 +7,7 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
 import akka.cluster.sharding.typed.ClusterShardingSettings
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka}
 import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
 import org.apache.kafka.common.serialization.StringDeserializer
 
@@ -53,24 +54,28 @@ object UserEvents {
     }
   }
 
-  /*
-   * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying DefaultPartitioner so that the same murmur2
-   * hashing algorithm is used for Kafka and Akka Cluster Sharding
+  /**
+   * Passing in a [[RetrieveFromKafka]] strategy will automatically retrieve the number of partitions of a topic for
+   * use with the same hashing algorithm used by the Apache Kafka [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
+   * (murmur2) with Akka Cluster Sharding.
    */
-  class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: String, groupId: String)
-                              (implicit actorSystem: akka.actor.ActorSystem, timeout: FiniteDuration)
-      extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, topic, groupId) {
+  class UserIdMessageExtractor(strategy: PartitionCountStrategy)
+      extends KafkaShardingNoEnvelopeExtractor[Message](strategy) {
     def entityId(message: Message): String = message.userId
   }
 
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
-    implicit val classic: akka.actor.ActorSystem = system.toClassic
-    implicit val timeout: FiniteDuration = 10.seconds
-    val clientSettings = ConsumerSettings(classic, new StringDeserializer, new StringDeserializer)
-    val topic = processorConfig.topics.head
-    val groupId = processorConfig.groupId
-    val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, groupId)
+    val messageExtractor = new UserIdMessageExtractor(
+      strategy = RetrieveFromKafka(
+        system = system.toClassic,
+        timeout = 10.seconds,
+        groupId = processorConfig.groupId,
+        topic = processorConfig.topics.head,
+        settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
+          .withBootstrapServers(processorConfig.bootstrapServers)
+      )
+    )
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 01/09: WIP

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 34595d0d50bc3c0edf9dad25e9bd73153627c79e
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Thu Feb 13 17:41:23 2020 -0500

    WIP
---
 akka-sample-kafka-to-sharding-scala/README.md      | 25 ++++++--
 akka-sample-kafka-to-sharding-scala/build.sbt      | 13 ++++-
 .../kafka/src/main/resources/logback.xml           | 23 ++++++++
 .../sharding/embeddedkafka/KafkaBroker.scala       | 22 +++++++
 .../processor/src/main/resources/application.conf  |  3 +-
 .../akka/kafka/KafkaShardingMessageExtractor.scala | 67 ++++++++++++++++++++++
 .../sample/sharding/kafka/ProcessorConfig.scala    |  8 +--
 .../sample/sharding/kafka/TopicListener.scala      | 18 +++---
 .../scala/sample/sharding/kafka/UserEvents.scala   | 33 ++++++-----
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  6 +-
 .../producer/src/main/resources/application.conf   |  2 +-
 .../sharding/kafka/producer/ProducerConfig.scala   | 19 +-----
 .../kafka/producer/UserEventProducer.scala         | 19 +-----
 13 files changed, 188 insertions(+), 70 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index af1433c..7194fe2 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -30,17 +30,32 @@ The sample is made up of three applications:
 * `processor` An Akka Cluster Sharding application that reads the Kafka topic and forwards the messages to a sharded
               entity that represents a user and a gRPC front end for accessing the sharded actors state
 * `client` A gRPC client for interacting with the cluster
+* `kafka` A local Kafka server
               
 The sample demonstrates how the external shard allocation strategy can used so messages are processed locally.
 
 The sample depends on a Kafka broker running locally on port `9092` with a topic with 128 partitions called `user-events.`
-[Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or run locally following [these instructions](https://kafka.apache.org/quickstart).
+[Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or run locally using the optional `kafka` project.
 
-Update the `applications.conf`s in each project to point to your Kafka broker if not running on `localhost:9092`
+* Run the local Kafka server. This project will also create the `user-events` topic.
+
+```
+sbt "kafka / run"
+```
+
+In the Kafka server window you'll see the following when the server is ready:
+
+```
+12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.Main$   Kafka running on port '9092'
+12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.Main$   Topic 'user-events' with '128' partitions created
+```
+
+If you want to use a different Kafka cluster then then update the `applications.conf`s in each project to point to your 
+Kafka broker if not running on `localhost:9092`.
 
 
-* Create a topic with 128 partitions, or update application.conf with the desired number of
-  partitions e.g. a command from your Kafka installation:
+* _(Optional)_ If you do not run the local Kafka server then you must create a topic with 128 partitions, or update 
+  application.conf with the desired number of partitions e.g. a command from your Kafka installation:
   
 ```
   bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 128 --topic user-events
@@ -104,7 +119,7 @@ As there is only one node we get 100% locality, each forwarded message is proces
 Now let's see that remain true once we add more nodes to the Akka Cluster, add another with different ports:
 
 ```
- sbt "processor / run 2552 8552 8082"
+sbt "processor / run 2552 8552 8082"
 ```
 
 When this starts up we'll see Kafka assign partitions to the new node (it is in the same consumer group):
diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 10f8cbb..5a08bb7 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,8 +1,8 @@
 val AkkaVersion = "2.6.3"
-// TODO upgrade to 2.0.0
-val AlpakkaKafkaVersion = "1.1.0"
+val AlpakkaKafkaVersion = "2.0.1"
 val AkkaManagementVersion = "1.0.5"
 val AkkaHttpVersion = "10.1.11"
+val KafkaVersion = "2.4.0"
 val LogbackVersion = "1.2.3"
 
 ThisBuild / scalaVersion := "2.13.1"
@@ -22,6 +22,15 @@ Global / cancelable := true // ctrl-c
 
 lazy val `akka-sample-kafka-to-sharding` = project.in(file(".")).aggregate(producer, processor, client)
 
+lazy val kafka = project
+  .in(file("kafka"))
+  .settings(
+    libraryDependencies ++= Seq(
+      "ch.qos.logback" % "logback-classic" % LogbackVersion,
+      "org.slf4j" % "log4j-over-slf4j" % "1.7.26",
+      "io.github.embeddedkafka" %% "embedded-kafka" % KafkaVersion),
+    cancelable := false)
+
 lazy val client = project
   .in(file("client"))
   .enablePlugins(AkkaGrpcPlugin, JavaAgent)
diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml b/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
new file mode 100644
index 0000000..c747bc5
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/resources/logback.xml
@@ -0,0 +1,23 @@
+<configuration>
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>kafka/target/kafka.log</file>
+        <append>false</append>
+        <encoder>
+            <pattern>%d{ISO8601} %-5level [%-20.20thread] [%-36.36logger{36}]  %msg%n%rEx</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level [%-20.20thread] %-36.36logger{36}  %msg%n%rEx</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="sample.sharding.embeddedkafka" levle="INFO">
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <root level="INFO">
+        <appender-ref ref="FILE" />
+    </root>
+</configuration>
diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
new file mode 100644
index 0000000..90663a3
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
@@ -0,0 +1,22 @@
+package sample.sharding.embeddedkafka
+
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.slf4j.LoggerFactory
+
+object KafkaBroker extends App with EmbeddedKafka {
+  val log = LoggerFactory.getLogger(this.getClass)
+
+  val port = 9092
+  val topic = "user-events"
+  val partitions = 128
+
+  val config = EmbeddedKafkaConfig(kafkaPort = port)
+  val server = EmbeddedKafka.start()(config)
+
+  createCustomTopic(topic = topic, partitions = partitions)
+
+  log.info(s"Kafka running on port '$port'")
+  log.info(s"Topic '$topic' with '$partitions' partitions created")
+
+  server.broker.awaitShutdown()
+}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index 76f1266..ef7e721 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -1,8 +1,7 @@
 kafka-to-sharding-processor {
   bootstrap-servers = "localhost:9092"
-  topic = "user-events"
+  topics = ["user-events"]
   group = "group-1"
-  nr-partitions = 128
 }
 
 akka.http {
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
new file mode 100644
index 0000000..f02eb41
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
@@ -0,0 +1,67 @@
+package akka.kafka
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.scaladsl.MetadataClient
+import akka.util.Timeout._
+import org.apache.kafka.clients.producer.Partitioner
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner
+import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContextExecutor}
+import scala.jdk.CollectionConverters._
+
+private[kafka] trait DefaultKafkaShardingMessageExtractor {
+  implicit val actorSystem: ActorSystem
+  implicit val timeout: FiniteDuration
+  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+
+  val clientSettings: ConsumerSettings[_, _]
+  val groupId: String
+  val topic: String
+
+  private val CLUSTER_ID = "cluster-id"
+  private val kafkaPartitioner = partitioner()
+  private val kafkaCluster = cluster(partitions())
+
+  def shardId(entityId: String): String = {
+    val partition = kafkaPartitioner
+      .partition(topic, entityId, entityId.getBytes(), null, null, kafkaCluster)
+    s"$groupId-$partition"
+  }
+
+  def partitions(): List[PartitionInfo] = {
+    val consumerActor = actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), "metadata-consumer-actor")
+    val metadataClient = MetadataClient.create(consumerActor, timeout)
+    val partitions = metadataClient.getPartitionsFor(topic)
+    partitions.foreach(p => actorSystem.log.info("Retrieved %s partitions for topic %s for group %s", p.length, topic, groupId))
+    Await.result(partitions, timeout)
+  }
+
+  def cluster(partitions: List[PartitionInfo]): KafkaCluster =
+    new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava)
+
+  def partitioner(): Partitioner = new DefaultPartitioner()
+}
+
+final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
+                                            (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
+  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+}
+
+/**
+ * Caveats
+ * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
+ * - Values are passed as `null` to the partitioner.
+ * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
+ *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
+ *   partitioner doesn't make use of any other Kafka Cluster metadata.
+ */
+abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
+                                                  (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+  extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
+  override def unwrapMessage(message: M): M = message
+}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
index fd07ac4..1749aee 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorConfig.scala
@@ -1,14 +1,14 @@
 package sample.sharding.kafka
 
 import com.typesafe.config.Config
+import scala.jdk.CollectionConverters._
 
 case object ProcessorConfig {
   def apply(config: Config): ProcessorConfig =
     new ProcessorConfig(
       config.getString("bootstrap-servers"),
-      config.getString("topic"),
-      config.getString("group"),
-      config.getInt("nr-partitions"))
+      config.getStringList("topics").asScala.toList,
+      config.getString("group"))
 }
 
-final class ProcessorConfig(val bootstrapServers: String, val topic: String, val groupId: String, val nrPartitions: Int)
+final class ProcessorConfig(val bootstrapServers: String, val topics: List[String], val groupId: String)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
index ac62b4c..79be5e3 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
@@ -14,7 +14,7 @@ import scala.util.Failure
 import scala.util.Success
 
 object TopicListener {
-  def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
+  def apply(groupId: String, typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
     Behaviors.setup { ctx =>
       import ctx.executionContext
       val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
@@ -28,21 +28,23 @@ object TopicListener {
       }
       val address = Cluster(ctx.system).selfMember.address
       Behaviors.receiveMessage[ConsumerRebalanceEvent] {
-        case TopicPartitionsAssigned(_, partitions) =>
-          partitions.foreach(partition => {
-            ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", partition.partition())
+        case TopicPartitionsAssigned(sub, partitions) =>
+          partitions.foreach(tp => {
+            val shardId = s"$groupId-${tp.partition()}"
+            ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
             // kafka partition becomes the akka shard
-            val done = shardAllocationClient.updateShardLocation(partition.partition().toString, address)
+            val done = shardAllocationClient.updateShardLocation(shardId, address)
             done.onComplete { result =>
-              ctx.log.info("Result for updating shard {}: {}", partition, result)
+              ctx.log.info("Result for updating shard {}: {}", shardId, result)
             }
 
           })
           Behaviors.same
         case TopicPartitionsRevoked(_, topicPartitions) =>
           ctx.log.info(
-            "Partitions [{}] revoked from current node. New location will update shard allocation",
-            topicPartitions.mkString(","))
+            "Partitions [{}] of group [{}] revoked from current node. New location will update shard allocation",
+            topicPartitions.mkString(","),
+            groupId)
           Behaviors.same
       }
     }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f093fff..f0f4f90 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -1,16 +1,16 @@
 package sample.sharding.kafka
 
 import akka.Done
-import akka.actor.typed.ActorRef
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.Behaviors
+import akka.actor.typed.scaladsl.adapter._
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
 import akka.cluster.sharding.typed.ClusterShardingSettings
-import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor
-import akka.cluster.sharding.typed.scaladsl.ClusterSharding
-import akka.cluster.sharding.typed.scaladsl.Entity
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
+import org.apache.kafka.common.serialization.StringDeserializer
+
+import scala.concurrent.duration._
 
 object UserEvents {
 
@@ -54,20 +54,27 @@ object UserEvents {
   }
 
   /*
-   * The murmur2 message extractor matches kafka's default partitioning when messages
-   * have keys that are strings
+   * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying DefaultPartitioner so that the same murmur2
+   * hashing algorithm is used for Kafka and Akka Cluster Sharding
    */
-  class UserIdMessageExtractor(nrKafkaPartitions: Int)
-      extends Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) {
-    override def entityId(message: Message): String = message.userId
+  class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: String, groupId: String)
+                              (implicit actorSystem: akka.actor.ActorSystem, timeout: FiniteDuration)
+      extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, topic, groupId) {
+    def entityId(message: Message): String = message.userId
   }
 
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
+    implicit val classic: akka.actor.ActorSystem = system.toClassic
+    implicit val timeout: FiniteDuration = 10.seconds
+    val clientSettings = ConsumerSettings(classic, new StringDeserializer, new StringDeserializer)
+    val topic = processorConfig.topics.head
+    val groupId = processorConfig.groupId
+    val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, groupId)
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
-        .withMessageExtractor(new UserIdMessageExtractor(processorConfig.nrPartitions))
+        .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))
   }
 
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 8c69c83..511c3bc 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -39,7 +39,7 @@ object UserEventsKafkaProcessor {
         implicit val scheduler: Scheduler = classic.scheduler
         // TODO config
         val timeout = Timeout(3.seconds)
-        val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), "rebalancerRef")
+        val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
         val shardRegion = UserEvents.init(ctx.system)
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
@@ -48,7 +48,9 @@ object UserEventsKafkaProcessor {
             .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
             .withStopTimeout(0.seconds)
 
-        val subscription = Subscriptions.topics(processorSettings.topic).withRebalanceListener(rebalancerRef.toClassic)
+        val subscription = Subscriptions
+          .topics(processorSettings.topics: _*)
+          .withRebalanceListener(rebalancerRef.toClassic)
 
         val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
           Consumer.plainSource(consumerSettings, subscription)
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
index 2025bb6..38d51c0 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
@@ -1,5 +1,5 @@
 kafka-to-sharding-producer {
-  bootstrap-servers = "localhost:9092"
+  bootstrap-servers = "localhost:9094"
   topic = "user-events"
 
   # can be one of:
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
index f11b2b7..9d4c374 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/ProducerConfig.scala
@@ -2,28 +2,13 @@ package sharding.kafka.producer
 
 import com.typesafe.config.Config
 
-sealed trait Partitioning
-case object Default extends Partitioning
-case object Explicit extends Partitioning
-
-object Partitioning {
-  def valueOf(input: String): Partitioning = input.toLowerCase match {
-    case "explicit" => Explicit
-    case _          => Default
-  }
-}
-
 case object ProducerConfig {
   def apply(config: Config): ProducerConfig =
     new ProducerConfig(
       config.getString("bootstrap-servers"),
-      config.getString("topic"),
-      Partitioning.valueOf(config.getString("partitioning")),
-      config.getInt("nr-partitions"))
+      config.getString("topic"))
 }
 
 final class ProducerConfig(
     val bootstrapServers: String,
-    val topic: String,
-    val partitioning: Partitioning,
-    val nrPartitions: Int)
+    val topic: String)
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index c7d2869..846dee7 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -52,23 +52,10 @@ object UserEventProducer extends App {
         val product = products(Random.nextInt(products.size))
         val message = UserPurchaseProto(randomEntityId, product, quantity, price).toByteArray
         log.info("Sending message to user {}", randomEntityId)
-        producerRecord(randomEntityId, message)
-
-      })
-      .runWith(Producer.plainSink(producerSettings))
-
-  def producerRecord(entityId: String, message: Array[Byte]): ProducerRecord[String, Array[Byte]] = {
-    producerConfig.partitioning match {
-      case Default =>
         // rely on the default kafka partitioner to hash the key and distribute among shards
         // the logic of the default partitioner must be replicated in MessageExtractor entityId -> shardId function
-        new ProducerRecord[String, Array[Byte]](producerConfig.topic, entityId, message)
-      case Explicit =>
-        // this logic MUST be replicated in the MessageExtractor entityId -> shardId function!
-        val shardAndPartition = (Utils.toPositive(Utils.murmur2(entityId.getBytes(StandardCharsets.UTF_8))) % producerConfig.nrPartitions)
-        log.info(s"entityId->partition ${entityId}->${shardAndPartition}")
-        new ProducerRecord[String, Array[Byte]](producerConfig.topic, shardAndPartition, entityId, message)
-    }
-  }
+        new ProducerRecord[String, Array[Byte]](producerConfig.topic, randomEntityId, message)
 
+      })
+      .runWith(Producer.plainSink(producerSettings))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 08/09: Use Alpakka Kafka snapshot

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 27b6c800a34b66e4878562e9cd0ffe286550aa36
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Feb 26 16:56:45 2020 -0500

    Use Alpakka Kafka snapshot
---
 akka-sample-kafka-to-sharding-scala/build.sbt      |   2 +-
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 212 ---------------------
 .../scala/sample/sharding/kafka/UserEvents.scala   |   1 -
 3 files changed, 1 insertion(+), 214 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 5a08bb7..1a3c8a7 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,5 +1,5 @@
 val AkkaVersion = "2.6.3"
-val AlpakkaKafkaVersion = "2.0.1"
+val AlpakkaKafkaVersion = "2.0.2+2-88deb905+20200226-1650"
 val AkkaManagementVersion = "1.0.5"
 val AkkaHttpVersion = "10.1.11"
 val KafkaVersion = "2.4.0"
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
deleted file mode 100644
index 6c981d0..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-package akka.kafka
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.adapter._
-import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId}
-import akka.annotation.{ApiMayChange, InternalApi}
-import akka.cluster.sharding.external.ExternalShardAllocation
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
-import akka.cluster.typed.Cluster
-import akka.kafka
-import akka.kafka.scaladsl.MetadataClient
-import akka.util.Timeout._
-import org.apache.kafka.common.utils.Utils
-
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContextExecutor, Future}
-import scala.util.{Failure, Success}
-
-/**
- * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka.
- */
-class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension {
-  import KafkaClusterSharding._
-
-  private val metadataActorCounter = new AtomicInteger
-  private var _messageExtractor: Option[KafkaShardingMessageExtractor[_]] = None
-  private var _messageExtractorNoEnvelope: Option[KafkaShardingNoEnvelopeExtractor[_]] = None
-
-  /**
-   * API MAY CHANGE
-   *
-   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
-   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
-   *
-   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
-   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
-   * the Kafka Consumer connection required to retrieve the number of partitions.
-   *
-   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
-   * that entities are routed to the same Entity type.
-   */
-  @ApiMayChange
-  def messageExtractor[M](topic: String,
-                          timeout: FiniteDuration,
-                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = _messageExtractor match {
-  case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingMessageExtractor[M]])
-  case _ =>
-    getPartitionCount(topic, timeout, settings)
-      .map { kafkaPartitions =>
-        val extractor = new KafkaShardingMessageExtractor[M](kafkaPartitions)
-        _messageExtractor = Some(extractor)
-        extractor
-      }(system.dispatcher)
-  }
-
-  /**
-   * API MAY CHANGE
-   *
-   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
-   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
-   *
-   * The number of partitions to use with the hashing strategy is provided by explicitly with [[kafkaPartitions]].
-   *
-   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
-   * that entities are routed to the same Entity type.
-   */
-  @ApiMayChange
-  def messageExtractor[M](kafkaPartitions: Int): Future[KafkaShardingMessageExtractor[M]] =
-    Future.successful(new KafkaShardingMessageExtractor[M](kafkaPartitions))
-
-  /**
-   * API MAY CHANGE
-   *
-   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
-   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
-   *
-   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
-   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
-   * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
-   * a field from the Entity to use as the entity id for the hashing strategy.
-   *
-   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
-   * that entities are routed to the same Entity type.
-   */
-  @ApiMayChange
-  def messageExtractorNoEnvelope[M](topic: String,
-                                    timeout: FiniteDuration,
-                                    entityIdExtractor: M => String,
-                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =  _messageExtractorNoEnvelope match {
-    case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingNoEnvelopeExtractor[M]])
-    case _ =>
-      getPartitionCount(topic, timeout, settings)
-        .map { kafkaPartitions =>
-          val extractor = new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)
-          _messageExtractorNoEnvelope = Some(extractor)
-          extractor
-        }(system.dispatcher)
-  }
-
-  /**
-   * API MAY CHANGE
-   *
-   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
-   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
-   *
-   * The number of partitions to use with the hashing strategy is provided by explicitly with [[kafkaPartitions]].
-   *
-   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
-   * that entities are routed to the same Entity type.
-   */
-  @ApiMayChange
-  def messageExtractorNoEnvelope[M](kafkaPartitions: Int, entityIdExtractor: M => String): Future[KafkaShardingNoEnvelopeExtractor[M]] =
-    Future.successful(new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))
-
-  private def getPartitionCount[M](topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
-    implicit val ec: ExecutionContextExecutor = system.dispatcher
-    val actorNum = metadataActorCounter.getAndIncrement()
-    val consumerActor = system
-      .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
-    val metadataClient = MetadataClient.create(consumerActor, timeout)
-    val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
-    numPartitions.map { count =>
-      system.log.info("Retrieved {} partitions for topic '{}'", count, topic)
-      count
-    }
-  }
-
-  // TODO: will require `akka-actors-typed` as a provided dep
-  /**
-   * API MAY CHANGE
-   *
-   * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is used
-   * to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member the
-   * rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
-   * accordingly so that entities are (eventually) routed to the local Akka cluster member.
-   *
-   * Returns an Akka classic [[ActorRef]] that can be passed to an Alpakka Kafka [[ConsumerSettings]].
-   */
-  @ApiMayChange
-  def rebalanceListener(system: ActorSystem, typeKey: EntityTypeKey[_]): ActorRef = {
-    val actor: Behavior[ConsumerRebalanceEvent] = Behaviors.setup { ctx =>
-      val typeKeyName = typeKey.name
-      val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
-      val address = Cluster(ctx.system).selfMember.address
-      Behaviors.receive[ConsumerRebalanceEvent] {
-        case (ctx, TopicPartitionsAssigned(_, partitions)) =>
-          import ctx.executionContext
-          val partitionsList = partitions.mkString(",")
-          ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
-            typeKeyName, address, partitionsList)
-          val updates = partitions.map { tp =>
-            val shardId = tp.partition().toString
-            // Kafka partition number becomes the akka shard id
-            // TODO: support assigning more than 1 shard id at once?
-            shardAllocationClient.updateShardLocation(shardId, address)
-          }
-          Future
-            .sequence(updates)
-            // Each Future returns successfully once a majority of cluster nodes receive the update. There's no point
-            // blocking here because the rebalance listener is triggered asynchronously. If we want to block during
-            // rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead
-            .onComplete {
-              case Success(_) =>
-                ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
-                  typeKeyName, address, partitionsList)
-              case Failure(ex) =>
-                ctx.log.error("A failure occurred while updating cluster shards", ex)
-            }
-          Behaviors.same
-        case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
-      }
-    }
-
-    system
-      .toTyped
-      .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener")
-      .toClassic
-  }
-}
-
-object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {
-  @InternalApi
-  sealed trait KafkaClusterShardingContract {
-    def kafkaPartitions: Int
-    def shardId(entityId: String): String = {
-      // simplified version of Kafka's `DefaultPartitioner` implementation
-      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-      partition.toString
-    }
-  }
-
-  @InternalApi
-  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
-    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterShardingContract {
-    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-  }
-
-  @InternalApi
-  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
-    extends ShardingMessageExtractor[M, M] with KafkaClusterShardingContract {
-    override def entityId(message: M): String = entityIdExtractor(message)
-    override def unwrapMessage(message: M): M = message
-  }
-
-  override def createExtension(system: ExtendedActorSystem): kafka.KafkaClusterSharding =
-    new KafkaClusterSharding(system)
-}
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 763bc49..a7b85d3 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -72,7 +72,6 @@ object UserEvents {
     val typeKey = EntityTypeKey[UserEvents.Message](groupId)
     ClusterSharding(system).init(
       Entity(typeKey)(createBehavior = _ => UserEvents())
-        // NOTE: why does `ExternalShardAllocationStrategy` not accept the type key type itself?
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
         .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 05/09: Typos

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 687eb63340b7569b19be80d8890af8995e268228
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Feb 19 12:14:29 2020 -0500

    Typos
---
 akka-sample-kafka-to-sharding-scala/README.md                         | 4 ++--
 .../processor/src/main/resources/application.conf                     | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 4913792..fbbaf7d 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -32,9 +32,9 @@ The sample is made up of three applications:
 * `client` A gRPC client for interacting with the cluster
 * `kafka` A local Kafka server
               
-The sample demonstrates how the external shard allocation strategy can used so messages are processed locally.
+The sample demonstrates how the external shard allocation strategy can be used so messages are processed locally.
 
-The sample depends on a Kafka broker running locally on port `9092` with a topic with 128 partitions called `user-events.`
+The sample depends on a Kafka broker running locally on port `9092` with a topic with 128 partitions called `user-events`. 
 [Kafka can be run in Docker](https://github.com/wurstmeister/kafka-docker) or run locally using the optional `kafka` project.
 
 * Run the local Kafka server. This project will also create the `user-events` topic.
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
index ef7e721..18a2c14 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf
@@ -1,7 +1,7 @@
 kafka-to-sharding-processor {
   bootstrap-servers = "localhost:9092"
   topics = ["user-events"]
-  group = "group-1"
+  group = "user-processing"
 }
 
 akka.http {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 03/09: WIP- async message extractor

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit fb0f3f28a5a8b5eb71658342d03dc0f77c5d78c9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Fri Feb 14 15:58:05 2020 -0500

    WIP- async message extractor
---
 akka-sample-kafka-to-sharding-scala/README.md      |  8 +--
 .../sharding/embeddedkafka/KafkaBroker.scala       |  8 +--
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 78 ++++++++++++++++++++
 .../akka/kafka/KafkaShardingMessageExtractor.scala | 75 -------------------
 .../main/scala/sample/sharding/kafka/Main.scala    | 84 ++++++++++++++--------
 .../sample/sharding/kafka/TopicListener.scala      |  3 +
 .../scala/sample/sharding/kafka/UserEvents.scala   | 48 ++++++-------
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  5 +-
 .../sample/sharding/kafka/UserGrpcService.scala    |  5 +-
 .../producer/src/main/resources/application.conf   | 11 +--
 .../kafka/producer/UserEventProducer.scala         |  7 +-
 11 files changed, 174 insertions(+), 158 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 7194fe2..4913792 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -154,16 +154,16 @@ Using Akka management we can see the shard allocations and the number of entitie
 ```
 
 // Node 1:
- curl -v localhost:8551/cluster/shards/user-processing  | jq
+ curl -v localhost:8551/cluster/shards/user-processing | jq
 
 // Node 2:
- curl -v localhost:8552/cluster/shards/user-processing  | jq
+ curl -v localhost:8552/cluster/shards/user-processing | jq
 ```
 
 We can count the number of shards on each:
 
 ```
-curl -v localhost:8551/cluster/shards/user-processing  | jq -r "." | grep shardId  | wc
+curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
 ```
 
 The number of shards will depend on which entities have received messages.
@@ -181,7 +181,7 @@ the correct node even if that moves due to a kafka rebalance.
 A gRPC client is included which can be started with...
 
 ```
- sbt "client/run"
+sbt "client/run"
 ```
 
 It assumes there is one of the nodes running its front end port on 8081. 
diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
index 90663a3..7e5c81c 100644
--- a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
@@ -10,13 +10,13 @@ object KafkaBroker extends App with EmbeddedKafka {
   val topic = "user-events"
   val partitions = 128
 
-  val config = EmbeddedKafkaConfig(kafkaPort = port)
-  val server = EmbeddedKafka.start()(config)
+  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = port)
+  val server = EmbeddedKafka.start()
 
   createCustomTopic(topic = topic, partitions = partitions)
 
-  log.info(s"Kafka running on port '$port'")
-  log.info(s"Topic '$topic' with '$partitions' partitions created")
+  log.info(s"Kafka running: localhost:$port")
+  log.info(s"Topic '$topic' with $partitions partitions created")
 
   server.broker.awaitShutdown()
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
new file mode 100644
index 0000000..5562c42
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -0,0 +1,78 @@
+package akka.kafka
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.scaladsl.MetadataClient
+import akka.util.Timeout._
+import org.apache.kafka.common.utils.Utils
+
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.concurrent.duration._
+
+object KafkaClusterSharding {
+  private val metadataActorCounter = new AtomicInteger
+
+  def messageExtractor[M](system: ActorSystem,
+                          groupId: String,
+                          topic: String,
+                          timeout: FiniteDuration,
+                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
+    getPartitionCount(system, topic, timeout, settings)
+      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher)
+
+  def messageExtractorNoEnvelope[M](system: ActorSystem,
+                                    groupId: String,
+                                    topic: String,
+                                    timeout: FiniteDuration,
+                                    entityIdExtractor: M => String,
+                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
+    getPartitionCount(system, topic, timeout, settings)
+      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher)
+
+  private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
+    implicit val ec: ExecutionContextExecutor = system.dispatcher
+    val actorNum = metadataActorCounter.getAndIncrement()
+    val consumerActor = system
+      .asInstanceOf[ExtendedActorSystem]
+      .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
+    val metadataClient = MetadataClient.create(consumerActor, timeout)
+    val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
+    numPartitions.map { count =>
+      system.log.info("Retrieved {} partitions for topic '{}'", count, topic)
+      count
+    }
+  }
+}
+
+trait KafkaClusterSharding {
+  def groupId: String
+  def kafkaPartitions: Int
+
+  def shardId(entityId: String): String = {
+    // simplified version of Kafka's `DefaultPartitioner` implementation
+    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+    s"$groupId-$partition"
+  }
+}
+
+class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int)
+  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
+  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+}
+
+/**
+ * Caveats
+ * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
+ * - Values are passed as `null` to the partitioner.
+ * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
+ *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
+ *   partitioner doesn't make use of any other Kafka Cluster metadata.
+ */
+class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String)
+  extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
+  override def entityId(message: M): String = entityIdExtractor(message)
+  override def unwrapMessage(message: M): M = message
+}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
deleted file mode 100644
index 14353ab..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package akka.kafka
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
-import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy
-import akka.kafka.scaladsl.MetadataClient
-import akka.util.Timeout._
-import org.apache.kafka.common.utils.Utils
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
-
-object DefaultKafkaShardingMessageExtractor {
-  sealed trait PartitionCountStrategy {
-    def groupId: String
-    def partitions: Int
-  }
-  final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy
-  final case class RetrieveFromKafka(
-                                      system: ActorSystem,
-                                      timeout: FiniteDuration,
-                                      groupId: String,
-                                      topic: String,
-                                      settings: ConsumerSettings[_,_])
-                                    extends PartitionCountStrategy {
-    import RetrieveFromKafka._
-    private implicit val ec: ExecutionContext = system.dispatcher
-    lazy val partitions: Int = {
-      val actorNum = metadataActorCounter.getAndIncrement()
-      val consumerActor = system
-        .asInstanceOf[ExtendedActorSystem]
-        .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
-      val metadataClient = MetadataClient.create(consumerActor, timeout)
-      val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
-      numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId))
-      Await.result(numPartitions, timeout)
-    }
-  }
-  object RetrieveFromKafka {
-    private val metadataActorCounter = new AtomicInteger
-  }
-}
-
-private[kafka] trait DefaultKafkaShardingMessageExtractor {
-  val strategy: PartitionCountStrategy
-  private val groupId: String = strategy.groupId
-  private val kafkaPartitions: Int = strategy.partitions
-
-  def shardId(entityId: String): String = {
-    // simplified version of Kafka's `DefaultPartitioner` implementation
-    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-    s"$groupId-$partition"
-  }
-}
-
-final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy)
-  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
-  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-}
-
-/**
- * Caveats
- * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
- * - Values are passed as `null` to the partitioner.
- * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
- *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
- *   partitioner doesn't make use of any other Kafka Cluster metadata.
- */
-abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy)
-  extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
-  override def unwrapMessage(message: M): M = message
-}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..a695853 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -1,21 +1,24 @@
 package sample.sharding.kafka
 
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Terminated
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Terminated}
 import akka.cluster.ClusterEvent.MemberUp
-import akka.cluster.typed.Cluster
-import akka.cluster.typed.Subscribe
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.cluster.typed.{Cluster, Subscribe}
 import akka.http.scaladsl._
-import akka.http.scaladsl.model.HttpRequest
-import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
 import akka.management.scaladsl.AkkaManagement
 import akka.stream.Materializer
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
 
 import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+sealed trait Command
+case object NodeMemberUp extends Command
+case object StartProcessor extends Command
+final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
 
 object Main {
   def main(args: Array[String]): Unit = {
@@ -25,45 +28,69 @@ object Main {
     args.toList match {
       case portString :: managementPort :: frontEndPort :: Nil
           if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) =>
-        startNode(portString.toInt, managementPort.toInt, frontEndPort.toInt)
+        init(portString.toInt, managementPort.toInt, frontEndPort.toInt)
       case _ =>
         throw new IllegalArgumentException("usage: <remotingPort> <managementPort> <frontEndPort>")
     }
   }
 
-  def startNode(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
-    ActorSystem(Behaviors.setup[MemberUp] {
+  def init(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
+    ActorSystem(Behaviors.setup[Command] {
       ctx =>
-        implicit val mat = Materializer.createMaterializer(ctx.system.toClassic)
-        implicit val ec = ctx.executionContext
         AkkaManagement(ctx.system.toClassic).start()
-        // maybe don't start until part of the cluster, or add health check
-        val binding = startGrpc(ctx.system, mat, frontEndPort)
+
         val cluster = Cluster(ctx.system)
-        cluster.subscriptions.tell(Subscribe(ctx.self, classOf[MemberUp]))
+        val subscriber = ctx.spawn(clusterUpSubscriber(cluster, ctx.self), "cluster-subscriber")
+        cluster.subscriptions.tell(Subscribe(subscriber, classOf[MemberUp]))
+
+        ctx.pipeToSelf(UserEvents.messageExtractor(ctx.system)) {
+          case Success(extractor) => MessageExtractor(extractor)
+          case Failure(ex) => throw new Exception(ex)
+        }
+
+        starting()
+    }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+
+    def starting(extractor: Option[ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]] = None): Behavior[Command] = Behaviors
+      .receive[Command] {
+        case (ctx, MessageExtractor(extractor)) =>
+          ctx.self.tell(StartProcessor)
+          starting(Some(extractor))
+        case (ctx, StartProcessor) if extractor.isDefined =>
+          UserEvents.init(ctx.system, extractor.get)
+          val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor")
+          ctx.watch(eventProcessor)
+          ctx.log.info("Processor started.")
+          val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get)
+          running(binding, eventProcessor)
+      }
+
+    def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
+      .receiveSignal {
+        case (ctx, Terminated(`processor`)) =>
+          ctx.log.warn("Kafka event processor stopped. Shutting down")
+          binding.map(_.unbind())(ctx.executionContext)
+          Behaviors.stopped
+      }
+
+    def clusterUpSubscriber(cluster: Cluster, parent: ActorRef[Command]): Behavior[MemberUp] = Behaviors.setup[MemberUp] {
+      ctx =>
         Behaviors
           .receiveMessage[MemberUp] {
             case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
               ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
-              UserEvents.init(ctx.system)
-              val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
-              ctx.watch(eventProcessor)
+              parent.tell(StartProcessor)
               Behaviors.same
             case MemberUp(member) =>
               ctx.log.info("Member up {}", member)
               Behaviors.same
           }
-          .receiveSignal {
-            case (ctx, Terminated(_)) =>
-              ctx.log.warn("Kafka event processor stopped. Shutting down")
-              binding.map(_.unbind())
-              Behaviors.stopped
-          }
-    }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+    }
 
-    def startGrpc(system: ActorSystem[_], mat: Materializer, frontEndPort: Int): Future[Http.ServerBinding] = {
+    def startGrpc(system: ActorSystem[_], frontEndPort: Int, extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Future[Http.ServerBinding] = {
+      val mat = Materializer.createMaterializer(system.toClassic)
       val service: HttpRequest => Future[HttpResponse] =
-        UserServiceHandler(new UserGrpcService(system))(mat, system.toClassic)
+        UserServiceHandler(new UserGrpcService(system, extractor))(mat, system.toClassic)
       Http()(system.toClassic).bindAndHandleAsync(
         service,
         interface = "127.0.0.1",
@@ -79,5 +106,4 @@ object Main {
        """).withFallback(ConfigFactory.load())
 
   }
-
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
index 79be5e3..3b59790 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
@@ -29,6 +29,9 @@ object TopicListener {
       val address = Cluster(ctx.system).selfMember.address
       Behaviors.receiveMessage[ConsumerRebalanceEvent] {
         case TopicPartitionsAssigned(sub, partitions) =>
+          // TODO
+          // - log all partitions assigned in one log line
+          // - block for shard allocation to complete, add configurable timeout
           partitions.foreach(tp => {
             val shardId = s"$groupId-${tp.partition()}"
             ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 5cf3321..69d815d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -5,12 +5,12 @@ import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.ClusterShardingSettings
+import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor}
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
-import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka}
-import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor}
 import org.apache.kafka.common.serialization.StringDeserializer
 
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
 object UserEvents {
@@ -47,7 +47,8 @@ object UserEvents {
             runningTotal.copy(
               totalPurchases = runningTotal.totalPurchases + 1,
               amountSpent = runningTotal.amountSpent + (quantity * price)))
-        case GetRunningTotal(_, replyTo) =>
+        case GetRunningTotal(id, replyTo) =>
+          ctx.log.info("user {} running total queried", id)
           replyTo ! runningTotal
           Behaviors.same
       }
@@ -55,35 +56,30 @@ object UserEvents {
   }
 
   /**
-   * Passing in a [[RetrieveFromKafka]] strategy will automatically retrieve the number of partitions of a topic for
-   * use with the same hashing algorithm used by the Apache Kafka [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
-   * (murmur2) with Akka Cluster Sharding.
+   * Asynchronously get the Akka Cluster Sharding [[ShardingMessageExtractor]]. Given a topic we can automatically
+   * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding.
    */
-  class UserIdMessageExtractor(strategy: PartitionCountStrategy)
-      extends KafkaShardingNoEnvelopeExtractor[Message](strategy) {
-    def entityId(message: Message): String = message.userId
-  }
-
-  def init(system: ActorSystem[_]): ActorRef[Message] = {
+  def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
-    val messageExtractor = new UserIdMessageExtractor(
-      strategy = RetrieveFromKafka(
-        system = system.toClassic,
-        timeout = 10.seconds,
-        groupId = processorConfig.groupId,
-        topic = processorConfig.topics.head,
-        settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
-          .withBootstrapServers(processorConfig.bootstrapServers)
-      )
+    KafkaClusterSharding.messageExtractorNoEnvelope(
+      system = system.toClassic,
+      timeout = 10.seconds,
+      groupId = processorConfig.groupId,
+      topic = processorConfig.topics.head,
+      entityIdExtractor = (msg: Message) => msg.userId,
+      settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
+        .withBootstrapServers(processorConfig.bootstrapServers)
     )
+  }
+
+  def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] =
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
         .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))
-  }
 
-  def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
-    init(system).narrow[UserQuery]
-  }
+  def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] =
+    init(system, messageExtractor).narrow[UserQuery]
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 511c3bc..155a51c 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -8,6 +8,7 @@ import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
+import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.kafka.ConsumerSettings
 import akka.kafka.Subscriptions
 import akka.kafka.scaladsl.Consumer
@@ -30,7 +31,7 @@ object UserEventsKafkaProcessor {
   sealed trait Command
   private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
 
-  def apply(): Behavior[Nothing] = {
+  def apply(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Behavior[Nothing] = {
     Behaviors
       .setup[Command] { ctx =>
         val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor"))
@@ -40,7 +41,7 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
-        val shardRegion = UserEvents.init(ctx.system)
+        val shardRegion = UserEvents.init(ctx.system, extractor)
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..f3acb7c 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -3,6 +3,7 @@ package sample.sharding.kafka
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.AskPattern._
+import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.util.Timeout
 import sample.sharding.kafka.UserEvents.GetRunningTotal
 import sample.sharding.kafka.UserEvents.RunningTotal
@@ -10,13 +11,13 @@ import sample.sharding.kafka.UserEvents.RunningTotal
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
-class UserGrpcService(system: ActorSystem[_]) extends UserService {
+class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends UserService {
 
   implicit val timeout = Timeout(5.seconds)
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
+  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor)
 
   override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
     shardRegion
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
index 38d51c0..0c08ab1 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
@@ -1,13 +1,4 @@
 kafka-to-sharding-producer {
-  bootstrap-servers = "localhost:9094"
+  bootstrap-servers = "localhost:9092"
   topic = "user-events"
-
-  # can be one of:
-  # - default: to put the entity id in the key and use the default kafka partitioner
-  # - explicit: to specify the partition explicitly, based on the entity id
-  # if you have control of both the producer and consumer it is better to be explicit to make sure
-  # that both sides align
-  partitioning = "explicit"
-
-  nr-partitions = 128
 }
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index 846dee7..a20418c 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -1,7 +1,5 @@
 package sharding.kafka.producer
 
-import java.nio.charset.StandardCharsets
-
 import akka.Done
 import akka.actor.ActorSystem
 import akka.event.Logging
@@ -10,9 +8,7 @@ import akka.kafka.scaladsl.Producer
 import akka.stream.scaladsl.Source
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import sample.sharding.kafka.serialization.user_events.UserPurchaseProto
 
 import scala.concurrent.Future
@@ -55,7 +51,6 @@ object UserEventProducer extends App {
         // rely on the default kafka partitioner to hash the key and distribute among shards
         // the logic of the default partitioner must be replicated in MessageExtractor entityId -> shardId function
         new ProducerRecord[String, Array[Byte]](producerConfig.topic, randomEntityId, message)
-
       })
       .runWith(Producer.plainSink(producerSettings))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 07/09: Create as extension

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 7aa68a292e0b20cb14584849e3f9ac149623239d
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Feb 26 13:40:02 2020 -0500

    Create as extension
---
 akka-sample-kafka-to-sharding-scala/README.md      |   6 +-
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 129 ++++++++++++++-------
 .../scala/sample/sharding/kafka/UserEvents.scala   |   3 +-
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |   2 +-
 4 files changed, 95 insertions(+), 45 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 20ccf8b..732db5b 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -46,8 +46,8 @@ sbt "kafka / run"
 In the Kafka server window you'll see the following when the server is ready:
 
 ```
-12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.Main$   Kafka running on port '9092'
-12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.Main$   Topic 'user-events' with '128' partitions created
+12:06:59.711 INFO  [run-main-0          ] s.s.embeddedkafka.KafkaBroker$        Kafka running: localhost:9092
+12:06:59.711 INFO  [run-main-0          ] s.s.embeddedkafka.KafkaBroker$        Topic 'user-events' with 128 partitions created
 ```
 
 If you want to use a different Kafka cluster then then update the `applications.conf`s in each project to point to your 
@@ -183,7 +183,7 @@ the correct node even if that moves due to a kafka rebalance.
 A gRPC client is included which can be started with...
 
 ```
-sbt "client/run"
+sbt "client / run"
 ```
 
 It assumes there is one of the nodes running its front end port on 8081. 
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 2fbaa7f..6c981d0 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -5,12 +5,13 @@ import java.util.concurrent.atomic.AtomicInteger
 import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.adapter._
 import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId}
 import akka.annotation.{ApiMayChange, InternalApi}
 import akka.cluster.sharding.external.ExternalShardAllocation
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
 import akka.cluster.typed.Cluster
+import akka.kafka
 import akka.kafka.scaladsl.MetadataClient
 import akka.util.Timeout._
 import org.apache.kafka.common.utils.Utils
@@ -20,10 +21,14 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.util.{Failure, Success}
 
 /**
- * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka.
+ * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka.
  */
-object KafkaClusterSharding {
+class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension {
+  import KafkaClusterSharding._
+
   private val metadataActorCounter = new AtomicInteger
+  private var _messageExtractor: Option[KafkaShardingMessageExtractor[_]] = None
+  private var _messageExtractorNoEnvelope: Option[KafkaShardingNoEnvelopeExtractor[_]] = None
 
   /**
    * API MAY CHANGE
@@ -39,12 +44,33 @@ object KafkaClusterSharding {
    * that entities are routed to the same Entity type.
    */
   @ApiMayChange
-  def messageExtractor[M](system: ActorSystem,
-                          topic: String,
+  def messageExtractor[M](topic: String,
                           timeout: FiniteDuration,
-                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
-    getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
+                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = _messageExtractor match {
+  case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingMessageExtractor[M]])
+  case _ =>
+    getPartitionCount(topic, timeout, settings)
+      .map { kafkaPartitions =>
+        val extractor = new KafkaShardingMessageExtractor[M](kafkaPartitions)
+        _messageExtractor = Some(extractor)
+        extractor
+      }(system.dispatcher)
+  }
+
+  /**
+   * API MAY CHANGE
+   *
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy is provided by explicitly with [[kafkaPartitions]].
+   *
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
+  @ApiMayChange
+  def messageExtractor[M](kafkaPartitions: Int): Future[KafkaShardingMessageExtractor[M]] =
+    Future.successful(new KafkaShardingMessageExtractor[M](kafkaPartitions))
 
   /**
    * API MAY CHANGE
@@ -61,19 +87,39 @@ object KafkaClusterSharding {
    * that entities are routed to the same Entity type.
    */
   @ApiMayChange
-  def messageExtractorNoEnvelope[M](system: ActorSystem,
-                                    topic: String,
+  def messageExtractorNoEnvelope[M](topic: String,
                                     timeout: FiniteDuration,
                                     entityIdExtractor: M => String,
-                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
-    getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher)
+                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =  _messageExtractorNoEnvelope match {
+    case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingNoEnvelopeExtractor[M]])
+    case _ =>
+      getPartitionCount(topic, timeout, settings)
+        .map { kafkaPartitions =>
+          val extractor = new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)
+          _messageExtractorNoEnvelope = Some(extractor)
+          extractor
+        }(system.dispatcher)
+  }
 
-  private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
+  /**
+   * API MAY CHANGE
+   *
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy is provided by explicitly with [[kafkaPartitions]].
+   *
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
+  @ApiMayChange
+  def messageExtractorNoEnvelope[M](kafkaPartitions: Int, entityIdExtractor: M => String): Future[KafkaShardingNoEnvelopeExtractor[M]] =
+    Future.successful(new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))
+
+  private def getPartitionCount[M](topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
     implicit val ec: ExecutionContextExecutor = system.dispatcher
     val actorNum = metadataActorCounter.getAndIncrement()
     val consumerActor = system
-      .asInstanceOf[ExtendedActorSystem]
       .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
     val metadataClient = MetadataClient.create(consumerActor, timeout)
     val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
@@ -83,30 +129,6 @@ object KafkaClusterSharding {
     }
   }
 
-  @InternalApi
-  sealed trait KafkaClusterSharding {
-    def kafkaPartitions: Int
-    def shardId(entityId: String): String = {
-      // simplified version of Kafka's `DefaultPartitioner` implementation
-      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-      partition.toString
-    }
-  }
-
-  @InternalApi
-  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
-    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
-    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-  }
-
-  @InternalApi
-  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
-    extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
-    override def entityId(message: M): String = entityIdExtractor(message)
-    override def unwrapMessage(message: M): M = message
-  }
-
   // TODO: will require `akka-actors-typed` as a provided dep
   /**
    * API MAY CHANGE
@@ -158,4 +180,33 @@ object KafkaClusterSharding {
       .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener")
       .toClassic
   }
+}
+
+object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {
+  @InternalApi
+  sealed trait KafkaClusterShardingContract {
+    def kafkaPartitions: Int
+    def shardId(entityId: String): String = {
+      // simplified version of Kafka's `DefaultPartitioner` implementation
+      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+      partition.toString
+    }
+  }
+
+  @InternalApi
+  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
+    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterShardingContract {
+    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+  }
+
+  @InternalApi
+  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
+    extends ShardingMessageExtractor[M, M] with KafkaClusterShardingContract {
+    override def entityId(message: M): String = entityIdExtractor(message)
+    override def unwrapMessage(message: M): M = message
+  }
+
+  override def createExtension(system: ExtendedActorSystem): kafka.KafkaClusterSharding =
+    new KafkaClusterSharding(system)
 }
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 1cfa62d..763bc49 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -58,8 +58,7 @@ object UserEvents {
    */
   def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
-    KafkaClusterSharding.messageExtractorNoEnvelope(
-      system = system.toClassic,
+    KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope(
       timeout = 10.seconds,
       topic = processorConfig.topics.head,
       entityIdExtractor = (msg: Message) => msg.userId,
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 43ad4bb..6c8d4c0 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -40,7 +40,7 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
-        val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey)
+        val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(classic, typeKey)
         val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
         val consumerSettings =
           ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org


[incubator-pekko-samples] 09/09: Reference alpakka-kafka-cluster-sharding module

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 1f23cf1da016827152ca9dbc1d37c187615d5ad7
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Tue Mar 3 18:19:12 2020 -0500

    Reference alpakka-kafka-cluster-sharding module
---
 akka-sample-kafka-to-sharding-scala/build.sbt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 1a3c8a7..0c28f5b 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,5 +1,5 @@
 val AkkaVersion = "2.6.3"
-val AlpakkaKafkaVersion = "2.0.2+2-88deb905+20200226-1650"
+val AlpakkaKafkaVersion = "2.0.2+7-af96d529+20200303-1814"
 val AkkaManagementVersion = "1.0.5"
 val AkkaHttpVersion = "10.1.11"
 val KafkaVersion = "2.4.0"
@@ -45,6 +45,7 @@ lazy val processor = project
   .settings(javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test")
   .settings(libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion,
+      "com.typesafe.akka" %% "akka-stream-kafka-cluster-sharding" % AlpakkaKafkaVersion,
       "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
       "com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
       "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org