Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:49 UTC

[incubator-pekko-samples] 04/09: RebalanceListener

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit f53f3b0a0adbc72aec268fc456f741676afa45f9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Tue Feb 18 17:15:27 2020 -0500

    RebalanceListener
---
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 131 ++++++++++++++++-----
 .../main/scala/sample/sharding/kafka/Main.scala    |  12 +-
 .../sample/sharding/kafka/TopicListener.scala      |  54 ---------
 .../scala/sample/sharding/kafka/UserEvents.scala   |  23 ++--
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |  11 +-
 .../sample/sharding/kafka/UserGrpcService.scala    |   4 +-
 6 files changed, 127 insertions(+), 108 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 5562c42..5bbd885 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -2,34 +2,64 @@ package akka.kafka
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.external.ExternalShardAllocation
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.cluster.typed.Cluster
 import akka.kafka.scaladsl.MetadataClient
 import akka.util.Timeout._
 import org.apache.kafka.common.utils.Utils
 
-import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success}
 
+/**
+ * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka.
+ */
 object KafkaClusterSharding {
   private val metadataActorCounter = new AtomicInteger
 
+  /**
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
+   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
+   * the Kafka Consumer connection required to retrieve the number of partitions.
+   *
+   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
   def messageExtractor[M](system: ActorSystem,
-                          groupId: String,
                           topic: String,
                           timeout: FiniteDuration,
                           settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
     getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher)
+      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
 
+  /**
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
+   * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
+   * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
+   * a field from the Entity to use as the entity id for the hashing strategy.
+   *
+   * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
   def messageExtractorNoEnvelope[M](system: ActorSystem,
-                                    groupId: String,
                                     topic: String,
                                     timeout: FiniteDuration,
                                     entityIdExtractor: M => String,
                                     settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
     getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher)
+      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher)
 
   private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
     implicit val ec: ExecutionContextExecutor = system.dispatcher
@@ -44,35 +74,72 @@ object KafkaClusterSharding {
       count
     }
   }
-}
 
-trait KafkaClusterSharding {
-  def groupId: String
-  def kafkaPartitions: Int
+  sealed trait KafkaClusterSharding {
+    def kafkaPartitions: Int
+    def shardId(entityId: String): String = {
+      // simplified version of Kafka's `DefaultPartitioner` implementation
+      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+      partition.toString
+    }
+  }
 
-  def shardId(entityId: String): String = {
-    // simplified version of Kafka's `DefaultPartitioner` implementation
-    val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-    s"$groupId-$partition"
+  final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int)
+    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
+    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
   }
-}
 
-class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int)
-  extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
-  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-}
+  final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String)
+    extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
+    override def entityId(message: M): String = entityIdExtractor(message)
+    override def unwrapMessage(message: M): M = message
+  }
 
-/**
- * Caveats
- * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
- * - Values are passed as `null` to the partitioner.
- * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
- *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
- *   partitioner doesn't make use of any other Kafka Cluster metadata.
- */
-class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String)
-  extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
-  override def entityId(message: M): String = entityIdExtractor(message)
-  override def unwrapMessage(message: M): M = message
-}
+  // TODO:
+  // - will require `akka-actor-typed` as another provided dep, or should we just return a classic actor?
+  // - returning a typed actor is more flexible for the user so that they can easily create it under the `user` guardian
+  //   when running Akka Typed. An alternative would be to create the actor ourselves as a system actor, as is done with
+  //   the KafkaConsumerActor for the metadata client.
+  /**
+   * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance
+   * listener. As partitions are assigned to this consumer group member we update the External Sharding strategy
+   * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+   */
+  object RebalanceListener {
+    def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
+      Behaviors.setup { ctx =>
+        val typeKeyName = typeKey.name
+        val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
+        val address = Cluster(ctx.system).selfMember.address
+        Behaviors.receive[ConsumerRebalanceEvent] {
+          case (ctx, TopicPartitionsAssigned(_, partitions)) =>
+            import ctx.executionContext
+            val partitionsList = partitions.mkString(",")
+            ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
+              typeKeyName, address, partitionsList)
+            val updates = partitions.map { tp =>
+              val shardId = tp.partition().toString
+              // Kafka partition number becomes the akka shard id
+              // TODO: support assigning more than 1 shard id at once?
+              shardAllocationClient.updateShardLocation(shardId, address)
+            }
+            // TODO: pipeToSelf since we're closing over local state?
+            Future
+              .sequence(updates)
+              // each Future returns successfully once a majority of cluster nodes receive the update.
+              // there's no point blocking here because the rebalance listener is triggered asynchronously. If we want
+              // to block rebalances then we should provide an implementation using the `PartitionAssignmentHandler` instead.
+              .onComplete {
+                case Success(_) =>
+                  ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
+                    typeKeyName, address, partitionsList)
+                case Failure(ex) =>
+                  ctx.log.error("A failure occurred while updating cluster shards", ex)
+              }
+            Behaviors.same
+          case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
+        }
+      }
+  }
+}
\ No newline at end of file
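
A note on the hashing strategy described in the scaladoc above: with the `groupId` prefix dropped from `shardId`, a shard id is now simply the murmur2 hash of the entity id modulo the partition count, i.e. the same partition number Kafka's DefaultPartitioner would choose for a keyed record. That is what keeps an entity's shard co-located with the node consuming its Kafka partition. A minimal standalone sketch of the mapping (assumes only kafka-clients on the classpath; ShardIdSketch and the example values are illustrative, not part of this commit):

    import org.apache.kafka.common.utils.Utils

    object ShardIdSketch {
      // Simplified DefaultPartitioner logic, mirroring KafkaClusterSharding.shardId above.
      def shardId(entityId: String, kafkaPartitions: Int): String = {
        val partition = Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
        partition.toString
      }

      def main(args: Array[String]): Unit = {
        // With 128 partitions, a given entity id always maps to the same shard/partition.
        println(shardId("user-1", 128))
        println(shardId("user-2", 128))
      }
    }

How the new RebalanceListener is attached to a consumer is shown in the UserEventsKafkaProcessor.scala hunk further down.
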
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index a695853..53e9d4d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -18,7 +18,7 @@ import scala.util.{Failure, Success}
 sealed trait Command
 case object NodeMemberUp extends Command
 case object StartProcessor extends Command
-final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
+final case class MessageExtractor(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
 
 object Main {
   def main(args: Array[String]): Unit = {
@@ -57,12 +57,16 @@ object Main {
           ctx.self.tell(StartProcessor)
           starting(Some(extractor))
         case (ctx, StartProcessor) if extractor.isDefined =>
-          UserEvents.init(ctx.system, extractor.get)
-          val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor")
+          val messageExtractor = extractor.get
+          val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor"))
+          UserEvents.init(ctx.system, messageExtractor, processorSettings.groupId)
+          val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(messageExtractor), "kafka-event-processor")
           ctx.watch(eventProcessor)
           ctx.log.info("Processor started.")
-          val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get)
+          val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, messageExtractor)
           running(binding, eventProcessor)
+        case (ctx, StartProcessor) =>
+          Behaviors.same
       }
 
     def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
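
The part of Main.scala that actually produces the MessageExtractor command is outside this hunk. One way the guardian can wire it up, piping the Future returned by UserEvents.messageExtractor back to itself, is sketched below; this is an assumed sketch placed in the sample's sample.sharding.kafka package, not necessarily the exact code elsewhere in Main.scala, and GuardianBootstrapSketch is illustrative. Note that the `starting` behavior shown above ignores an early StartProcessor and re-sends it to itself once the extractor has arrived.

    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors

    import scala.util.{Failure, Success}

    object GuardianBootstrapSketch {
      def apply(starting: Behavior[Command]): Behavior[Command] =
        Behaviors.setup[Command] { ctx =>
          ctx.pipeToSelf(UserEvents.messageExtractor(ctx.system)) {
            case Success(extractor) => MessageExtractor(extractor)
            case Failure(ex) =>
              // A dedicated failure Command would be cleaner; failing fast keeps the sketch short.
              throw new IllegalStateException("Unable to read the partition count from Kafka", ex)
          }
          starting
        }
    }
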
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
deleted file mode 100644
index 3b59790..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package sample.sharding.kafka
-
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.external._
-import akka.cluster.typed.Cluster
-import akka.kafka.ConsumerRebalanceEvent
-import akka.kafka.TopicPartitionsAssigned
-import akka.kafka.TopicPartitionsRevoked
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-
-object TopicListener {
-  def apply(groupId: String, typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
-    Behaviors.setup { ctx =>
-      import ctx.executionContext
-      val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
-      ctx.system.scheduler.scheduleAtFixedRate(10.seconds, 20.seconds) { () =>
-        shardAllocationClient.shardLocations().onComplete {
-          case Success(shardLocations) =>
-            ctx.log.info("Current shard locations {}", shardLocations.locations)
-          case Failure(t) =>
-            ctx.log.error("failed to get shard locations", t)
-        }
-      }
-      val address = Cluster(ctx.system).selfMember.address
-      Behaviors.receiveMessage[ConsumerRebalanceEvent] {
-        case TopicPartitionsAssigned(sub, partitions) =>
-          // TODO
-          // - log all partitions assigned in one log line
-          // - block for shard allocation to complete, add configurable timeout
-          partitions.foreach(tp => {
-            val shardId = s"$groupId-${tp.partition()}"
-            ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
-            // kafka partition becomes the akka shard
-            val done = shardAllocationClient.updateShardLocation(shardId, address)
-            done.onComplete { result =>
-              ctx.log.info("Result for updating shard {}: {}", shardId, result)
-            }
-
-          })
-          Behaviors.same
-        case TopicPartitionsRevoked(_, topicPartitions) =>
-          ctx.log.info(
-            "Partitions [{}] of group [{}] revoked from current node. New location will update shard allocation",
-            topicPartitions.mkString(","),
-            groupId)
-          Behaviors.same
-      }
-    }
-}
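
One thing the deleted TopicListener did that the new RebalanceListener does not is periodically log the current shard locations. If that visibility is still wanted, the same ExternalShardAllocation client calls can live in a small standalone behavior, roughly as below (a sketch only; ShardLocationLogger is not part of this commit). It can be spawned next to the processor and pointed at the same EntityTypeKey that is passed to RebalanceListener.

    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors
    import akka.cluster.sharding.external.ExternalShardAllocation
    import akka.cluster.sharding.typed.scaladsl.EntityTypeKey

    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    object ShardLocationLogger {
      def apply(typeKey: EntityTypeKey[_]): Behavior[Nothing] =
        Behaviors.setup[Nothing] { ctx =>
          import ctx.executionContext
          val client = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
          // Same periodic query the removed TopicListener ran, kept out of the rebalance handler.
          ctx.system.scheduler.scheduleAtFixedRate(10.seconds, 20.seconds) { () =>
            client.shardLocations().onComplete {
              case Success(shardLocations) => ctx.log.info("Current shard locations {}", shardLocations.locations)
              case Failure(t)              => ctx.log.error("Failed to get shard locations", t)
            }
          }
          Behaviors.empty[Nothing]
        }
    }
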
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 69d815d..1cfa62d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -7,17 +7,13 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
 import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor}
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
-import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor}
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding}
 import org.apache.kafka.common.serialization.StringDeserializer
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
 object UserEvents {
-
-  val TypeKey: EntityTypeKey[UserEvents.Message] =
-    EntityTypeKey[UserEvents.Message]("user-processing")
-
   sealed trait Message extends CborSerializable {
     def userId: String
   }
@@ -60,12 +56,11 @@ object UserEvents {
    * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka
    * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding.
    */
-  def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = {
+  def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
     KafkaClusterSharding.messageExtractorNoEnvelope(
       system = system.toClassic,
       timeout = 10.seconds,
-      groupId = processorConfig.groupId,
       topic = processorConfig.topics.head,
       entityIdExtractor = (msg: Message) => msg.userId,
       settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
@@ -73,13 +68,17 @@ object UserEvents {
     )
   }
 
-  def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] =
+  def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[Message] = {
+    // create an Akka Cluster Sharding `EntityTypeKey` for `UserEvents` for this Kafka consumer group
+    val typeKey = EntityTypeKey[UserEvents.Message](groupId)
     ClusterSharding(system).init(
-      Entity(TypeKey)(createBehavior = _ => UserEvents())
-        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+      Entity(typeKey)(createBehavior = _ => UserEvents())
+        // NOTE: why does `ExternalShardAllocationStrategy` not accept the type key itself?
+        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
         .withMessageExtractor(messageExtractor)
         .withSettings(ClusterShardingSettings(system)))
+  }
 
-  def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] =
-    init(system, messageExtractor).narrow[UserQuery]
+  def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[UserQuery] =
+    init(system, messageExtractor, groupId).narrow[UserQuery]
 }
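
Taken together with the extractor change, the sharding-side call sequence is now: resolve the extractor asynchronously, then initialize sharding with an EntityTypeKey named after the consumer group. Because RebalanceListener resolves its ExternalShardAllocation client with the same typeKey.name, both sides of the external allocation strategy agree on the key. A small sketch of that glue, again assuming the sample.sharding.kafka package (ShardingBootstrapSketch is illustrative, not part of this commit):

    import akka.actor.typed.{ActorRef, ActorSystem}

    import scala.concurrent.Future

    object ShardingBootstrapSketch {
      def startSharding(system: ActorSystem[_], groupId: String): Future[ActorRef[UserEvents.Message]] = {
        implicit val ec = system.executionContext
        // The group id names the EntityTypeKey inside `init`, matching the name the
        // RebalanceListener uses for its ExternalShardAllocation client.
        UserEvents.messageExtractor(system).map(extractor => UserEvents.init(system, extractor, groupId))
      }
    }
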
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 155a51c..941f177 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -9,8 +9,8 @@ import akka.actor.typed.scaladsl.AskPattern._
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.cluster.sharding.typed.ShardingMessageExtractor
-import akka.kafka.ConsumerSettings
-import akka.kafka.Subscriptions
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions}
 import akka.kafka.scaladsl.Consumer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
@@ -40,8 +40,9 @@ object UserEventsKafkaProcessor {
         implicit val scheduler: Scheduler = classic.scheduler
         // TODO config
         val timeout = Timeout(3.seconds)
-        val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
-        val shardRegion = UserEvents.init(ctx.system, extractor)
+        val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
+        val rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener")
+        val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
@@ -51,7 +52,7 @@ object UserEventsKafkaProcessor {
 
         val subscription = Subscriptions
           .topics(processorSettings.topics: _*)
-          .withRebalanceListener(rebalancerRef.toClassic)
+          .withRebalanceListener(rebalanceListener.toClassic)
 
         val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
           Consumer.plainSource(consumerSettings, subscription)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index f3acb7c..3a3e808 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -5,6 +5,7 @@ import akka.actor.typed.ActorSystem
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.cluster.sharding.typed.ShardingMessageExtractor
 import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
 import sample.sharding.kafka.UserEvents.GetRunningTotal
 import sample.sharding.kafka.UserEvents.RunningTotal
 
@@ -17,7 +18,8 @@ class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtracto
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor)
+  private val processorSettings = ProcessorConfig(ConfigFactory.load().getConfig("kafka-to-sharding-processor"))
+  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor, processorSettings.groupId)
 
   override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
     shardRegion


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org