You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:47 UTC

[incubator-pekko-samples] 02/09: WIP- use systemActorOf to create metadata client

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 5c118c69be77f2ceac59f4c04c4e561eaa59738d
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Fri Feb 14 13:14:22 2020 -0500

    WIP- use systemActorOf to create metadata client
---
 .../akka/kafka/KafkaShardingMessageExtractor.scala | 78 ++++++++++++----------
 .../scala/sample/sharding/kafka/UserEvents.scala   | 29 ++++----
 2 files changed, 60 insertions(+), 47 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
index f02eb41..14353ab 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
@@ -1,52 +1,61 @@
 package akka.kafka
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import akka.actor.{ActorSystem, ExtendedActorSystem}
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy
 import akka.kafka.scaladsl.MetadataClient
 import akka.util.Timeout._
-import org.apache.kafka.clients.producer.Partitioner
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner
-import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster}
+import org.apache.kafka.common.utils.Utils
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContextExecutor}
-import scala.jdk.CollectionConverters._
-
-private[kafka] trait DefaultKafkaShardingMessageExtractor {
-  implicit val actorSystem: ActorSystem
-  implicit val timeout: FiniteDuration
-  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+import scala.concurrent.{Await, ExecutionContext}
 
-  val clientSettings: ConsumerSettings[_, _]
-  val groupId: String
-  val topic: String
+/** Strategies for determining the partition count of the topic whose partitions are mapped onto shards. */
+object DefaultKafkaShardingMessageExtractor {
+  sealed trait PartitionCountStrategy {
+    def groupId: String
+    def partitions: Int
+  }
+  /** Uses a partition count supplied by the caller, without contacting Kafka. */
+  final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy
+  /** Asks Kafka for the partition count of `topic` on first access, waiting at most `timeout` for the answer. */
+  final case class RetrieveFromKafka(
+      system: ActorSystem,
+      timeout: FiniteDuration,
+      groupId: String,
+      topic: String,
+      settings: ConsumerSettings[_, _]) extends PartitionCountStrategy {
+    import RetrieveFromKafka._
+    private implicit val ec: ExecutionContext = system.dispatcher
+    lazy val partitions: Int = {
+      val consumerActor = system.asInstanceOf[ExtendedActorSystem].systemActorOf(
+        KafkaConsumerActor.props(settings), s"metadata-consumer-actor-${metadataActorCounter.getAndIncrement()}")
+      val numPartitions = MetadataClient.create(consumerActor, timeout).getPartitionsFor(topic).map(_.length)
+      numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId))
+      // The actor is only needed for this one lookup; stop it even if the lookup fails or times out so it is not leaked.
+      try Await.result(numPartitions, timeout) finally consumerActor ! KafkaConsumerActor.Stop
+    }
+  }
+  object RetrieveFromKafka {
+    private val metadataActorCounter = new AtomicInteger // keeps each metadata actor's name unique
+  }
+}
 
-  private val CLUSTER_ID = "cluster-id"
-  private val kafkaPartitioner = partitioner()
-  private val kafkaCluster = cluster(partitions())
+private[kafka] trait DefaultKafkaShardingMessageExtractor {
+  val strategy: PartitionCountStrategy // supplies the shard-id prefix (groupId) and the topic's partition count
+  private val groupId: String = strategy.groupId
+  private val kafkaPartitions: Int = { val n = strategy.partitions; require(n > 0, s"partition count must be positive, got $n"); n }
 
   def shardId(entityId: String): String = {
-    val partition = kafkaPartitioner
-      .partition(topic, entityId, entityId.getBytes(), null, null, kafkaCluster)
+    val keyBytes = entityId.getBytes(java.nio.charset.StandardCharsets.UTF_8) // Kafka String(De)Serializer default encoding
+    val partition = Utils.toPositive(Utils.murmur2(keyBytes)) % kafkaPartitions // same formula as Kafka's DefaultPartitioner for keyed records
     s"$groupId-$partition"
   }
-
-  def partitions(): List[PartitionInfo] = {
-    val consumerActor = actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), "metadata-consumer-actor")
-    val metadataClient = MetadataClient.create(consumerActor, timeout)
-    val partitions = metadataClient.getPartitionsFor(topic)
-    partitions.foreach(p => actorSystem.log.info("Retrieved %s partitions for topic %s for group %s", p.length, topic, groupId))
-    Await.result(partitions, timeout)
-  }
-
-  def cluster(partitions: List[PartitionInfo]): KafkaCluster =
-    new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava)
-
-  def partitioner(): Partitioner = new DefaultPartitioner()
 }
 
-final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
-                                            (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy)
   extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
   override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
   override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
@@ -60,8 +69,7 @@ final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSetting
  *   only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
  *   partitioner doesn't make use of any other Kafka Cluster metadata.
  */
-abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
-                                                  (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy) // message is its own envelope; subclasses supply entityId
   extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
   override def unwrapMessage(message: M): M = message
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f0f4f90..5cf3321 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -7,6 +7,7 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
 import akka.cluster.sharding.external.ExternalShardAllocationStrategy
 import akka.cluster.sharding.typed.ClusterShardingSettings
 import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka}
 import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
 import org.apache.kafka.common.serialization.StringDeserializer
 
@@ -53,24 +54,28 @@ object UserEvents {
     }
   }
 
-  /*
-   * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying DefaultPartitioner so that the same murmur2
-   * hashing algorithm is used for Kafka and Akka Cluster Sharding
+  /**
+   * Uses each [[Message]]'s `userId` as its entity id; the shard id is derived from the murmur2 hash of that id, as in
+   * Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]], so shards line up with partitions when
+   * `userId` is the record key (assumed — confirm with the producer). [[RetrieveFromKafka]] fetches the partition count.
    */
-  class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: String, groupId: String)
-                              (implicit actorSystem: akka.actor.ActorSystem, timeout: FiniteDuration)
-      extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, topic, groupId) {
+  class UserIdMessageExtractor(strategy: PartitionCountStrategy)
+      extends KafkaShardingNoEnvelopeExtractor[Message](strategy) {
     def entityId(message: Message): String = message.userId
   }
 
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
-    implicit val classic: akka.actor.ActorSystem = system.toClassic
-    implicit val timeout: FiniteDuration = 10.seconds
-    val clientSettings = ConsumerSettings(classic, new StringDeserializer, new StringDeserializer)
-    val topic = processorConfig.topics.head
-    val groupId = processorConfig.groupId
-    val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, groupId)
+    val messageExtractor = new UserIdMessageExtractor(
+      strategy = RetrieveFromKafka(
+        system = system.toClassic,
+        timeout = 10.seconds,
+        groupId = processorConfig.groupId,
+        topic = processorConfig.topics.head,
+        settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
+          .withBootstrapServers(processorConfig.bootstrapServers)
+      )
+    )
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org