Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:49 UTC
[incubator-pekko-samples] 04/09: RebalanceListener
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit f53f3b0a0adbc72aec268fc456f741676afa45f9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Tue Feb 18 17:15:27 2020 -0500
RebalanceListener
---
.../scala/akka/kafka/KafkaClusterSharding.scala | 131 ++++++++++++++++-----
.../main/scala/sample/sharding/kafka/Main.scala | 12 +-
.../sample/sharding/kafka/TopicListener.scala | 54 ---------
.../scala/sample/sharding/kafka/UserEvents.scala | 23 ++--
.../sharding/kafka/UserEventsKafkaProcessor.scala | 11 +-
.../sample/sharding/kafka/UserGrpcService.scala | 4 +-
6 files changed, 127 insertions(+), 108 deletions(-)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 5562c42..5bbd885 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -2,34 +2,64 @@ package akka.kafka
import java.util.concurrent.atomic.AtomicInteger
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.external.ExternalShardAllocation
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.cluster.typed.Cluster
import akka.kafka.scaladsl.MetadataClient
import akka.util.Timeout._
import org.apache.kafka.common.utils.Utils
-import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success}
+/**
+ * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka.
+ */
object KafkaClusterSharding {
private val metadataActorCounter = new AtomicInteger
+ /**
+ * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+ * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+ *
+ * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
+ * cluster for the number of partitions of a user-provided [[topic]]. Use the [[settings]] parameter to configure
+ * the Kafka Consumer connection required to retrieve the number of partitions.
+ *
+ * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+ * that entities are routed to the same Entity type.
+ */
def messageExtractor[M](system: ActorSystem,
- groupId: String,
topic: String,
timeout: FiniteDuration,
settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
getPartitionCount(system, topic, timeout, settings)
- .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher)
+ .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
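A minimal usage sketch of the envelope-based extractor (the `Report` message type, topic name, and bootstrap address below are illustrative, not part of this commit):

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, KafkaClusterSharding}
    import org.apache.kafka.common.serialization.StringDeserializer
    import scala.concurrent.duration._

    final case class Report(userId: String) // illustrative message type

    val system = ActorSystem("example")
    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092") // illustrative address

    // The Future resolves once the topic's partition count has been fetched from the cluster.
    val extractorF = KafkaClusterSharding.messageExtractor[Report](
      system = system,
      topic = "user-events", // illustrative topic
      timeout = 10.seconds,
      settings = consumerSettings)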
+ /**
+ * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+ * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+ *
+ * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
+ * cluster for the number of partitions of a user-provided [[topic]]. Use the [[settings]] parameter to configure
+ * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
+ * a field from the Entity to use as the entity id for the hashing strategy.
+ *
+ * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+ * that entities are routed to the same Entity type.
+ */
def messageExtractorNoEnvelope[M](system: ActorSystem,
- groupId: String,
topic: String,
timeout: FiniteDuration,
entityIdExtractor: M => String,
settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
getPartitionCount(system, topic, timeout, settings)
- .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher)
+ .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher)
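The no-envelope variant differs only in the `entityIdExtractor` argument, which picks the entity id off the message itself. A sketch reusing `system`, `consumerSettings`, and the illustrative `Report` type from above:

    val noEnvelopeExtractorF = KafkaClusterSharding.messageExtractorNoEnvelope[Report](
      system = system,
      topic = "user-events",
      timeout = 10.seconds,
      entityIdExtractor = (msg: Report) => msg.userId, // pick the entity id off the message
      settings = consumerSettings)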
private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
implicit val ec: ExecutionContextExecutor = system.dispatcher
@@ -44,35 +74,72 @@ object KafkaClusterSharding {
count
}
}
-}
-trait KafkaClusterSharding {
- def groupId: String
- def kafkaPartitions: Int
+ sealed trait KafkaClusterSharding {
+ def kafkaPartitions: Int
+ def shardId(entityId: String): String = {
+ // simplified version of Kafka's `DefaultPartitioner` implementation
+ val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+ partition.toString
+ }
+ }
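For intuition, a worked sketch of the `shardId` hashing above (partition count and entity id are illustrative). Because a Kafka producer using the `DefaultPartitioner` applies the same murmur2 hash to the record key, the record's partition and the entity's shard coincide:

    import org.apache.kafka.common.utils.Utils

    val kafkaPartitions = 3 // illustrative partition count
    val entityId = "user-1" // illustrative entity id
    // murmur2 over the key bytes, forced positive, modulo the partition count
    val shardId = (Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions).toString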
- def shardId(entityId: String): String = {
- // simplified version of Kafka's `DefaultPartitioner` implementation
- val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
- s"$groupId-$partition"
+ final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int)
+ extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
+ override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+ override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
}
-}
-class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int)
- extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
- override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
- override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-}
+ final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String)
+ extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
+ override def entityId(message: M): String = entityIdExtractor(message)
+ override def unwrapMessage(message: M): M = message
+ }
-/**
- * Caveats
- * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
- * - Values are passed as `null` to the partitioner.
- * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
- * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
- * partitioner doesn't make use of any other Kafka Cluster metadata.
- */
-class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String)
- extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
- override def entityId(message: M): String = entityIdExtractor(message)
- override def unwrapMessage(message: M): M = message
-}
+ // TODO:
+ // - will require `akka-actor-typed` as another provided dep, or should we just return a classic actor?
+ // - returning a typed actor is more flexible for the user, who can then easily create it under the `user` guardian
+ // when running Akka Typed. An alternative would be to create the actor ourselves as a system actor, as is done with
+ // the KafkaConsumerActor for the metadata client.
+ /**
+ * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance
+ * listener. As partitions are assigned to this consumer group member, we update the External Sharding strategy
+ * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+ */
+ object RebalanceListener {
+ def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
+ Behaviors.setup { ctx =>
+ val typeKeyName = typeKey.name
+ val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName)
+ val address = Cluster(ctx.system).selfMember.address
+ Behaviors.receive[ConsumerRebalanceEvent] {
+ case (ctx, TopicPartitionsAssigned(_, partitions)) =>
+ import ctx.executionContext
+ val partitionsList = partitions.mkString(",")
+ ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]",
+ typeKeyName, address, partitionsList)
+ val updates = partitions.map { tp =>
+ val shardId = tp.partition().toString
+ // Kafka partition number becomes the akka shard id
+ // TODO: support assigning more than 1 shard id at once?
+ shardAllocationClient.updateShardLocation(shardId, address)
+ }
+ // TODO: pipeToSelf since we're closing over local state?
+ Future
+ .sequence(updates)
+ // Each Future returns successfully once a majority of cluster nodes have received the update.
+ // There's no point blocking here because the rebalance listener is triggered asynchronously. If we want
+ // to block rebalances then we should provide an implementation using the `PartitionAssignmentHandler` instead.
+ .onComplete {
+ case Success(_) =>
+ ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]",
+ typeKeyName, address, partitionsList)
+ case Failure(ex) =>
+ ctx.log.error("A failure occurred while updating cluster shards", ex)
+ }
+ Behaviors.same
+ case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same
+ }
+ }
+ }
+}
\ No newline at end of file
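A sketch of how the new `RebalanceListener` is wired into a consumer (reusing the illustrative `Report` type; the group id and topic are placeholders, and the sample's actual wiring appears in `UserEventsKafkaProcessor` below):

    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl.adapter._
    import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
    import akka.kafka.{KafkaClusterSharding, Subscriptions}

    Behaviors.setup[Nothing] { ctx =>
      // the consumer group id doubles as the EntityTypeKey name
      val typeKey = EntityTypeKey[Report]("my-consumer-group")
      val listener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "rebalance-listener")
      val subscription = Subscriptions
        .topics("user-events")
        .withRebalanceListener(listener.toClassic)
      // ... build ConsumerSettings and run Consumer.plainSource(consumerSettings, subscription)
      Behaviors.empty
    }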
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index a695853..53e9d4d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -18,7 +18,7 @@ import scala.util.{Failure, Success}
sealed trait Command
case object NodeMemberUp extends Command
case object StartProcessor extends Command
-final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
+final case class MessageExtractor(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
object Main {
def main(args: Array[String]): Unit = {
@@ -57,12 +57,16 @@ object Main {
ctx.self.tell(StartProcessor)
starting(Some(extractor))
case (ctx, StartProcessor) if extractor.isDefined =>
- UserEvents.init(ctx.system, extractor.get)
- val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor")
+ val messageExtractor = extractor.get
+ val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor"))
+ UserEvents.init(ctx.system, messageExtractor, processorSettings.groupId)
+ val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(messageExtractor), "kafka-event-processor")
ctx.watch(eventProcessor)
ctx.log.info("Processor started.")
- val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get)
+ val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, messageExtractor)
running(binding, eventProcessor)
+ case (ctx, StartProcessor) =>
+ Behaviors.same
}
def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
deleted file mode 100644
index 3b59790..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package sample.sharding.kafka
-
-import akka.actor.typed.Behavior
-import akka.actor.typed.scaladsl.Behaviors
-import akka.cluster.sharding.external._
-import akka.cluster.typed.Cluster
-import akka.kafka.ConsumerRebalanceEvent
-import akka.kafka.TopicPartitionsAssigned
-import akka.kafka.TopicPartitionsRevoked
-import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
-
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-
-object TopicListener {
- def apply(groupId: String, typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] =
- Behaviors.setup { ctx =>
- import ctx.executionContext
- val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
- ctx.system.scheduler.scheduleAtFixedRate(10.seconds, 20.seconds) { () =>
- shardAllocationClient.shardLocations().onComplete {
- case Success(shardLocations) =>
- ctx.log.info("Current shard locations {}", shardLocations.locations)
- case Failure(t) =>
- ctx.log.error("failed to get shard locations", t)
- }
- }
- val address = Cluster(ctx.system).selfMember.address
- Behaviors.receiveMessage[ConsumerRebalanceEvent] {
- case TopicPartitionsAssigned(sub, partitions) =>
- // TODO
- // - log all partitions assigned in one log line
- // - block for shard allocation to complete, add configurable timeout
- partitions.foreach(tp => {
- val shardId = s"$groupId-${tp.partition()}"
- ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
- // kafka partition becomes the akka shard
- val done = shardAllocationClient.updateShardLocation(shardId, address)
- done.onComplete { result =>
- ctx.log.info("Result for updating shard {}: {}", shardId, result)
- }
-
- })
- Behaviors.same
- case TopicPartitionsRevoked(_, topicPartitions) =>
- ctx.log.info(
- "Partitions [{}] of group [{}] revoked from current node. New location will update shard allocation",
- topicPartitions.mkString(","),
- groupId)
- Behaviors.same
- }
- }
-}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 69d815d..1cfa62d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -7,17 +7,13 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
-import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor}
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
import scala.concurrent.duration._
object UserEvents {
-
- val TypeKey: EntityTypeKey[UserEvents.Message] =
- EntityTypeKey[UserEvents.Message]("user-processing")
-
sealed trait Message extends CborSerializable {
def userId: String
}
@@ -60,12 +56,11 @@ object UserEvents {
* retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka
* [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding.
*/
- def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = {
+ def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
KafkaClusterSharding.messageExtractorNoEnvelope(
system = system.toClassic,
timeout = 10.seconds,
- groupId = processorConfig.groupId,
topic = processorConfig.topics.head,
entityIdExtractor = (msg: Message) => msg.userId,
settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
@@ -73,13 +68,17 @@ object UserEvents {
)
}
- def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] =
+ def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[Message] = {
+ // create an Akka Cluster Sharding `EntityTypeKey` for `UserEvents`, named after this Kafka consumer group
+ val typeKey = EntityTypeKey[UserEvents.Message](groupId)
ClusterSharding(system).init(
- Entity(TypeKey)(createBehavior = _ => UserEvents())
- .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+ Entity(typeKey)(createBehavior = _ => UserEvents())
+ // NOTE: why does `ExternalShardAllocationStrategy` not accept the type key itself?
+ .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
.withMessageExtractor(messageExtractor)
.withSettings(ClusterShardingSettings(system)))
+ }
- def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] =
- init(system, messageExtractor).narrow[UserQuery]
+ def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[UserQuery] =
+ init(system, messageExtractor, groupId).narrow[UserQuery]
}
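How the reworked `UserEvents` API composes (a sketch, assuming a typed `ActorSystem[_]` in scope as `system`; the group id is illustrative): the extractor resolves asynchronously, and only then is sharding initialized, with the consumer group id serving as the `EntityTypeKey` name.

    import scala.util.{Failure, Success}

    implicit val ec = system.executionContext
    UserEvents.messageExtractor(system).onComplete {
      case Success(extractor) =>
        UserEvents.init(system, extractor, groupId = "user-processing-group")
      case Failure(ex) =>
        system.log.error("Failed to resolve the Kafka message extractor", ex)
    }

The sample's `Main` follows the same pattern, piping the resolved extractor back to itself as a `MessageExtractor` command before calling `UserEvents.init`.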
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 155a51c..941f177 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -9,8 +9,8 @@ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.typed.ShardingMessageExtractor
-import akka.kafka.ConsumerSettings
-import akka.kafka.Subscriptions
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
@@ -40,8 +40,9 @@ object UserEventsKafkaProcessor {
implicit val scheduler: Scheduler = classic.scheduler
// TODO config
val timeout = Timeout(3.seconds)
- val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
- val shardRegion = UserEvents.init(ctx.system, extractor)
+ val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
+ val rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener")
+ val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
val consumerSettings =
ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(processorSettings.bootstrapServers)
@@ -51,7 +52,7 @@ object UserEventsKafkaProcessor {
val subscription = Subscriptions
.topics(processorSettings.topics: _*)
- .withRebalanceListener(rebalancerRef.toClassic)
+ .withRebalanceListener(rebalanceListener.toClassic)
val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
Consumer.plainSource(consumerSettings, subscription)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index f3acb7c..3a3e808 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -5,6 +5,7 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.AskPattern._
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
import sample.sharding.kafka.UserEvents.GetRunningTotal
import sample.sharding.kafka.UserEvents.RunningTotal
@@ -17,7 +18,8 @@ class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtracto
implicit val sched = system.scheduler
implicit val ec = system.executionContext
- private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor)
+ private val processorSettings = ProcessorConfig(ConfigFactory.load().getConfig("kafka-to-sharding-processor"))
+ private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor, processorSettings.groupId)
override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
shardRegion