You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:03 UTC
[incubator-pekko-samples] 01/01: WIP: migrate kafka write side to use reliable delivery
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch wip-chbatey-reliable-delivery
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 311a02733d5abb4c005f58697858e695e32a7dd8
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Tue Mar 3 14:27:00 2020 +0000
WIP: migrate kafka write side to use reliable delivery
---
akka-sample-kafka-to-sharding-scala/build.sbt | 2 +-
.../processor/src/main/resources/logback.xml | 30 ++--
.../main/scala/sample/sharding/kafka/Main.scala | 1 -
.../scala/sample/sharding/kafka/UserEvents.scala | 91 +++++++----
.../sharding/kafka/UserEventsKafkaProcessor.scala | 179 ++++++++++++++++++---
.../sample/sharding/kafka/UserGrpcService.scala | 14 +-
6 files changed, 245 insertions(+), 72 deletions(-)
diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 10f8cbb..70d63f5 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,4 +1,4 @@
-val AkkaVersion = "2.6.3"
+// NOTE(review): this looks like an unreleased snapshot build of Akka, presumably needed for the
+// reliable delivery API (akka.actor.typed.delivery) used by this change; switch to a released
+// version before merging.
+val AkkaVersion = "2.6.3+135-0a7adf56+20200301-1347"
// TODO upgrade to 2.0.0
val AlpakkaKafkaVersion = "1.1.0"
val AkkaManagementVersion = "1.0.5"
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
index 31d2e1d..32f5c5b 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
@@ -1,17 +1,15 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="utf-8"?>
<configuration>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.kafka" level="WARN" />
-
- <logger name="akka.cluster.sharding" level="DEBUG" />
-
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- </root>
-</configuration>
\ No newline at end of file
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <!-- Keep the pattern on a single line: a line break inside the pattern text is emitted literally in every log line. -->
+      <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <logger name="org.apache.kafka" level="WARN" />
+  <logger name="akka.cluster.sharding" level="DEBUG" />
+  <root level="DEBUG">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..79d3943 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -45,7 +45,6 @@ object Main {
.receiveMessage[MemberUp] {
case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
- UserEvents.init(ctx.system)
val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
ctx.watch(eventProcessor)
Behaviors.same
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f093fff..d4a4b86 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -11,44 +11,81 @@ import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.actor.typed.delivery.ConsumerController
+import akka.actor.typed.delivery.ConsumerController.Start
+import akka.actor.typed.delivery.ConsumerController
+import akka.cluster.sharding.typed.delivery.ShardingConsumerController
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
+import akka.cluster.sharding.typed.Murmur2MessageExtractor
+import akka.actor.typed.delivery.ConsumerController.Confirmed
object UserEvents {
- val TypeKey: EntityTypeKey[UserEvents.Message] =
- EntityTypeKey[UserEvents.Message]("user-processing")
+ // The entities run behind ShardingConsumerController (see shardingInit), so sharded messages are
+ // reliable-delivery SequencedMessages wrapping a Message rather than plain Messages.
+ val TypeKey: EntityTypeKey[SequencedMessage[UserEvents.Message]] =
+ EntityTypeKey[SequencedMessage[UserEvents.Message]]("user-processing")
sealed trait Message extends CborSerializable {
def userId: String
}
sealed trait UserEvent extends Message
- case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent
- case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done])
+ case class UserAction(userId: String, description: String) extends UserEvent
+ case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long)
extends UserEvent
sealed trait UserQuery extends Message
case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery
+ // A ConsumerController.Delivery adapted into the entity's own protocol: the entity handles `message`
+ // and then sends Confirmed(seqNr) to `confirmTo`.
+ final case class UserEventDelivery(message: Message, confirmTo: ActorRef[ConsumerController.Confirmed], seqNr: Long)
+
case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
- def apply(): Behavior[Message] = running(RunningTotal(0, 0))
+  /**
+   * Initialises cluster sharding for the user entities. Each entity runs behind a
+   * ShardingConsumerController, so it receives its messages through reliable delivery.
+   * Shards are allocated by an ExternalShardAllocationStrategy, and entity ids are hashed with
+   * murmur2 over the configured number of Kafka partitions.
+   *
+   * @return the shard region, which accepts ShardingEnvelopes of SequencedMessages
+   */
+  def shardingInit(system: ActorSystem[_]): ActorRef[ShardingEnvelope[SequencedMessage[Message]]] = {
+    val nrPartitions =
+      ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor")).nrPartitions
+    val userEntity: Entity[SequencedMessage[Message], ShardingEnvelope[SequencedMessage[Message]]] =
+      Entity[SequencedMessage[Message]](TypeKey) { _ =>
+        // the consumer controller speaks the reliable delivery protocol and hands each
+        // message on to the UserEvents behavior
+        ShardingConsumerController(controller => UserEvents(controller))
+      }
+    ClusterSharding(system).init(
+      userEntity
+        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+        .withMessageExtractor(new Murmur2MessageExtractor[SequencedMessage[Message]](nrPartitions))
+        .withSettings(ClusterShardingSettings(system)))
+  }
+
+  /**
+   * Entity behavior. It registers with the given consumer controller, then tracks the user's
+   * running total. Each delivery from the controller is adapted into a [[UserEventDelivery]].
+   *
+   * @param controller the ConsumerController supplied by ShardingConsumerController
+   */
+  def apply(controller: ActorRef[ConsumerController.Start[Message]]): Behavior[UserEventDelivery] =
+    Behaviors.setup { context =>
+      val deliveryAdapter =
+        context.messageAdapter[ConsumerController.Delivery[Message]] { delivery =>
+          UserEventDelivery(delivery.msg, delivery.confirmTo, delivery.seqNr)
+        }
+      controller ! Start(deliveryAdapter)
+      running(RunningTotal(0, 0))
+    }
- private def running(runningTotal: RunningTotal): Behavior[Message] = {
+  /**
+   * Handles one delivered message at a time and confirms it to its consumer controller once
+   * handled. Purchases update the purchase count and amount spent held in `runningTotal`.
+   */
+  private def running(runningTotal: RunningTotal): Behavior[UserEventDelivery] = {
Behaviors.setup { ctx =>
- Behaviors.receiveMessage[Message] {
- case UserAction(_, desc, ack) =>
- ctx.log.info("user event {}", desc)
- ack.tell(Done)
- Behaviors.same
- case UserPurchase(id, product, quantity, price, ack) =>
- ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
- ack.tell(Done)
- running(
- runningTotal.copy(
- totalPurchases = runningTotal.totalPurchases + 1,
- amountSpent = runningTotal.amountSpent + (quantity * price)))
- case GetRunningTotal(_, replyTo) =>
- replyTo ! runningTotal
- Behaviors.same
+      Behaviors.receiveMessage {
+        case UserEventDelivery(msg, confirmTo, seqNr) =>
+          val nextBehavior: Behavior[UserEventDelivery] = msg match {
+            case UserAction(_, desc) =>
+              ctx.log.info("user event {}", desc)
+              Behaviors.same
+            case UserPurchase(id, product, quantity, price) =>
+              ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
+              running(
+                runningTotal.copy(
+                  totalPurchases = runningTotal.totalPurchases + 1,
+                  amountSpent = runningTotal.amountSpent + (quantity * price)))
+            case GetRunningTotal(_, replyTo) =>
+              replyTo ! runningTotal
+              Behaviors.same
+          }
+          // every delivery is confirmed exactly once, after the handling above has run
+          confirmTo ! Confirmed(seqNr)
+          nextBehavior
}
}
}
@@ -58,20 +95,20 @@ object UserEvents {
* have keys that are strings
*/
class UserIdMessageExtractor(nrKafkaPartitions: Int)
- extends Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) {
- override def entityId(message: Message): String = message.userId
+ extends Murmur2NoEnvelopeMessageExtractor[SequencedMessage[Message]](nrKafkaPartitions) {
+ // NOTE(review): nothing in this file uses this extractor any more (init is commented out and
+ // shardingInit uses Murmur2MessageExtractor on ShardingEnvelopes); remove it if unused elsewhere.
+ override def entityId(message: SequencedMessage[Message]): String = message.msg.userId
}
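+  /*
+   * Initialisation from before the move to reliable delivery, superseded by shardingInit.
+   * Left commented out because it no longer compiles as-is: UserEvents() now takes a ConsumerController.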
def init(system: ActorSystem[_]): ActorRef[Message] = {
val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
ClusterSharding(system).init(
Entity(TypeKey)(createBehavior = _ => UserEvents())
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
.withMessageExtractor(new UserIdMessageExtractor(processorConfig.nrPartitions))
- .withSettings(ClusterShardingSettings(system)))
+ .withSettings(ClusterShardingSettings(system))
+ )
}
+ */
- def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
- init(system).narrow[UserQuery]
- }
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 8c69c83..8f15bb2 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -14,20 +14,35 @@ import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.Timeout
+import akka.cluster.sharding.typed.delivery._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import sample.sharding.kafka.serialization.UserPurchaseProto
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.stream.stage.{GraphStage, GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, InHandler, OutHandler}
+import akka.stream.FlowShape
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.Attributes
+import akka.actor.ActorRef
+import akka.cluster.sharding.typed.delivery.ShardingProducerController.RequestNext
+import akka.stream.SinkShape
+import akka.NotUsed
+import akka.stream.scaladsl.Keep
+
+import scala.util.control.NonFatal
object UserEventsKafkaProcessor {
+ // Internal protocol: apply() narrows the behavior to Nothing, so only the actor itself sends these.
sealed trait Command
+
+ // Sent to self when the stream's materialized Future completes, carrying its result.
private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
def apply(): Behavior[Nothing] = {
@@ -40,7 +55,14 @@ object UserEventsKafkaProcessor {
// TODO config
val timeout = Timeout(3.seconds)
val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), "rebalancerRef")
- val shardRegion = UserEvents.init(ctx.system)
+
+ // FIXME
+ val shardRegion = UserEvents.shardingInit(ctx.system)
+
+ val shardingProducerController = ctx.spawn(
+ ShardingProducerController[UserEvents.Message]("producer-id", shardRegion, None),
+ s"shardingController"
+ )
val consumerSettings =
ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(processorSettings.bootstrapServers)
@@ -53,32 +75,27 @@ object UserEventsKafkaProcessor {
val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
Consumer.plainSource(consumerSettings, subscription)
- // TODO use committable source and reliable delivery (once released)?
- val stream: Future[Done] = kafkaConsumer
+ val stream = kafkaConsumer
.log("kafka-consumer")
.filter(_.key() != null) // no entity id
- .mapAsync(20) { record =>
+ .map { record =>
// alternatively the user id could be in the message rather than use the kafka key
ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}")
ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key())
- // idempotency?
- retry(
- () =>
- shardRegion.ask[Done](replyTo => {
- val purchaseProto = UserPurchaseProto.parseFrom(record.value())
- UserEvents.UserPurchase(
- purchaseProto.userId,
- purchaseProto.product,
- purchaseProto.quantity,
- purchaseProto.price,
- replyTo)
- })(timeout, ctx.system.scheduler),
- 3,
- 1.second)
+ val purchaseProto = UserPurchaseProto.parseFrom(record.value())
+ ShardingEnvelope[UserEvents.Message](purchaseProto.userId,
+ UserEvents.UserPurchase(
+ purchaseProto.userId,
+ purchaseProto.product,
+ purchaseProto.quantity,
+ purchaseProto.price
+ ))
}
- .runWith(Sink.ignore)
+ .toMat(ShardingToReliableDelivery.sink(shardingProducerController))(Keep.right)
+ .run()
stream.onComplete { result =>
+ println("Stream finished " + result)
ctx.self ! KafkaConsumerStopped(result)
}
Behaviors.receiveMessage[Command] {
@@ -89,5 +106,125 @@ object UserEventsKafkaProcessor {
}
.narrow
}
+}
+
+object ShardingToReliableDelivery {
+
+  /**
+   * A Sink that forwards every ShardingEnvelope to sharded entities through the given
+   * ShardingProducerController (see ShardingToReliableDeliveryStage for how demand is handled).
+   *
+   * @return a Sink materializing the stage's Future[Done], completed when upstream completes
+   *         and failed when upstream fails
+   */
+  def sink[M](
+      producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]])
+      : Sink[ShardingEnvelope[M], Future[Done]] = {
+    val stage = new ShardingToReliableDeliveryStage[M](producerController)
+    Sink.fromGraph(stage)
+  }
+}
+
+/**
+ * Forwards messages to sharded entities via reliable delivery.
+ *
+ * Demand comes from the ShardingProducerController: every RequestNext allows exactly one message
+ * to be sent to its `sendNextTo`. Upstream is therefore pulled only while an unused RequestNext is
+ * held and the controller's buffers, as reported in that RequestNext, are below
+ * `MaxBufferedMessages` in total and below `MaxBufferedForSingleEntity` for every entity.
+ *
+ * The materialized Future:
+ *  - completes when upstream finishes (messages may still be buffered in the producer controller
+ *    at that point);
+ *  - fails if upstream fails or the stage fails;
+ *  - fails if the stage is terminated before upstream finished.
+ *
+ * Open question: should one slow entity be allowed to slow everything down? Option to start
+ * dropping for a given entity?
+ *
+ * NOTE(review): if a RequestNext arrives while the buffers are full, it is held without pulling.
+ * Confirm that ShardingProducerController sends a fresh RequestNext once its buffers drain;
+ * otherwise the stream can stall here.
+ */
+class ShardingToReliableDeliveryStage[M](
+    producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]])
+    extends GraphStageWithMaterializedValue[SinkShape[ShardingEnvelope[M]], Future[Done]] {
+
+  val in = Inlet[ShardingEnvelope[M]]("ShardingToReliableDelivery.in")
+
+  val shape = SinkShape.of(in)
+
+  /** Stop producing demand once at least this many messages are buffered in total. */
+  val MaxBufferedMessages = 1000
+
+  /** Stop producing demand once any single entity has at least this many messages buffered. */
+  val MaxBufferedForSingleEntity = 100
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogicWithLogging(shape) {
+      // The unused RequestNext, if any. It permits exactly one message and is cleared once used.
+      var currentRequest: Option[RequestNext[M]] = None
+
+      /** Fails the materialized Future first (so the real cause wins over postStop), then the stage. */
+      private def failStageAndPromise(cause: Throwable): Unit = {
+        promise.tryFailure(cause)
+        failStage(cause)
+      }
+
+      private def receive(message: (ActorRef, Any)): Unit = message match {
+        case (_, rn: RequestNext[M @unchecked]) =>
+          try {
+            currentRequest = Some(rn)
+            pullIfShardingDemand()
+          } catch {
+            case NonFatal(t) => failStageAndPromise(t)
+          }
+        case msg =>
+          log.warning("unexpected message to stage actor {}", msg)
+      }
+
+      override def preStart(): Unit = {
+        import akka.actor.typed.scaladsl.adapter._
+        val stageActor: ActorRef = getStageActor(receive).ref
+        val asTyped = stageActor.toTyped[ShardingProducerController.RequestNext[M]]
+        producerController ! ShardingProducerController.Start(asTyped)
+        // Deliberately no pull here. The first pull happens when the first RequestNext arrives;
+        // otherwise an element could be pushed before there is any demand to send it to.
+      }
+
+      override def postStop(): Unit = {
+        super.postStop()
+        // Upstream completion or failure, and failStageAndPromise, complete the promise before
+        // this runs. So this only takes effect if the stage was stopped some other way.
+        promise.tryFailure(new akka.stream.AbruptStageTerminationException(this))
+      }
+
+      /**
+       * Whether the buffers reported by the current RequestNext are full: some entity has at
+       * least `MaxBufferedForSingleEntity` messages buffered, or at least `MaxBufferedMessages`
+       * are buffered in total.
+       */
+      private def reliableDeliveryFull(): Boolean =
+        currentRequest.exists { request =>
+          val buffered = request.bufferedForEntitiesWithoutDemand
+          buffered.collectFirst { case (entityId, count) if count >= MaxBufferedForSingleEntity => entityId } match {
+            case Some(entityId) =>
+              log.info("Entity {} has max buffered messages. Not producing demand.", entityId)
+              true
+            case None if buffered.values.sum >= MaxBufferedMessages =>
+              log.info("Max number of messages in flight to reliable delivery")
+              true
+            case None =>
+              false
+          }
+        }
+
+      /**
+       * Pulls upstream when there is an unused RequestNext, its buffers are not full, and `in`
+       * is still open and not already pulled.
+       */
+      def pullIfShardingDemand(): Unit =
+        if (currentRequest.isDefined && !reliableDeliveryFull() && !hasBeenPulled(in) && !isClosed(in))
+          pull(in)
+
+      setHandler(
+        in,
+        new InHandler {
+          override def onUpstreamFinish(): Unit = {
+            log.info("Upstream finished")
+            promise.trySuccess(Done)
+            super.onUpstreamFinish()
+          }
+
+          override def onUpstreamFailure(ex: Throwable): Unit = {
+            // Use the cause-first overload: error(template, arg) would drop the exception from the log.
+            log.error(ex, "Upstream failed")
+            promise.tryFailure(ex)
+            super.onUpstreamFailure(ex)
+          }
+
+          override def onPush(): Unit = {
+            log.debug("onPush")
+            val next = grab(in)
+            currentRequest match {
+              case Some(request) =>
+                // This message uses up the RequestNext; the next pull waits for a new one.
+                currentRequest = None
+                request.sendNextTo ! next
+              case None =>
+                failStageAndPromise(new IllegalStateException("onPush called when no demand from sharding"))
+            }
+          }
+        })
+    }
+    (logic, promise.future)
+  }
}
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..759f0aa 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -2,6 +2,7 @@ package sample.sharding.kafka
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import sample.sharding.kafka.UserEvents.GetRunningTotal
@@ -9,6 +10,7 @@ import sample.sharding.kafka.UserEvents.RunningTotal
import scala.concurrent.Future
import scala.concurrent.duration._
+import akka.cluster.sharding.typed.ShardingEnvelope
class UserGrpcService(system: ActorSystem[_]) extends UserService {
@@ -16,11 +18,11 @@ class UserGrpcService(system: ActorSystem[_]) extends UserService {
implicit val sched = system.scheduler
implicit val ec = system.executionContext
- private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
-
- override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
- shardRegion
- .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
- .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+  /**
+   * Not available yet. The user entities now only accept reliable-delivery messages (their
+   * TypeKey carries SequencedMessage), so the previous ask-based query kept below no longer applies.
+   * Returns a failed Future instead of throwing, so callers always get an asynchronous failure.
+   */
+  override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
+    // shardRegion
+    //   .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
+    //   .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+    Future.failed(new NotImplementedError(s"userStats for user ${in.id} is not implemented yet"))
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org