You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by gi...@git.apache.org on 2017/06/27 07:52:29 UTC

[GitHub] rabbah commented on a change in pull request #2425: Throttle message bus consumption.

rabbah commented on a change in pull request #2425: Throttle message bus consumption.
URL: https://github.com/apache/incubator-openwhisk/pull/2425#discussion_r124201235
 
 

 ##########
 File path: common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
 ##########
 @@ -38,15 +47,157 @@ trait MessageConsumer {
      * Commits offsets from last peek operation to ensure they are removed
      * from the connector.
      */
-    def commit()
-
-    /**
-     * Calls process for every message received. Process receives a tuple
-     * (topic, partition, offset, and message as byte array).
-     */
-    def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit
+    def commit(): Unit
 
     /** Closes consumer. */
     def close(): Unit
 
 }
+
+object MessageFeed {
+    protected sealed trait FeedState
+    protected[connector] case object Uninitialized extends FeedState
+    protected[connector] case object FillingPipeline extends FeedState
+    protected[connector] case object DrainingPipeline extends FeedState
+
+    protected sealed trait FeedData
+    private case object NoData extends FeedData
+
+    /** Indicated the consumer is ready to accept messages from the message bus for processing. */
+    object Ready
+
+    /** Steady state message, indicates capacity in downstream process to receive more messages. */
+    object Processed
+}
+
+/**
+ * This actor polls the message bus for new messages and dispatches them to the given
+ * handler. The actor tracks the number of messages dispatched and will not dispatch new
+ * messages until some number of them are acknowledged.
+ *
+ * This is used by the invoker to pull messages from the message bus and apply back pressure
+ * when the invoker does not have resources to complete processing messages (i.e., no containers
+ * are available to run new actions). It is also used in the load balancer to consume active
+ * ack messages.
+ * When the invoker releases resources (by reclaiming containers) it will send a message
+ * to this actor which will then attempt to fill the pipeline with new messages.
+ *
+ * The actor tries to fill the pipeline with additional messages while the number
+ * of outstanding requests is below the pipeline fill threshold.
+ */
+@throws[IllegalArgumentException]
+class MessageFeed(
+    description: String,
+    logging: Logging,
+    consumer: MessageConsumer,
+    maxPipelineDepth: Int,
+    longPollDuration: FiniteDuration,
+    handler: Array[Byte] => Future[Unit],
+    autoStart: Boolean = true)
+    extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
+    import MessageFeed._
+
+    require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
+
+    private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+    private var pipelineOccupancy = 0
+    private implicit val tid = TransactionId.dispatcher
+
+    startWith(Uninitialized, MessageFeed.NoData)
+
+    onTransition {
+        case _ -> FillingPipeline => fillPipeline()
+    }
+
+    when(Uninitialized) {
+        case Event(Ready, _) =>
+            goto(FillingPipeline)
+
+        case _ => stay
+    }
+
+    // fill pipeline either periodically due to a scheduled alarm or when
+    // previously queued messages are released
+    when(FillingPipeline, longPollDuration) {
+        case Event(Processed, _) =>
+            updateOccupancy()
+            if (haveCapacity) {
+                fillPipeline()
+                stay
+            } else {
+                goto(DrainingPipeline)
+            }
+
+        case Event(StateTimeout, _) =>
+            if (haveCapacity) {
+                fillPipeline()
+                stay
+            } else {
+                goto(DrainingPipeline)
+            }
+    }
+
+    when(DrainingPipeline) {
+        case Event(Processed, _) =>
+            // fill if there is room otherwise stay
+            updateOccupancy()
+            if (haveCapacity) {
+                goto(FillingPipeline)
+            } else stay
+
+        case _ => stay
+    }
+
+    if (autoStart) self ! Ready
+
+    private def fillPipeline(): Unit = {
+        if (pipelineOccupancy <= pipelineFillThreshold) {
+            Try {
+                // Grab next batch of messages and commit offsets immediately
+                // essentially marking the activation as having satisfied "at most once"
+                // semantics (this is the point at which the activation is considered started).
+                // If the commit fails, then messages peeked are peeked again on the next poll.
+                // While the commit is synchronous and will block until it completes, at steady
+                // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
+                // of the commit should be masked.
+                val records = consumer.peek(longPollDuration)
+                consumer.commit()
 
 Review comment:
   No reviewers assigned yet;)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services