You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:52 UTC

[incubator-pekko-samples] 07/09: Create as extension

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 7aa68a292e0b20cb14584849e3f9ac149623239d
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Feb 26 13:40:02 2020 -0500

    Create as extension
---
 akka-sample-kafka-to-sharding-scala/README.md      |   6 +-
 .../scala/akka/kafka/KafkaClusterSharding.scala    | 129 ++++++++++++++-------
 .../scala/sample/sharding/kafka/UserEvents.scala   |   3 +-
 .../sharding/kafka/UserEventsKafkaProcessor.scala  |   2 +-
 4 files changed, 95 insertions(+), 45 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 20ccf8b..732db5b 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -46,8 +46,8 @@ sbt "kafka / run"
 In the Kafka server window you'll see the following when the server is ready:
 
 ```
-12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.Main$   Kafka running on port '9092'
-12:46:47.022 INFO  [run-main-0          ] sample.sharding.embeddedkafka.Main$   Topic 'user-events' with '128' partitions created
+12:06:59.711 INFO  [run-main-0          ] s.s.embeddedkafka.KafkaBroker$        Kafka running: localhost:9092
+12:06:59.711 INFO  [run-main-0          ] s.s.embeddedkafka.KafkaBroker$        Topic 'user-events' with 128 partitions created
 ```
 
 If you want to use a different Kafka cluster then then update the `applications.conf`s in each project to point to your 
@@ -183,7 +183,7 @@ the correct node even if that moves due to a kafka rebalance.
 A gRPC client is included which can be started with...
 
 ```
-sbt "client/run"
+sbt "client / run"
 ```
 
 It assumes there is one of the nodes running its front end port on 8081. 
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 2fbaa7f..6c981d0 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -5,12 +5,13 @@ import java.util.concurrent.atomic.AtomicInteger
 import akka.actor.typed.Behavior
 import akka.actor.typed.scaladsl.adapter._
 import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId}
 import akka.annotation.{ApiMayChange, InternalApi}
 import akka.cluster.sharding.external.ExternalShardAllocation
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
 import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
 import akka.cluster.typed.Cluster
+import akka.kafka
 import akka.kafka.scaladsl.MetadataClient
 import akka.util.Timeout._
 import org.apache.kafka.common.utils.Utils
@@ -20,10 +21,14 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.util.{Failure, Success}
 
 /**
- * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka.
+ * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka.
  */
-object KafkaClusterSharding {
+class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension {
+  import KafkaClusterSharding._
+
   private val metadataActorCounter = new AtomicInteger
+  private var _messageExtractor: Option[KafkaShardingMessageExtractor[_]] = None
+  private var _messageExtractorNoEnvelope: Option[KafkaShardingNoEnvelopeExtractor[_]] = None
 
   /**
    * API MAY CHANGE
@@ -39,12 +44,33 @@ object KafkaClusterSharding {
    * that entities are routed to the same Entity type.
    */
   @ApiMayChange
-  def messageExtractor[M](system: ActorSystem,
-                          topic: String,
+  def messageExtractor[M](topic: String,
                           timeout: FiniteDuration,
-                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
-    getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
+                          settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = _messageExtractor match {
+  case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingMessageExtractor[M]])
+  case _ =>
+    getPartitionCount(topic, timeout, settings)
+      .map { kafkaPartitions =>
+        val extractor = new KafkaShardingMessageExtractor[M](kafkaPartitions)
+        _messageExtractor = Some(extractor)
+        extractor
+      }(system.dispatcher)
+  }
+
+  /**
+   * API MAY CHANGE
+   *
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
+   *
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
+  @ApiMayChange
+  def messageExtractor[M](kafkaPartitions: Int): Future[KafkaShardingMessageExtractor[M]] =
+    Future.successful(new KafkaShardingMessageExtractor[M](kafkaPartitions))
 
   /**
    * API MAY CHANGE
@@ -61,19 +87,39 @@ object KafkaClusterSharding {
    * that entities are routed to the same Entity type.
    */
   @ApiMayChange
-  def messageExtractorNoEnvelope[M](system: ActorSystem,
-                                    topic: String,
+  def messageExtractorNoEnvelope[M](topic: String,
                                     timeout: FiniteDuration,
                                     entityIdExtractor: M => String,
-                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
-    getPartitionCount(system, topic, timeout, settings)
-      .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher)
+                                    settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =  _messageExtractorNoEnvelope match {
+    case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingNoEnvelopeExtractor[M]])
+    case _ =>
+      getPartitionCount(topic, timeout, settings)
+        .map { kafkaPartitions =>
+          val extractor = new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)
+          _messageExtractorNoEnvelope = Some(extractor)
+          extractor
+        }(system.dispatcher)
+  }
 
-  private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
+  /**
+   * API MAY CHANGE
+   *
+   * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+   * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+   *
+   * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
+   *
+   * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+   * that entities are routed to the same Entity type.
+   */
+  @ApiMayChange
+  def messageExtractorNoEnvelope[M](kafkaPartitions: Int, entityIdExtractor: M => String): Future[KafkaShardingNoEnvelopeExtractor[M]] =
+    Future.successful(new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))
+
+  private def getPartitionCount[M](topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
     implicit val ec: ExecutionContextExecutor = system.dispatcher
     val actorNum = metadataActorCounter.getAndIncrement()
     val consumerActor = system
-      .asInstanceOf[ExtendedActorSystem]
       .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
     val metadataClient = MetadataClient.create(consumerActor, timeout)
     val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
@@ -83,30 +129,6 @@ object KafkaClusterSharding {
     }
   }
 
-  @InternalApi
-  sealed trait KafkaClusterSharding {
-    def kafkaPartitions: Int
-    def shardId(entityId: String): String = {
-      // simplified version of Kafka's `DefaultPartitioner` implementation
-      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
-      partition.toString
-    }
-  }
-
-  @InternalApi
-  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
-    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
-    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
-    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-  }
-
-  @InternalApi
-  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
-    extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
-    override def entityId(message: M): String = entityIdExtractor(message)
-    override def unwrapMessage(message: M): M = message
-  }
-
   // TODO: will require `akka-actors-typed` as a provided dep
   /**
    * API MAY CHANGE
@@ -158,4 +180,33 @@ object KafkaClusterSharding {
       .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener")
       .toClassic
   }
+}
+
+object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {
+  @InternalApi
+  sealed trait KafkaClusterShardingContract {
+    def kafkaPartitions: Int
+    def shardId(entityId: String): String = {
+      // simplified version of Kafka's `DefaultPartitioner` implementation
+      val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+      partition.toString
+    }
+  }
+
+  @InternalApi
+  final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
+    extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterShardingContract {
+    override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+    override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+  }
+
+  @InternalApi
+  final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
+    extends ShardingMessageExtractor[M, M] with KafkaClusterShardingContract {
+    override def entityId(message: M): String = entityIdExtractor(message)
+    override def unwrapMessage(message: M): M = message
+  }
+
+  override def createExtension(system: ExtendedActorSystem): kafka.KafkaClusterSharding =
+    new KafkaClusterSharding(system)
 }
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 1cfa62d..763bc49 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -58,8 +58,7 @@ object UserEvents {
    */
   def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
-    KafkaClusterSharding.messageExtractorNoEnvelope(
-      system = system.toClassic,
+    KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope(
       timeout = 10.seconds,
       topic = processorConfig.topics.head,
       entityIdExtractor = (msg: Message) => msg.userId,
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 43ad4bb..6c8d4c0 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -40,7 +40,7 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
-        val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey)
+        val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(classic, typeKey)
         val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
         val consumerSettings =
           ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org