You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:03 UTC

[incubator-pekko-samples] 01/01: WIP: migrate kafka write side to use reliable delivery

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-chbatey-reliable-delivery
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 311a02733d5abb4c005f58697858e695e32a7dd8
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Tue Mar 3 14:27:00 2020 +0000

    WIP: migrate kafka write side to use reliable delivery
---
 akka-sample-kafka-to-sharding-scala/build.sbt      |   2 +-
 .../processor/src/main/resources/logback.xml       |  30 ++--
 .../main/scala/sample/sharding/kafka/Main.scala    |   1 -
 .../scala/sample/sharding/kafka/UserEvents.scala   |  91 +++++++----
 .../sharding/kafka/UserEventsKafkaProcessor.scala  | 179 ++++++++++++++++++---
 .../sample/sharding/kafka/UserGrpcService.scala    |  14 +-
 6 files changed, 245 insertions(+), 72 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 10f8cbb..70d63f5 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,4 +1,4 @@
-val AkkaVersion = "2.6.3"
+val AkkaVersion = "2.6.3+135-0a7adf56+20200301-1347"
 // TODO upgrade to 2.0.0
 val AlpakkaKafkaVersion = "1.1.0"
 val AkkaManagementVersion = "1.0.5"
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
index 31d2e1d..32f5c5b 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
@@ -1,17 +1,15 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="utf-8"?>
 <configuration>
-
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.kafka" level="WARN" />
-
-    <logger name="akka.cluster.sharding" level="DEBUG" />
-
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file
+  <appender name="STDOUT"
+  class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
+      [%X{akkaSource}] - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <logger name="org.apache.kafka" level="WARN" />
+  <logger name="akka.cluster.sharding" level="DEBUG" />
+  <root level="DEBUG">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..79d3943 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -45,7 +45,6 @@ object Main {
           .receiveMessage[MemberUp] {
             case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
               ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
-              UserEvents.init(ctx.system)
               val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
               ctx.watch(eventProcessor)
               Behaviors.same
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f093fff..d4a4b86 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -11,44 +11,81 @@ import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor
 import akka.cluster.sharding.typed.scaladsl.ClusterSharding
 import akka.cluster.sharding.typed.scaladsl.Entity
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.actor.typed.delivery.ConsumerController
+import akka.actor.typed.delivery.ConsumerController.Start
+import akka.actor.typed.delivery.ConsumerController
+import akka.cluster.sharding.typed.delivery.ShardingConsumerController
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
+import akka.cluster.sharding.typed.Murmur2MessageExtractor
+import akka.actor.typed.delivery.ConsumerController.Confirmed
 
 object UserEvents {
 
-  val TypeKey: EntityTypeKey[UserEvents.Message] =
-    EntityTypeKey[UserEvents.Message]("user-processing")
+  val TypeKey: EntityTypeKey[SequencedMessage[UserEvents.Message]] =
+    EntityTypeKey[SequencedMessage[UserEvents.Message]]("user-processing")
 
   sealed trait Message extends CborSerializable {
     def userId: String
   }
   sealed trait UserEvent extends Message
-  case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent
-  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done])
+  case class UserAction(userId: String, description: String) extends UserEvent
+  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long)
       extends UserEvent
 
   sealed trait UserQuery extends Message
   case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery
 
+  final case class UserEventDelivery(message: Message, confirmTo: ActorRef[ConsumerController.Confirmed], seqNr: Long)
+
   case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
 
-  def apply(): Behavior[Message] = running(RunningTotal(0, 0))
+  def shardingInit(system: ActorSystem[_]): ActorRef[ShardingEnvelope[SequencedMessage[Message]]] = {
+    val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
+    val entity: Entity[SequencedMessage[Message], ShardingEnvelope[SequencedMessage[Message]]] = Entity[SequencedMessage[Message]](TypeKey)(_ => {
+                                                   ShardingConsumerController(controller => UserEvents(controller))
+                                                 })
+    val entityWithExtractor = 
+        entity
+        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+          .withMessageExtractor(new Murmur2MessageExtractor[SequencedMessage[Message]](processorConfig.nrPartitions))
+        .withSettings(ClusterShardingSettings(system))
+
+    ClusterSharding(system).init(entityWithExtractor)
+  }
+
+  def apply(controller: ActorRef[ConsumerController.Start[Message]]): Behavior[UserEventDelivery] = {
+    Behaviors.setup { ctx =>
+      val messageAdapter: ActorRef[ConsumerController.Delivery[Message]] =
+        ctx.messageAdapter(d => UserEventDelivery(d.msg, d.confirmTo, d.seqNr))
+      controller ! Start(messageAdapter)
+      running(RunningTotal(0, 0))
+    }
+  }
 
-  private def running(runningTotal: RunningTotal): Behavior[Message] = {
+  private def running(runningTotal: RunningTotal): Behavior[UserEventDelivery] = {
     Behaviors.setup { ctx =>
-      Behaviors.receiveMessage[Message] {
-        case UserAction(_, desc, ack) =>
-          ctx.log.info("user event {}", desc)
-          ack.tell(Done)
-          Behaviors.same
-        case UserPurchase(id, product, quantity, price, ack) =>
-          ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
-          ack.tell(Done)
-          running(
-            runningTotal.copy(
-              totalPurchases = runningTotal.totalPurchases + 1,
-              amountSpent = runningTotal.amountSpent + (quantity * price)))
-        case GetRunningTotal(_, replyTo) =>
-          replyTo ! runningTotal
-          Behaviors.same
+      Behaviors.receiveMessage {
+        case UserEventDelivery(msg, confirmTo, seqNr) =>
+          msg match {
+            case UserAction(_, desc) =>
+              ctx.log.info("user event {}", desc)
+              confirmTo ! Confirmed(seqNr)
+              Behaviors.same
+            case UserPurchase(id, product, quantity, price) =>
+              ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
+              confirmTo ! Confirmed(seqNr)
+              running(
+                runningTotal.copy(
+                  totalPurchases = runningTotal.totalPurchases + 1,
+                  amountSpent = runningTotal.amountSpent + (quantity * price)
+                )
+              )
+            case GetRunningTotal(_, replyTo) =>
+              replyTo ! runningTotal
+              confirmTo ! Confirmed(seqNr)
+              Behaviors.same
+          }
       }
     }
   }
@@ -58,20 +95,20 @@ object UserEvents {
    * have keys that are strings
    */
   class UserIdMessageExtractor(nrKafkaPartitions: Int)
-      extends Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) {
-    override def entityId(message: Message): String = message.userId
+      extends Murmur2NoEnvelopeMessageExtractor[SequencedMessage[Message]](nrKafkaPartitions) {
+    override def entityId(message: SequencedMessage[Message]): String = message.msg.userId
   }
 
+   /* 
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
         .withMessageExtractor(new UserIdMessageExtractor(processorConfig.nrPartitions))
-        .withSettings(ClusterShardingSettings(system)))
+        .withSettings(ClusterShardingSettings(system))
+    )
   }
+    */
 
-  def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
-    init(system).narrow[UserQuery]
-  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 8c69c83..8f15bb2 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -14,20 +14,35 @@ import akka.kafka.scaladsl.Consumer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
 import akka.util.Timeout
+import akka.cluster.sharding.typed.delivery._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import sample.sharding.kafka.serialization.UserPurchaseProto
 
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
 import scala.concurrent.duration._
 import scala.util.Try
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.stream.stage.{GraphStage, GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, InHandler, OutHandler}
+import akka.stream.FlowShape
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.Attributes
+import akka.actor.ActorRef
+import akka.cluster.sharding.typed.delivery.ShardingProducerController.RequestNext
+import akka.stream.SinkShape
+import akka.NotUsed
+import akka.stream.scaladsl.Keep
+
+import scala.util.control.NonFatal
 
 object UserEventsKafkaProcessor {
 
   sealed trait Command
+
   private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
 
   def apply(): Behavior[Nothing] = {
@@ -40,7 +55,14 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), "rebalancerRef")
-        val shardRegion = UserEvents.init(ctx.system)
+
+        // FIXME
+        val shardRegion = UserEvents.shardingInit(ctx.system)
+
+        val shardingProducerController = ctx.spawn(
+          ShardingProducerController[UserEvents.Message]("producer-id", shardRegion, None),
+          s"shardingController"
+        )
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
@@ -53,32 +75,27 @@ object UserEventsKafkaProcessor {
         val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
           Consumer.plainSource(consumerSettings, subscription)
 
-        // TODO use committable source and reliable delivery (once released)?
-        val stream: Future[Done] = kafkaConsumer
+        val stream = kafkaConsumer
           .log("kafka-consumer")
           .filter(_.key() != null) // no entity id
-          .mapAsync(20) { record =>
+          .map { record =>
             // alternatively the user id could be in the message rather than use the kafka key
             ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}")
             ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key())
-            // idempotency?
-            retry(
-              () =>
-                shardRegion.ask[Done](replyTo => {
-                  val purchaseProto = UserPurchaseProto.parseFrom(record.value())
-                  UserEvents.UserPurchase(
-                    purchaseProto.userId,
-                    purchaseProto.product,
-                    purchaseProto.quantity,
-                    purchaseProto.price,
-                    replyTo)
-                })(timeout, ctx.system.scheduler),
-              3,
-              1.second)
+            val purchaseProto = UserPurchaseProto.parseFrom(record.value())
+            ShardingEnvelope[UserEvents.Message](purchaseProto.userId,
+              UserEvents.UserPurchase(
+                purchaseProto.userId,
+                purchaseProto.product,
+                purchaseProto.quantity,
+                purchaseProto.price
+              ))
           }
-          .runWith(Sink.ignore)
+          .toMat(ShardingToReliableDelivery.sink(shardingProducerController))(Keep.right)
+          .run()
 
         stream.onComplete { result =>
+          println("Stream finished " + result)
           ctx.self ! KafkaConsumerStopped(result)
         }
         Behaviors.receiveMessage[Command] {
@@ -89,5 +106,125 @@ object UserEventsKafkaProcessor {
       }
       .narrow
   }
+}
+
+object ShardingToReliableDelivery {
+  def sink[M](
+               producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]]
+             ): Sink[ShardingEnvelope[M], Future[Done]] = {
+    Sink.fromGraph(new ShardingToReliableDeliveryStage(producerController))
+  }
+}
+
+/**
+ * Forwards messages to sharded entities via reliable delivery.
+ *
+ * Keeps requesting messages until the maximum number of messages is buffered or
+ * a single entity has too many buffered messages.
+ *
+ * Open question: should one slow entity be allowed to slow everything down, or should there be an option to start dropping messages for that entity?
+ */
+class ShardingToReliableDeliveryStage[M](
+                                          producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]]
+                                        ) extends GraphStageWithMaterializedValue[SinkShape[ShardingEnvelope[M]], Future[Done]] {
+
+  val in = Inlet[ShardingEnvelope[M]]("ShardingToReliableDelivery.in")
+
+  val shape = SinkShape.of(in)
+
+  val MaxBufferedMessages = 1000
+  val MaxBufferedForSingleEntity = 100
+
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogicWithLogging(shape) {
+      var currentRequest: Option[RequestNext[M]] = None
+
+      private def receive(message: (ActorRef, Any)): Unit = message match {
+        case (_, rn: RequestNext[M]) =>
+          try {
+            currentRequest = Some(rn.asInstanceOf[RequestNext[M]])
+            pullIfShardingDemand()
+          } catch {
+            case NonFatal(t) =>
+              failStage(t)
+              promise.tryFailure(t)
+          }
+        case msg =>
+          log.warning("unexpected message to stage actor {}", msg)
+      }
 
+      override def preStart(): Unit = {
+        import akka.actor.typed.scaladsl.adapter._
+        val stageActor: ActorRef = getStageActor(receive).ref
+        val asTyped = stageActor.toTyped[ShardingProducerController.RequestNext[M]]
+        producerController ! ShardingProducerController.Start(asTyped)
+        pull(in)
+      }
+
+      override def postStop(): Unit = {
+        super.postStop()
+        promise.trySuccess(Done)
+      }
+
+      /**
+       * Returns true when a single entity has reached its buffered-message limit or the
+       * total buffered across entities exceeds the overall limit; no demand is then signalled.
+       */
+      private def reliableDeliveryFull(): Boolean = {
+        currentRequest.exists(request => {
+          val entityTooMany = request.bufferedForEntitiesWithoutDemand.find(_._2 >= MaxBufferedForSingleEntity)
+          if (entityTooMany.isDefined) {
+            log.info("Entity {} has max buffered messages. Not producing demand.", entityTooMany.get._1)
+            true
+          } else if (request.bufferedForEntitiesWithoutDemand.values.sum > MaxBufferedMessages) {
+            log.info("Max number of messages in flight to reliable delivery")
+            true
+          } else {
+            false
+          }
+        })
+      }
+
+      /**
+       * Pulls from upstream if a request has been received from reliable delivery,
+       * that request has not hit the buffer limits, and `in` is not already pulled.
+       */
+      def pullIfShardingDemand(): Unit = {
+        if (currentRequest.isDefined && !reliableDeliveryFull() && !hasBeenPulled(in)) {
+          pull(in)
+        }
+      }
+
+      setHandler(
+        in,
+        new InHandler {
+          override def onUpstreamFinish(): Unit = {
+            log.info("Upstream finished")
+            super.onUpstreamFinish()
+            promise.trySuccess(Done)
+          }
+
+          override def onUpstreamFailure(ex: Throwable): Unit = {
+            log.error("Upstream failed", ex)
+            super.onUpstreamFailure(ex)
+            promise.tryFailure(ex)
+          }
+
+          override def onPush(): Unit = {
+            log.info("onPush")
+            // there should be demand from sharding, otherwise there would not have been a pull
+            val next = grab(in)
+            currentRequest match {
+              case None => throw new IllegalStateException("onPush called when no demand from sharding")
+              case Some(request) => request.sendNextTo ! next
+            }
+            pullIfShardingDemand()
+          }
+        }
+      )
+    }
+    (logic, promise.future)
+  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..759f0aa 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -2,6 +2,7 @@ package sample.sharding.kafka
 
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.util.Timeout
 import sample.sharding.kafka.UserEvents.GetRunningTotal
@@ -9,6 +10,7 @@ import sample.sharding.kafka.UserEvents.RunningTotal
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
+import akka.cluster.sharding.typed.ShardingEnvelope
 
 class UserGrpcService(system: ActorSystem[_]) extends UserService {
 
@@ -16,11 +18,11 @@ class UserGrpcService(system: ActorSystem[_]) extends UserService {
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
-
-  override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
-    shardRegion
-      .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
-      .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+ override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
+   // shardRegion
+   //   .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
+   //   .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+    ???
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org