You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:47 UTC
[incubator-pekko-samples] 02/09: WIP- use systemActorOf to create metadata client
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 5c118c69be77f2ceac59f4c04c4e561eaa59738d
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Fri Feb 14 13:14:22 2020 -0500
WIP- use systemActorOf to create metadata client
---
.../akka/kafka/KafkaShardingMessageExtractor.scala | 78 ++++++++++++----------
.../scala/sample/sharding/kafka/UserEvents.scala | 29 ++++----
2 files changed, 60 insertions(+), 47 deletions(-)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
index f02eb41..14353ab 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
@@ -1,52 +1,61 @@
package akka.kafka
+import java.util.concurrent.atomic.AtomicInteger
+
import akka.actor.{ActorSystem, ExtendedActorSystem}
import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy
import akka.kafka.scaladsl.MetadataClient
import akka.util.Timeout._
-import org.apache.kafka.clients.producer.Partitioner
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner
-import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster}
+import org.apache.kafka.common.utils.Utils
import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContextExecutor}
-import scala.jdk.CollectionConverters._
-
-private[kafka] trait DefaultKafkaShardingMessageExtractor {
- implicit val actorSystem: ActorSystem
- implicit val timeout: FiniteDuration
- implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher
+import scala.concurrent.{Await, ExecutionContext}
- val clientSettings: ConsumerSettings[_, _]
- val groupId: String
- val topic: String
+object DefaultKafkaShardingMessageExtractor {
+ sealed trait PartitionCountStrategy {
+ def groupId: String
+ def partitions: Int
+ }
+ final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy
+ final case class RetrieveFromKafka(
+ system: ActorSystem,
+ timeout: FiniteDuration,
+ groupId: String,
+ topic: String,
+ settings: ConsumerSettings[_,_])
+ extends PartitionCountStrategy {
+ import RetrieveFromKafka._
+ private implicit val ec: ExecutionContext = system.dispatcher
+ lazy val partitions: Int = {
+ val actorNum = metadataActorCounter.getAndIncrement()
+ val consumerActor = system
+ .asInstanceOf[ExtendedActorSystem]
+ .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
+ val metadataClient = MetadataClient.create(consumerActor, timeout)
+ val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
+ numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId))
+ Await.result(numPartitions, timeout)
+ }
+ }
+ object RetrieveFromKafka {
+ private val metadataActorCounter = new AtomicInteger
+ }
+}
- private val CLUSTER_ID = "cluster-id"
- private val kafkaPartitioner = partitioner()
- private val kafkaCluster = cluster(partitions())
+private[kafka] trait DefaultKafkaShardingMessageExtractor {
+ val strategy: PartitionCountStrategy
+ private val groupId: String = strategy.groupId
+ private val kafkaPartitions: Int = strategy.partitions
def shardId(entityId: String): String = {
- val partition = kafkaPartitioner
- .partition(topic, entityId, entityId.getBytes(), null, null, kafkaCluster)
+ // simplified version of Kafka's `DefaultPartitioner` implementation
+ val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
s"$groupId-$partition"
}
-
- def partitions(): List[PartitionInfo] = {
- val consumerActor = actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), "metadata-consumer-actor")
- val metadataClient = MetadataClient.create(consumerActor, timeout)
- val partitions = metadataClient.getPartitionsFor(topic)
- partitions.foreach(p => actorSystem.log.info("Retrieved %s partitions for topic %s for group %s", p.length, topic, groupId))
- Await.result(partitions, timeout)
- }
-
- def cluster(partitions: List[PartitionInfo]): KafkaCluster =
- new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava)
-
- def partitioner(): Partitioner = new DefaultPartitioner()
}
-final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
- (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy)
extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
@@ -60,8 +69,7 @@ final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSetting
* only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
* partitioner doesn't make use of any other Kafka Cluster metadata.
*/
-abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String)
- (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration)
+abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy)
extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
override def unwrapMessage(message: M): M = message
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f0f4f90..5cf3321 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -7,6 +7,7 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
+import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka}
import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
import org.apache.kafka.common.serialization.StringDeserializer
@@ -53,24 +54,28 @@ object UserEvents {
}
}
- /*
- * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying DefaultPartitioner so that the same murmur2
- * hashing algorithm is used for Kafka and Akka Cluster Sharding
+ /**
+ * Passing in a [[RetrieveFromKafka]] strategy automatically retrieves the partition count of the topic from Kafka, so
+ * that Akka Cluster Sharding derives shard ids with the same murmur2 hashing as the Apache Kafka
+ * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*/
- class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: String, groupId: String)
- (implicit actorSystem: akka.actor.ActorSystem, timeout: FiniteDuration)
- extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, topic, groupId) {
+ class UserIdMessageExtractor(strategy: PartitionCountStrategy)
+ extends KafkaShardingNoEnvelopeExtractor[Message](strategy) {
def entityId(message: Message): String = message.userId
}
def init(system: ActorSystem[_]): ActorRef[Message] = {
val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
- implicit val classic: akka.actor.ActorSystem = system.toClassic
- implicit val timeout: FiniteDuration = 10.seconds
- val clientSettings = ConsumerSettings(classic, new StringDeserializer, new StringDeserializer)
- val topic = processorConfig.topics.head
- val groupId = processorConfig.groupId
- val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, groupId)
+ val messageExtractor = new UserIdMessageExtractor(
+ strategy = RetrieveFromKafka(
+ system = system.toClassic,
+ timeout = 10.seconds,
+ groupId = processorConfig.groupId,
+ topic = processorConfig.topics.head,
+ settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
+ .withBootstrapServers(processorConfig.bootstrapServers)
+ )
+ )
ClusterSharding(system).init(
Entity(TypeKey)(createBehavior = _ => UserEvents())
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org