Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:48 UTC
[incubator-pekko-samples] 03/09: WIP- async message extractor
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit fb0f3f28a5a8b5eb71658342d03dc0f77c5d78c9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Fri Feb 14 15:58:05 2020 -0500
WIP- async message extractor
---
akka-sample-kafka-to-sharding-scala/README.md | 8 +--
.../sharding/embeddedkafka/KafkaBroker.scala | 8 +--
.../scala/akka/kafka/KafkaClusterSharding.scala | 78 ++++++++++++++++++++
.../akka/kafka/KafkaShardingMessageExtractor.scala | 75 -------------------
.../main/scala/sample/sharding/kafka/Main.scala | 84 ++++++++++++++--------
.../sample/sharding/kafka/TopicListener.scala | 3 +
.../scala/sample/sharding/kafka/UserEvents.scala | 48 ++++++-------
.../sharding/kafka/UserEventsKafkaProcessor.scala | 5 +-
.../sample/sharding/kafka/UserGrpcService.scala | 5 +-
.../producer/src/main/resources/application.conf | 11 +--
.../kafka/producer/UserEventProducer.scala | 7 +-
11 files changed, 174 insertions(+), 158 deletions(-)
diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md
index 7194fe2..4913792 100644
--- a/akka-sample-kafka-to-sharding-scala/README.md
+++ b/akka-sample-kafka-to-sharding-scala/README.md
@@ -154,16 +154,16 @@ Using Akka management we can see the shard allocations and the number of entitie
```
// Node 1:
- curl -v localhost:8551/cluster/shards/user-processing | jq
+ curl -v localhost:8551/cluster/shards/user-processing | jq
// Node 2:
- curl -v localhost:8552/cluster/shards/user-processing | jq
+ curl -v localhost:8552/cluster/shards/user-processing | jq
```
We can count the number of shards on each:
```
-curl -v localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc
+curl -s localhost:8551/cluster/shards/user-processing | jq -r "." | grep shardId | wc -l
```
The number of shards will depend on which entities have received messages.
@@ -181,7 +181,7 @@ the correct node even if that moves due to a kafka rebalance.
A gRPC client is included which can be started with...
```
- sbt "client/run"
+sbt "client/run"
```
It assumes one of the nodes is serving its front end on port 8081.
diff --git a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
index 90663a3..7e5c81c 100644
--- a/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
+++ b/akka-sample-kafka-to-sharding-scala/kafka/src/main/scala/sample/sharding/embeddedkafka/KafkaBroker.scala
@@ -10,13 +10,13 @@ object KafkaBroker extends App with EmbeddedKafka {
val topic = "user-events"
val partitions = 128
- val config = EmbeddedKafkaConfig(kafkaPort = port)
- val server = EmbeddedKafka.start()(config)
+ implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = port)
+ val server = EmbeddedKafka.start()
createCustomTopic(topic = topic, partitions = partitions)
- log.info(s"Kafka running on port '$port'")
- log.info(s"Topic '$topic' with '$partitions' partitions created")
+ log.info(s"Kafka running: localhost:$port")
+ log.info(s"Topic '$topic' with $partitions partitions created")
server.broker.awaitShutdown()
}
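The broker change above leans on Scala implicit resolution: both `EmbeddedKafka.start()` and `createCustomTopic` accept an implicit `EmbeddedKafkaConfig`, so declaring the config as an implicit val puts one configuration in scope for every call. A minimal standalone sketch of the same pattern (the import path is an assumption and depends on the embedded-kafka release in use):
```
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

object BrokerSketch extends App with EmbeddedKafka {
  // one implicit config feeds every EmbeddedKafka call below
  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort = 9092)

  val server = EmbeddedKafka.start()                      // resolves `config` implicitly
  createCustomTopic(topic = "demo-topic", partitions = 4) // same implicit config
  server.broker.awaitShutdown()
}
```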
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
new file mode 100644
index 0000000..5562c42
--- /dev/null
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala
@@ -0,0 +1,78 @@
+package akka.kafka
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
+import akka.kafka.scaladsl.MetadataClient
+import akka.util.Timeout._
+import org.apache.kafka.common.utils.Utils
+
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.concurrent.duration._
+
+object KafkaClusterSharding {
+ private val metadataActorCounter = new AtomicInteger
+
+ def messageExtractor[M](system: ActorSystem,
+ groupId: String,
+ topic: String,
+ timeout: FiniteDuration,
+ settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] =
+ getPartitionCount(system, topic, timeout, settings)
+ .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher)
+
+ def messageExtractorNoEnvelope[M](system: ActorSystem,
+ groupId: String,
+ topic: String,
+ timeout: FiniteDuration,
+ entityIdExtractor: M => String,
+ settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] =
+ getPartitionCount(system, topic, timeout, settings)
+ .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher)
+
+ private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = {
+ implicit val ec: ExecutionContextExecutor = system.dispatcher
+ val actorNum = metadataActorCounter.getAndIncrement()
+ val consumerActor = system
+ .asInstanceOf[ExtendedActorSystem]
+ .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
+ val metadataClient = MetadataClient.create(consumerActor, timeout)
+ val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
+ numPartitions.map { count =>
+ system.log.info("Retrieved {} partitions for topic '{}'", count, topic)
+ count
+ }
+ }
+}
+
+trait KafkaClusterSharding {
+ def groupId: String
+ def kafkaPartitions: Int
+
+ def shardId(entityId: String): String = {
+ // simplified version of Kafka's `DefaultPartitioner` implementation
+ val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
+ s"$groupId-$partition"
+ }
+}
+
+class KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int)
+ extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding {
+ override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
+ override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
+}
+
+/**
+ * Caveats
+ * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
+ * - Values are passed as `null` to the partitioner.
+ * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
+ * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
+ * partitioner doesn't make use of any other Kafka Cluster metadata.
+ */
+class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String)
+ extends ShardingMessageExtractor[M, M] with KafkaClusterSharding {
+ override def entityId(message: M): String = entityIdExtractor(message)
+ override def unwrapMessage(message: M): M = message
+}
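For a concrete feel for the `shardId` mapping defined in the trait above, here is a minimal sketch (the group id and partition count are assumptions chosen to match this sample's `user-events` topic):
```
import org.apache.kafka.common.utils.Utils

object ShardIdSketch extends App {
  val groupId = "user-processing" // assumed consumer group id
  val kafkaPartitions = 128       // assumed partition count of the topic

  // same simplified DefaultPartitioner logic as KafkaClusterSharding.shardId:
  // murmur2-hash the entity id, force it positive, mod the partition count
  def shardId(entityId: String): String = {
    val partition = Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
    s"$groupId-$partition"
  }

  // the numeric suffix is exactly the Kafka partition that the producer's
  // DefaultPartitioner would pick for the same key
  println(shardId("user-123")) // "user-processing-<n>", n depends on the hash
}
```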
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
deleted file mode 100644
index 14353ab..0000000
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package akka.kafka
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor}
-import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy
-import akka.kafka.scaladsl.MetadataClient
-import akka.util.Timeout._
-import org.apache.kafka.common.utils.Utils
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
-
-object DefaultKafkaShardingMessageExtractor {
- sealed trait PartitionCountStrategy {
- def groupId: String
- def partitions: Int
- }
- final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy
- final case class RetrieveFromKafka(
- system: ActorSystem,
- timeout: FiniteDuration,
- groupId: String,
- topic: String,
- settings: ConsumerSettings[_,_])
- extends PartitionCountStrategy {
- import RetrieveFromKafka._
- private implicit val ec: ExecutionContext = system.dispatcher
- lazy val partitions: Int = {
- val actorNum = metadataActorCounter.getAndIncrement()
- val consumerActor = system
- .asInstanceOf[ExtendedActorSystem]
- .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum")
- val metadataClient = MetadataClient.create(consumerActor, timeout)
- val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length)
- numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId))
- Await.result(numPartitions, timeout)
- }
- }
- object RetrieveFromKafka {
- private val metadataActorCounter = new AtomicInteger
- }
-}
-
-private[kafka] trait DefaultKafkaShardingMessageExtractor {
- val strategy: PartitionCountStrategy
- private val groupId: String = strategy.groupId
- private val kafkaPartitions: Int = strategy.partitions
-
- def shardId(entityId: String): String = {
- // simplified version of Kafka's `DefaultPartitioner` implementation
- val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions
- s"$groupId-$partition"
- }
-}
-
-final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy)
- extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor {
- override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
- override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
-}
-
-/**
- * Caveats
- * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions.
- * - Values are passed as `null` to the partitioner.
- * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that
- * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your
- * partitioner doesn't make use of any other Kafka Cluster metadata.
- */
-abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy)
- extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor {
- override def unwrapMessage(message: M): M = message
-}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..a695853 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -1,21 +1,24 @@
package sample.sharding.kafka
-import akka.actor.typed.ActorSystem
-import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
+import akka.actor.typed.{ActorRef, ActorSystem, Behavior, Terminated}
import akka.cluster.ClusterEvent.MemberUp
-import akka.cluster.typed.Cluster
-import akka.cluster.typed.Subscribe
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.cluster.typed.{Cluster, Subscribe}
import akka.http.scaladsl._
-import akka.http.scaladsl.model.HttpRequest
-import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.management.scaladsl.AkkaManagement
import akka.stream.Materializer
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+sealed trait Command
+case object NodeMemberUp extends Command
+case object StartProcessor extends Command
+final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command
object Main {
def main(args: Array[String]): Unit = {
@@ -25,45 +28,69 @@ object Main {
args.toList match {
case portString :: managementPort :: frontEndPort :: Nil
if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) =>
- startNode(portString.toInt, managementPort.toInt, frontEndPort.toInt)
+ init(portString.toInt, managementPort.toInt, frontEndPort.toInt)
case _ =>
throw new IllegalArgumentException("usage: <remotingPort> <managementPort> <frontEndPort>")
}
}
- def startNode(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
- ActorSystem(Behaviors.setup[MemberUp] {
+ def init(remotingPort: Int, akkaManagementPort: Int, frontEndPort: Int): Unit = {
+ ActorSystem(Behaviors.setup[Command] {
ctx =>
- implicit val mat = Materializer.createMaterializer(ctx.system.toClassic)
- implicit val ec = ctx.executionContext
AkkaManagement(ctx.system.toClassic).start()
- // maybe don't start until part of the cluster, or add health check
- val binding = startGrpc(ctx.system, mat, frontEndPort)
+
val cluster = Cluster(ctx.system)
- cluster.subscriptions.tell(Subscribe(ctx.self, classOf[MemberUp]))
+ val subscriber = ctx.spawn(clusterUpSubscriber(cluster, ctx.self), "cluster-subscriber")
+ cluster.subscriptions.tell(Subscribe(subscriber, classOf[MemberUp]))
+
+ ctx.pipeToSelf(UserEvents.messageExtractor(ctx.system)) {
+ case Success(extractor) => MessageExtractor(extractor)
+ case Failure(ex) => throw new Exception(ex)
+ }
+
+ starting()
+ }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+
+ def starting(extractor: Option[ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]] = None): Behavior[Command] = Behaviors
+ .receive[Command] {
+ case (ctx, MessageExtractor(extractor)) =>
+ ctx.self.tell(StartProcessor)
+ starting(Some(extractor))
+ case (ctx, StartProcessor) if extractor.isDefined =>
+ UserEvents.init(ctx.system, extractor.get)
+ val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor")
+ ctx.watch(eventProcessor)
+ ctx.log.info("Processor started.")
+ val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get)
+ running(binding, eventProcessor)
+ }
+
+ def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors
+ .receiveSignal {
+ case (ctx, Terminated(`processor`)) =>
+ ctx.log.warn("Kafka event processor stopped. Shutting down")
+ binding.map(_.unbind())(ctx.executionContext)
+ Behaviors.stopped
+ }
+
+ def clusterUpSubscriber(cluster: Cluster, parent: ActorRef[Command]): Behavior[MemberUp] = Behaviors.setup[MemberUp] {
+ ctx =>
Behaviors
.receiveMessage[MemberUp] {
case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
- UserEvents.init(ctx.system)
- val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
- ctx.watch(eventProcessor)
+ parent.tell(StartProcessor)
Behaviors.same
case MemberUp(member) =>
ctx.log.info("Member up {}", member)
Behaviors.same
}
- .receiveSignal {
- case (ctx, Terminated(_)) =>
- ctx.log.warn("Kafka event processor stopped. Shutting down")
- binding.map(_.unbind())
- Behaviors.stopped
- }
- }, "KafkaToSharding", config(remotingPort, akkaManagementPort))
+ }
- def startGrpc(system: ActorSystem[_], mat: Materializer, frontEndPort: Int): Future[Http.ServerBinding] = {
+ def startGrpc(system: ActorSystem[_], frontEndPort: Int, extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Future[Http.ServerBinding] = {
+ val mat = Materializer.createMaterializer(system.toClassic)
val service: HttpRequest => Future[HttpResponse] =
- UserServiceHandler(new UserGrpcService(system))(mat, system.toClassic)
+ UserServiceHandler(new UserGrpcService(system, extractor))(mat, system.toClassic)
Http()(system.toClassic).bindAndHandleAsync(
service,
interface = "127.0.0.1",
@@ -79,5 +106,4 @@ object Main {
""").withFallback(ConfigFactory.load())
}
-
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
index 79be5e3..3b59790 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala
@@ -29,6 +29,9 @@ object TopicListener {
val address = Cluster(ctx.system).selfMember.address
Behaviors.receiveMessage[ConsumerRebalanceEvent] {
case TopicPartitionsAssigned(sub, partitions) =>
+ // TODO
+ // - log all partitions assigned in one log line
+ // - block for shard allocation to complete, add configurable timeout
partitions.foreach(tp => {
val shardId = s"$groupId-${tp.partition()}"
ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId)
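The allocation update that follows the log line lives in the elided context of this hunk. As a sketch of what happens per assigned partition (assuming Akka's `ExternalShardAllocation` extension and its `updateShardLocation` client method):
```
import akka.cluster.sharding.external.ExternalShardAllocation

// inside TopicListener's setup: move each newly assigned partition's shard
// to this node so entities stay co-located with their Kafka partition
val client = ExternalShardAllocation(ctx.system).clientFor(typeKey.name)
partitions.foreach { tp =>
  val shardId = s"$groupId-${tp.partition()}"
  client.updateShardLocation(shardId, address) // returns a Future[Done]
}
```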
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index 5cf3321..69d815d 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -5,12 +5,12 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.cluster.sharding.typed.ClusterShardingSettings
+import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
-import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka}
-import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor}
+import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor}
import org.apache.kafka.common.serialization.StringDeserializer
+import scala.concurrent.Future
import scala.concurrent.duration._
object UserEvents {
@@ -47,7 +47,8 @@ object UserEvents {
runningTotal.copy(
totalPurchases = runningTotal.totalPurchases + 1,
amountSpent = runningTotal.amountSpent + (quantity * price)))
- case GetRunningTotal(_, replyTo) =>
+ case GetRunningTotal(id, replyTo) =>
+ ctx.log.info("user {} running total queried", id)
replyTo ! runningTotal
Behaviors.same
}
@@ -55,35 +56,30 @@ object UserEvents {
}
/**
- * Passing in a [[RetrieveFromKafka]] strategy will automatically retrieve the number of partitions of a topic for
- * use with the same hashing algorithm used by the Apache Kafka [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
- * (murmur2) with Akka Cluster Sharding.
+ * Asynchronously get the Akka Cluster Sharding [[ShardingMessageExtractor]]. Given a topic we can automatically
+ * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka
+ * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding.
*/
- class UserIdMessageExtractor(strategy: PartitionCountStrategy)
- extends KafkaShardingNoEnvelopeExtractor[Message](strategy) {
- def entityId(message: Message): String = message.userId
- }
-
- def init(system: ActorSystem[_]): ActorRef[Message] = {
+ def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = {
val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
- val messageExtractor = new UserIdMessageExtractor(
- strategy = RetrieveFromKafka(
- system = system.toClassic,
- timeout = 10.seconds,
- groupId = processorConfig.groupId,
- topic = processorConfig.topics.head,
- settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
- .withBootstrapServers(processorConfig.bootstrapServers)
- )
+ KafkaClusterSharding.messageExtractorNoEnvelope(
+ system = system.toClassic,
+ timeout = 10.seconds,
+ groupId = processorConfig.groupId,
+ topic = processorConfig.topics.head,
+ entityIdExtractor = (msg: Message) => msg.userId,
+ settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
+ .withBootstrapServers(processorConfig.bootstrapServers)
)
+ }
+
+ def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] =
ClusterSharding(system).init(
Entity(TypeKey)(createBehavior = _ => UserEvents())
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
.withMessageExtractor(messageExtractor)
.withSettings(ClusterShardingSettings(system)))
- }
- def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
- init(system).narrow[UserQuery]
- }
+ def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] =
+ init(system, messageExtractor).narrow[UserQuery]
}
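Since `messageExtractor` now returns a `Future`, callers must sequence sharding startup after the metadata fetch completes. A condensed sketch of the pattern Main.scala uses above (object and message names here are illustrative, not part of the commit):
```
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scala.util.{Failure, Success}

object ExtractorBootstrap {
  sealed trait Msg
  private final case class Ready(extractor: KafkaShardingNoEnvelopeExtractor[UserEvents.Message]) extends Msg
  private final case class Failed(ex: Throwable) extends Msg

  def apply(): Behavior[Msg] = Behaviors.setup { ctx =>
    // fetch the partition count asynchronously; deliver the result as a message
    ctx.pipeToSelf(UserEvents.messageExtractor(ctx.system)) {
      case Success(extractor) => Ready(extractor)
      case Failure(ex)        => Failed(ex)
    }
    Behaviors.receiveMessage {
      case Ready(extractor) =>
        UserEvents.init(ctx.system, extractor) // sharding starts only once Kafka metadata is known
        Behaviors.same
      case Failed(ex) =>
        ctx.log.error("Failed to retrieve Kafka partition metadata", ex)
        Behaviors.stopped
    }
  }
}
```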
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 511c3bc..155a51c 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -8,6 +8,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
+import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.kafka.ConsumerSettings
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
@@ -30,7 +31,7 @@ object UserEventsKafkaProcessor {
sealed trait Command
private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
- def apply(): Behavior[Nothing] = {
+ def apply(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]): Behavior[Nothing] = {
Behaviors
.setup[Command] { ctx =>
val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor"))
@@ -40,7 +41,7 @@ object UserEventsKafkaProcessor {
// TODO config
val timeout = Timeout(3.seconds)
val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef")
- val shardRegion = UserEvents.init(ctx.system)
+ val shardRegion = UserEvents.init(ctx.system, extractor)
val consumerSettings =
ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(processorSettings.bootstrapServers)
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..f3acb7c 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -3,6 +3,7 @@ package sample.sharding.kafka
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.AskPattern._
+import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.util.Timeout
import sample.sharding.kafka.UserEvents.GetRunningTotal
import sample.sharding.kafka.UserEvents.RunningTotal
@@ -10,13 +11,13 @@ import sample.sharding.kafka.UserEvents.RunningTotal
import scala.concurrent.Future
import scala.concurrent.duration._
-class UserGrpcService(system: ActorSystem[_]) extends UserService {
+class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends UserService {
implicit val timeout = Timeout(5.seconds)
implicit val sched = system.scheduler
implicit val ec = system.executionContext
- private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
+ private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor)
override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
shardRegion
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
index 38d51c0..0c08ab1 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/resources/application.conf
@@ -1,13 +1,4 @@
kafka-to-sharding-producer {
- bootstrap-servers = "localhost:9094"
+ bootstrap-servers = "localhost:9092"
topic = "user-events"
-
- # can be one of:
- # - default: to put the entity id in the key and use the default kafka partitioner
- # - explicit: to specify the partition explicitly, based on the entity id
- # if you have control of both the producer and consumer it is better to be explicit to make sure
- # that both sides align
- partitioning = "explicit"
-
- nr-partitions = 128
}
diff --git a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
index 846dee7..a20418c 100644
--- a/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
+++ b/akka-sample-kafka-to-sharding-scala/producer/src/main/scala/sharding/kafka/producer/UserEventProducer.scala
@@ -1,7 +1,5 @@
package sharding.kafka.producer
-import java.nio.charset.StandardCharsets
-
import akka.Done
import akka.actor.ActorSystem
import akka.event.Logging
@@ -10,9 +8,7 @@ import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import sample.sharding.kafka.serialization.user_events.UserPurchaseProto
import scala.concurrent.Future
@@ -55,7 +51,6 @@ object UserEventProducer extends App {
// rely on the default kafka partitioner to hash the key and distribute among shards
// the logic of the default partitioner must be replicated in MessageExtractor entityId -> shardId function
new ProducerRecord[String, Array[Byte]](producerConfig.topic, randomEntityId, message)
-
})
.runWith(Producer.plainSink(producerSettings))
}
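The `partitioning = "explicit"` option removed from application.conf corresponds to computing the partition on the producer side instead of relying on the key hash. A hedged sketch of that alternative (`nrPartitions` is an assumed value that must match the topic's partition count):
```
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.utils.Utils

// explicit partitioning: derive the partition from the entity id with the same
// murmur2 hash the sharding extractor uses and set it on the record directly,
// so producer partitioning and shard allocation cannot drift apart
val nrPartitions = 128
val partition = Utils.toPositive(Utils.murmur2(randomEntityId.getBytes())) % nrPartitions
new ProducerRecord[String, Array[Byte]](producerConfig.topic, partition, randomEntityId, message)
```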