You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:52 UTC
[incubator-pekko-samples] 07/09: Create as extension
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 7aa68a292e0b20cb14584849e3f9ac149623239d
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Feb 26 13:40:02 2020 -0500
Create as extension
---
akka-sample-kafka-to-sharding-scala/README.md | 6 +-
.../scala/akka/kafka/KafkaClusterSharding.scala | 129 ++++++++++++++-------
.../scala/sample/sharding/kafka/UserEvents.scala | 3 +-
.../sharding/kafka/UserEventsKafkaProcessor.scala | 2 +-
4 files changed, 95 insertions(+), 45 deletions(-)
diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 20ccf8b..732db5b 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -46,8 +46,8 @@ sbt "kafka / run"
In the Kafka server window you'll see the following when the server is ready:
```
-12:46:47.022 INFO [run-main-0 ] sample.sharding.embeddedkafka.Main$ Kafka running on port '9092'
-12:46:47.022 INFO [run-main-0 ] sample.sharding.embeddedkafka.Main$ Topic 'user-events' with '128' partitions created
+12:06:59.711 INFO [run-main-0 ] s.s.embeddedkafka.KafkaBroker$ Kafka running: localhost:9092
+12:06:59.711 INFO [run-main-0 ] s.s.embeddedkafka.KafkaBroker$ Topic 'user-events' with 128 partitions created
```
If you want to use a different Kafka cluster then update the `applications.conf`s in each project to point to your
@@ -183,7 +183,7 @@ the correct node even if that moves due to a kafka rebalance.
A gRPC client is included which can be started with...
```
-sbt "client/run"
+sbt "client / run"
```
It assumes there is one of the nodes running its front end port on 8081.
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
index 2fbaa7f..6c981d0 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -5,12 +5,13 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
-import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId}
import akka.annotation.{ApiMayChange, InternalApi}
import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
import akka.cluster.typed.Cluster
+import akka.kafka
import akka.kafka.scaladsl.MetadataClient
import akka.util.Timeout._
import org.apache.kafka.common.utils.Utils
@@ -20,10 +21,14 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
/**
- * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka.
+ * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka.
*/
-object KafkaClusterSharding {
+class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension {
+ import KafkaClusterSharding._
+
private val metadataActorCounter = new AtomicInteger
+ private var _messageExtractor: Option[KafkaShardingMessageExtractor[_]] = None
+ private var _messageExtractorNoEnvelope: Option[KafkaShardingNoEnvelopeExtractor[_]] = None
/**
* API MAY CHANGE
@@ -39,12 +44,33 @@ object KafkaClusterSharding {
* that entities are routed to the same Entity type.
*/
@ApiMayChange
- def messageExtractor[M](system: ActorSystem,
- topic: String,
+ def messageExtractor[M](topic: String,
timeout: FiniteDuration,
- settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
- getPartitionCount(system, topic, timeout, settings)
- .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher)
+ settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = _messageExtractor match {
+ case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingMessageExtractor[M]])
+ case _ =>
+ getPartitionCount(topic, timeout, settings)
+ .map { kafkaPartitions =>
+ val extractor = new KafkaShardingMessageExtractor[M](kafkaPartitions)
+ _messageExtractor = Some(extractor)
+ extractor
+ }(system.dispatcher)
+ }
+
+ /**
+ * API MAY CHANGE
+ *
+ * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+ * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+ *
+ * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
+ *
+ * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+ * that entities are routed to the same Entity type.
+ */
+ @ApiMayChange
+ def messageExtractor[M](kafkaPartitions: Int): Future[KafkaShardingMessageExtractor[M]] =
+ Future.successful(new KafkaShardingMessageExtractor[M](kafkaPartitions))
/**
* API MAY CHANGE
@@ -61,19 +87,39 @@ object KafkaClusterSharding {
* that entities are routed to the same Entity type.
*/
@ApiMayChange
- def messageExtractorNoEnvelope[M](system: ActorSystem,
- topic: String,
+ def messageExtractorNoEnvelope[M](topic: String,
timeout: FiniteDuration,
entityIdExtractor: M => String,
- settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
- getPartitionCount(system, topic, timeout, settings)
- .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher)
+ settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] = _messageExtractorNoEnvelope match {
+ case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingNoEnvelopeExtractor[M]])
+ case _ =>
+ getPartitionCount(topic, timeout, settings)
+ .map { kafkaPartitions =>
+ val extractor = new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)
+ _messageExtractorNoEnvelope = Some(extractor)
+ extractor
+ }(system.dispatcher)
+ }
- private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
+ /**
+ * API MAY CHANGE
+ *
+ * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's
+ * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
+ *
+ * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
+ *
+ * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure
+ * that entities are routed to the same Entity type.
+ */
+ @ApiMayChange
+ def messageExtractorNoEnvelope[M](kafkaPartitions: Int, entityIdExtractor: M => String): Future[KafkaShardingNoEnvelopeExtractor[M]] =
+ Future.successful(new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))
+
+ private def getPartitionCount[M](topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
implicit val ec: ExecutionContextExecutor = system.dispatcher
val actorNum = metadataActorCounter.getAndIncrement()
val consumerActor = system
- .asInstanceOf[ExtendedActorSystem]
.systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
val metadataClient = MetadataClient.create(consumerActor, timeout)
val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
@@ -83,30 +129,6 @@ object KafkaClusterSharding {
}
}
- @InternalApi
- sealed trait KafkaClusterSharding {
- def kafkaPartitions: Int
- def shardId(entityId: String): String = {
- // simplified version of Kafka's `DefaultPartitioner` implementation
- val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
- partition.toString
- }
- }
-
- @InternalApi
- final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
- extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
- override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
- override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
- }
-
- @InternalApi
- final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
- extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
- override def entityId(message: M): String = entityIdExtractor(message)
- override def unwrapMessage(message: M): M = message
- }
-
// TODO: will require `akka-actors-typed` as a provided dep
/**
* API MAY CHANGE
@@ -158,4 +180,33 @@ object KafkaClusterSharding {
.systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener")
.toClassic
}
+}
+
+object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {
+ @InternalApi
+ sealed trait KafkaClusterShardingContract {
+ def kafkaPartitions: Int
+ def shardId(entityId: String): String = {
+ // simplified version of Kafka's `DefaultPartitioner` implementation
+ val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+ partition.toString
+ }
+ }
+
+ @InternalApi
+ final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int)
+ extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterShardingContract {
+ override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+ override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+ }
+
+ @InternalApi
+ final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String)
+ extends ShardingMessageExtractor[M, M] with KafkaClusterShardingContract {
+ override def entityId(message: M): String = entityIdExtractor(message)
+ override def unwrapMessage(message: M): M = message
+ }
+
+ override def createExtension(system: ExtendedActorSystem): kafka.KafkaClusterSharding =
+ new KafkaClusterSharding(system)
}
\ No newline at end of file
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 1cfa62d..763bc49 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -58,8 +58,7 @@ object UserEvents {
*/
def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = {
val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
- KafkaClusterSharding.messageExtractorNoEnvelope(
- system = system.toClassic,
+ KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope(
timeout = 10.seconds,
topic = processorConfig.topics.head,
entityIdExtractor = (msg: Message) => msg.userId,
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 43ad4bb..6c8d4c0 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -40,7 +40,7 @@ object UserEventsKafkaProcessor {
// TODO config
val timeout = Timeout(3.seconds)
val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId)
- val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey)
+ val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(classic, typeKey)
val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId)
val consumerSettings =
ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org