Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:36:02 UTC

[incubator-pekko-samples] branch wip-chbatey-reliable-delivery created (now 311a027)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch wip-chbatey-reliable-delivery
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


      at 311a027  WIP: migrate kafka write side to use reliable delivery

This branch includes the following new commits:

     new 311a027  WIP: migrate kafka write side to use reliable delivery

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




[incubator-pekko-samples] 01/01: WIP: migrate kafka write side to use reliable delivery

Posted by fa...@apache.org.

fanningpj pushed a commit to branch wip-chbatey-reliable-delivery
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 311a02733d5abb4c005f58697858e695e32a7dd8
Author: Christopher Batey <ch...@gmail.com>
AuthorDate: Tue Mar 3 14:27:00 2020 +0000

    WIP: migrate kafka write side to use reliable delivery
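
    Replaces the ask-with-retry pattern on the Kafka write side with
    typed reliable delivery: entities are wrapped in a
    ShardingConsumerController, a ShardingProducerController fronts the
    shard region, and a custom GraphStage sink only pulls from the
    Kafka source while sharding signals demand.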
---
 akka-sample-kafka-to-sharding-scala/build.sbt      |   2 +-
 .../processor/src/main/resources/logback.xml       |  30 ++--
 .../main/scala/sample/sharding/kafka/Main.scala    |   1 -
 .../scala/sample/sharding/kafka/UserEvents.scala   |  91 +++++++----
 .../sharding/kafka/UserEventsKafkaProcessor.scala  | 179 ++++++++++++++++++---
 .../sample/sharding/kafka/UserGrpcService.scala    |  14 +-
 6 files changed, 245 insertions(+), 72 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 10f8cbb..70d63f5 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,4 +1,4 @@
-val AkkaVersion = "2.6.3"
+val AkkaVersion = "2.6.3+135-0a7adf56+20200301-1347"
 // TODO upgrade to 2.0.0
 val AlpakkaKafkaVersion = "1.1.0"
 val AkkaManagementVersion = "1.0.5"
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
index 31d2e1d..32f5c5b 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
@@ -1,17 +1,15 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="utf-8"?>
 <configuration>
-
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.kafka" level="WARN" />
-
-    <logger name="akka.cluster.sharding" level="DEBUG" />
-
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file
+  <appender name="STDOUT"
+  class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
+      [%X{akkaSource}] - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <logger name="org.apache.kafka" level="WARN" />
+  <logger name="akka.cluster.sharding" level="DEBUG" />
+  <root level="DEBUG">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..79d3943 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -45,7 +45,6 @@ object Main {
           .receiveMessage[MemberUp] {
             case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
               ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
-              UserEvents.init(ctx.system)
               val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
               ctx.watch(eventProcessor)
               Behaviors.same
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f093fff..d4a4b86 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -11,44 +11,81 @@ import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor
 import akka.cluster.sharding.typed.scaladsl.ClusterSharding
 import akka.cluster.sharding.typed.scaladsl.Entity
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.actor.typed.delivery.ConsumerController
+import akka.actor.typed.delivery.ConsumerController.Confirmed
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
+import akka.actor.typed.delivery.ConsumerController.Start
+import akka.cluster.sharding.typed.Murmur2MessageExtractor
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.delivery.ShardingConsumerController
 
 object UserEvents {
 
-  val TypeKey: EntityTypeKey[UserEvents.Message] =
-    EntityTypeKey[UserEvents.Message]("user-processing")
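+  // the entity type now receives SequencedMessage envelopes from reliable delivery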
+  val TypeKey: EntityTypeKey[SequencedMessage[UserEvents.Message]] =
+    EntityTypeKey[SequencedMessage[UserEvents.Message]]("user-processing")
 
   sealed trait Message extends CborSerializable {
     def userId: String
   }
   sealed trait UserEvent extends Message
-  case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent
-  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done])
+  case class UserAction(userId: String, description: String) extends UserEvent
+  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long)
       extends UserEvent
 
   sealed trait UserQuery extends Message
   case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery
 
+  final case class UserEventDelivery(message: Message, confirmTo: ActorRef[ConsumerController.Confirmed], seqNr: Long)
+
   case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
 
-  def apply(): Behavior[Message] = running(RunningTotal(0, 0))
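+  // registers the sharded entity type; each entity is wrapped in a
+  // ShardingConsumerController so reliable delivery can confirm messages per entity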
+  def shardingInit(system: ActorSystem[_]): ActorRef[ShardingEnvelope[SequencedMessage[Message]]] = {
+    val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
+    val entity: Entity[SequencedMessage[Message], ShardingEnvelope[SequencedMessage[Message]]] =
+      Entity[SequencedMessage[Message]](TypeKey)(_ => ShardingConsumerController(controller => UserEvents(controller)))
+    val entityWithExtractor =
+      entity
+        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+        .withMessageExtractor(new Murmur2MessageExtractor[SequencedMessage[Message]](processorConfig.nrPartitions))
+        .withSettings(ClusterShardingSettings(system))
+
+    ClusterSharding(system).init(entityWithExtractor)
+  }
+
+  def apply(controller: ActorRef[ConsumerController.Start[Message]]): Behavior[UserEventDelivery] = {
+    Behaviors.setup { ctx =>
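+      // adapt ConsumerController.Delivery into our own message type so the
+      // behavior can confirm each sequence number after processing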
+      val messageAdapter: ActorRef[ConsumerController.Delivery[Message]] =
+        ctx.messageAdapter(d => UserEventDelivery(d.msg, d.confirmTo, d.seqNr))
+      controller ! Start(messageAdapter)
+      running(RunningTotal(0, 0))
+    }
+  }
 
-  private def running(runningTotal: RunningTotal): Behavior[Message] = {
+  private def running(runningTotal: RunningTotal): Behavior[UserEventDelivery] = {
     Behaviors.setup { ctx =>
-      Behaviors.receiveMessage[Message] {
-        case UserAction(_, desc, ack) =>
-          ctx.log.info("user event {}", desc)
-          ack.tell(Done)
-          Behaviors.same
-        case UserPurchase(id, product, quantity, price, ack) =>
-          ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
-          ack.tell(Done)
-          running(
-            runningTotal.copy(
-              totalPurchases = runningTotal.totalPurchases + 1,
-              amountSpent = runningTotal.amountSpent + (quantity * price)))
-        case GetRunningTotal(_, replyTo) =>
-          replyTo ! runningTotal
-          Behaviors.same
+      Behaviors.receiveMessage {
+        case UserEventDelivery(msg, confirmTo, seqNr) =>
+          msg match {
+            case UserAction(_, desc) =>
+              ctx.log.info("user event {}", desc)
+              confirmTo ! Confirmed(seqNr)
+              Behaviors.same
+            case UserPurchase(id, product, quantity, price) =>
+              ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
+              confirmTo ! Confirmed(seqNr)
+              running(
+                runningTotal.copy(
+                  totalPurchases = runningTotal.totalPurchases + 1,
+                  amountSpent = runningTotal.amountSpent + (quantity * price)
+                )
+              )
+            case GetRunningTotal(_, replyTo) =>
+              replyTo ! runningTotal
+              confirmTo ! Confirmed(seqNr)
+              Behaviors.same
+          }
       }
     }
   }
@@ -58,20 +95,20 @@ object UserEvents {
    * have keys that are strings
    */
   class UserIdMessageExtractor(nrKafkaPartitions: Int)
-      extends Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) {
-    override def entityId(message: Message): String = message.userId
+      extends Murmur2NoEnvelopeMessageExtractor[SequencedMessage[Message]](nrKafkaPartitions) {
+    override def entityId(message: SequencedMessage[Message]): String = message.msg.userId
   }
 
+  /*
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
         .withMessageExtractor(new UserIdMessageExtractor(processorConfig.nrPartitions))
-        .withSettings(ClusterShardingSettings(system)))
+        .withSettings(ClusterShardingSettings(system))
+    )
   }
+  */
 
-  def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
-    init(system).narrow[UserQuery]
-  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 8c69c83..8f15bb2 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -14,20 +14,35 @@ import akka.kafka.scaladsl.Consumer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
 import akka.util.Timeout
+import akka.cluster.sharding.typed.delivery._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import sample.sharding.kafka.serialization.UserPurchaseProto
 
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
 import scala.concurrent.duration._
 import scala.util.Try
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.stream.stage.{GraphStage, GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, InHandler, OutHandler}
+import akka.stream.FlowShape
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.Attributes
+import akka.actor.ActorRef
+import akka.cluster.sharding.typed.delivery.ShardingProducerController.RequestNext
+import akka.stream.SinkShape
+import akka.NotUsed
+import akka.stream.scaladsl.Keep
+
+import scala.util.control.NonFatal
 
 object UserEventsKafkaProcessor {
 
   sealed trait Command
+
   private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
 
   def apply(): Behavior[Nothing] = {
@@ -40,7 +55,14 @@ object UserEventsKafkaProcessor {
         // TODO config
         val timeout = Timeout(3.seconds)
         val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), "rebalancerRef")
-        val shardRegion = UserEvents.init(ctx.system)
+
+        // FIXME
+        val shardRegion = UserEvents.shardingInit(ctx.system)
+
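+        // the producer controller buffers and resends messages to the sharded
+        // consumers and signals demand (RequestNext) to the sink built below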
+        val shardingProducerController = ctx.spawn(
+          ShardingProducerController[UserEvents.Message]("producer-id", shardRegion, None),
+          s"shardingController"
+        )
         val consumerSettings =
           ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
             .withBootstrapServers(processorSettings.bootstrapServers)
@@ -53,32 +75,27 @@ object UserEventsKafkaProcessor {
         val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
           Consumer.plainSource(consumerSettings, subscription)
 
-        // TODO use committable source and reliable delivery (once released)?
-        val stream: Future[Done] = kafkaConsumer
+        val stream = kafkaConsumer
           .log("kafka-consumer")
           .filter(_.key() != null) // no entity id
-          .mapAsync(20) { record =>
+          .map { record =>
             // alternatively the user id could be in the message rather than use the kafka key
             ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}")
             ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key())
-            // idempotency?
-            retry(
-              () =>
-                shardRegion.ask[Done](replyTo => {
-                  val purchaseProto = UserPurchaseProto.parseFrom(record.value())
-                  UserEvents.UserPurchase(
-                    purchaseProto.userId,
-                    purchaseProto.product,
-                    purchaseProto.quantity,
-                    purchaseProto.price,
-                    replyTo)
-                })(timeout, ctx.system.scheduler),
-              3,
-              1.second)
+            val purchaseProto = UserPurchaseProto.parseFrom(record.value())
+            ShardingEnvelope[UserEvents.Message](purchaseProto.userId,
+              UserEvents.UserPurchase(
+                purchaseProto.userId,
+                purchaseProto.product,
+                purchaseProto.quantity,
+                purchaseProto.price
+              ))
           }
-          .runWith(Sink.ignore)
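+          // the custom sink below only pulls from Kafka while sharding signals demand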
+          .toMat(ShardingToReliableDelivery.sink(shardingProducerController))(Keep.right)
+          .run()
 
         stream.onComplete { result =>
+          println("Stream finished " + result)
           ctx.self ! KafkaConsumerStopped(result)
         }
         Behaviors.receiveMessage[Command] {
@@ -89,5 +106,125 @@ object UserEventsKafkaProcessor {
       }
       .narrow
   }
+}
+
+object ShardingToReliableDelivery {
+  def sink[M](producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]])
+      : Sink[ShardingEnvelope[M], Future[Done]] =
+    Sink.fromGraph(new ShardingToReliableDeliveryStage(producerController))
+}
+
+/**
+ * Forwards messages to sharded entities via reliable delivery.
+ *
+ * Keeps requesting messages from upstream until either the total number of
+ * buffered messages, or the number buffered for a single entity, reaches its limit.
+ *
+ * Open question: should one slow entity be allowed to slow everything down,
+ * or should there be an option to start dropping for a given entity?
+ */
+class ShardingToReliableDeliveryStage[M](
+    producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]])
+    extends GraphStageWithMaterializedValue[SinkShape[ShardingEnvelope[M]], Future[Done]] {
+
+  val in = Inlet[ShardingEnvelope[M]]("ShardingToReliableDelivery.in")
+
+  val shape = SinkShape.of(in)
+
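+  // backpressure thresholds; hard-coded for now, could come from config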
+  val MaxBufferedMessages = 1000
+  val MaxBufferedForSingleEntity = 100
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogicWithLogging(shape) {
+      var currentRequest: Option[RequestNext[M]] = None
+
+      private def receive(message: (ActorRef, Any)): Unit = message match {
+        case (_, rn: RequestNext[M] @unchecked) =>
+          try {
+            currentRequest = Some(rn)
+            pullIfShardingDemand()
+          } catch {
+            case NonFatal(t) =>
+              failStage(t)
+              promise.tryFailure(t)
+          }
+        case msg =>
+          log.warning("unexpected message to stage actor {}", msg)
+      }
 
+      override def preStart(): Unit = {
+        import akka.actor.typed.scaladsl.adapter._
+        val stageActor: ActorRef = getStageActor(receive).ref
+        val asTyped = stageActor.toTyped[ShardingProducerController.RequestNext[M]]
+        producerController ! ShardingProducerController.Start(asTyped)
+        // no eager pull here: the first pull happens in pullIfShardingDemand once
+        // sharding signals demand, so onPush always has a current request
+      }
+
+      override def postStop(): Unit = {
+        super.postStop()
+        promise.trySuccess(Done)
+      }
+
+      /**
+       * Checks whether reliable delivery has hit either the max total number
+       * of buffered messages or the max buffered for a single entity.
+       */
+      private def reliableDeliveryFull(): Boolean = {
+        currentRequest.exists { request =>
+          val entityTooMany = request.bufferedForEntitiesWithoutDemand.find(_._2 >= MaxBufferedForSingleEntity)
+          if (entityTooMany.isDefined) {
+            log.info("Entity {} has max buffered messages. Not producing demand.", entityTooMany.get._1)
+            true
+          } else if (request.bufferedForEntitiesWithoutDemand.values.sum > MaxBufferedMessages) {
+            log.info("Max number of messages in flight to reliable delivery")
+            true
+          } else {
+            false
+          }
+        }
+      }
+
+      /**
+       * Pulls if a request has been received from reliable delivery and that
+       * request has not hit the max buffers.
+       */
+      def pullIfShardingDemand(): Unit = {
+        if (currentRequest.isDefined && !reliableDeliveryFull() && !hasBeenPulled(in)) {
+          pull(in)
+        }
+      }
+
+      setHandler(
+        in,
+        new InHandler {
+          override def onUpstreamFinish(): Unit = {
+            log.info("Upstream finished")
+            super.onUpstreamFinish()
+            promise.trySuccess(Done)
+          }
+
+          override def onUpstreamFailure(ex: Throwable): Unit = {
+            log.error("Upstream failed", ex)
+            super.onUpstreamFailure(ex)
+            promise.tryFailure(ex)
+          }
+
+          override def onPush(): Unit = {
+            log.info("onPush")
+            // there should be demand from sharding and downstream, otherwise there would not have been a pull
+            val next = grab(in)
+            currentRequest match {
+              case None => throw new IllegalStateException("onPush called when no demand from sharding")
+              case Some(request) => request.sendNextTo ! next
+            }
+            pullIfShardingDemand()
+          }
+        }
+      )
+    }
+    (logic, promise.future)
+  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..759f0aa 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -2,6 +2,7 @@ package sample.sharding.kafka
 
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.util.Timeout
 import sample.sharding.kafka.UserEvents.GetRunningTotal
@@ -9,6 +10,7 @@ import sample.sharding.kafka.UserEvents.RunningTotal
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
+import akka.cluster.sharding.typed.ShardingEnvelope
 
 class UserGrpcService(system: ActorSystem[_]) extends UserService {
 
@@ -16,11 +18,11 @@ class UserGrpcService(system: ActorSystem[_]) extends UserService {
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
-
-  override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
-    shardRegion
-      .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
-      .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+  override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
+    // shardRegion
+    //   .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
+    //   .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+    ???
   }
+
 }
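
For context when reviewing, here is a minimal sketch of how the pieces in
this commit fit together. It mirrors UserEventsKafkaProcessor above; the
names `wire`, "user-1" and "logged in" are illustrative only, and it
assumes the snapshot reliable-delivery API pinned in build.sbt rather than
a released Akka version:

    import akka.Done
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.ShardingEnvelope
    import akka.cluster.sharding.typed.delivery.ShardingProducerController
    import akka.stream.scaladsl.{Keep, Source}
    import sample.sharding.kafka.{ShardingToReliableDelivery, UserEvents}

    import scala.concurrent.Future

    object WiringSketch {
      // hypothetical wiring, not part of the commit
      def wire(implicit system: ActorSystem[Nothing]): Future[Done] = {
        // 1. start sharding; entities are wrapped in a ShardingConsumerController
        val shardRegion = UserEvents.shardingInit(system)

        // 2. a producer controller fronts the shard region and handles
        //    buffering, resends and demand signalling
        val producerController = system.systemActorOf(
          ShardingProducerController[UserEvents.Message]("producer-id", shardRegion, None),
          "producer-controller")

        // 3. any source of ShardingEnvelope can feed the demand-aware sink;
        //    the Kafka consumer source in the processor takes this place
        Source
          .single(ShardingEnvelope[UserEvents.Message]("user-1", UserEvents.UserAction("user-1", "logged in")))
          .toMat(ShardingToReliableDelivery.sink(producerController))(Keep.right)
          .run()
      }
    }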


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org