Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/07/13 13:47:52 UTC

[incubator-openwhisk] branch master updated: Throttle message bus consumption. (#2425)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f33993  Throttle message bus consumption. (#2425)
0f33993 is described below

commit 0f3399372c6d7f047983707d8249707668daa746
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Thu Jul 13 06:47:49 2017 -0700

    Throttle message bus consumption. (#2425)
    
    Replaces the unthrottled Kafka consumer with an FSM actor that can throttle consumption.
    Applied to active acks and invoker health pings.
    Implements capacity-based sends.
---
 .../connector/kafka/KafkaConsumerConnector.scala   |  29 ----
 .../main/scala/whisk/core/connector/Message.scala  |   2 +-
 .../whisk/core/connector/MessageConsumer.scala     | 192 ++++++++++++++++++++-
 .../core/loadBalancer/InvokerSupervision.scala     |  36 ++--
 .../core/loadBalancer/LoadBalancerService.scala    |  37 +++-
 .../whisk/core/containerpool/ContainerPool.scala   |   5 +-
 .../whisk/core/dispatcher/ActivationFeed.scala     | 122 -------------
 .../scala/whisk/core/dispatcher/Dispatcher.scala   |  29 ++--
 .../main/scala/whisk/core/invoker/Invoker.scala    |  20 +--
 .../scala/whisk/core/invoker/InvokerReactive.scala |   5 +-
 docs/actions.md                                    |   7 +-
 .../test/scala/services/KafkaConnectorTests.scala  |  54 +-----
 .../core/connector/test/MessageFeedTests.scala     | 191 ++++++++++++++++++++
 .../test/TestConnector.scala                       |  36 ++--
 .../containerpool/test/ContainerPoolTests.scala    |  14 +-
 .../core/controller/test/PackagesApiTests.scala    |   1 -
 .../core/dispatcher/test/DispatcherTests.scala     |  56 +++---
 .../test/InvokerSupervisionTests.scala             |   9 +-
 18 files changed, 508 insertions(+), 337 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 1042030..a138781 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -24,9 +24,7 @@ import scala.collection.JavaConversions.seqAsJavaList
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
 
-import org.apache.kafka.clients.consumer.CommitFailedException
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -62,34 +60,8 @@ class KafkaConsumerConnector(
      */
     def commit() = consumer.commitSync()
 
-    override def onMessage(process: (String, Int, Long, Array[Byte]) => Unit) = {
-        val self = this
-        val thread = new Thread() {
-            override def run() = {
-                while (!disconnect) {
-                    Try {
-                        // Grab next batch of messages and commit offsets immediately
-                        // It won't be processed twice (tested in "KafkaConnectorTests")
-                        val messages = peek()
-                        commit()
-                        messages
-                    } map {
-                        _.foreach { process.tupled(_) }
-                    } recover {
-                        case e: CommitFailedException => logging.error(self, s"failed to commit to kafka: ${e.getMessage}")
-                        case e: Throwable             => logging.error(self, s"exception while pulling new records: ${e.getMessage}")
-                    }
-                }
-                logging.warn(self, "consumer stream terminated")
-                consumer.close()
-            }
-        }
-        thread.start()
-    }
-
     override def close() = {
         logging.info(this, s"closing '$topic' consumer")
-        disconnect = true
     }
 
     private def getProps: Properties = {
@@ -122,5 +94,4 @@ class KafkaConsumerConnector(
     }
 
     private val consumer = getConsumer(getProps, Some(List(topic)))
-    private var disconnect = false
 }
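
With `onMessage` removed, the consumer interface is reduced to `peek()` and `commit()`; the polling loop itself moves into the `MessageFeed` actor added below. As a minimal sketch (not the commit's code), the at-most-once step the feed performs on each poll looks like this, assuming only the `MessageConsumer` trait from this commit:

    import scala.concurrent.duration._

    import whisk.core.connector.MessageConsumer

    // Hedged sketch of the peek-then-commit step (see fillPipeline below):
    // offsets are committed immediately after the peek, so once the commit
    // succeeds these records will not be redelivered ("at most once").
    def peekAndCommit(consumer: MessageConsumer): Seq[(String, Int, Long, Array[Byte])] = {
        val records = consumer.peek(1.second)
        consumer.commit()
        records.toSeq
    }
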
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 3491c21..82c797d 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -32,7 +32,7 @@ import whisk.core.entity.WhiskActivation
 /** Basic trait for messages that are sent on a message bus connector. */
 trait Message {
     /**
-     * A transaction id to attach to the message. If not defined, defaults to 'dontcare' value.
+     * A transaction id to attach to the message.
      */
     val transid = TransactionId.unknown
 
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 359cddb..87353df 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -17,7 +17,18 @@
 
 package whisk.core.connector
 
-import scala.concurrent.duration.Duration
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.Failure
+
+import org.apache.kafka.clients.consumer.CommitFailedException
+
+import akka.actor.FSM
+import akka.pattern.pipe
+import whisk.common.Logging
+import whisk.common.TransactionId
 
 trait MessageConsumer {
 
@@ -38,15 +49,180 @@ trait MessageConsumer {
      * Commits offsets from last peek operation to ensure they are removed
      * from the connector.
      */
-    def commit()
-
-    /**
-     * Calls process for every message received. Process receives a tuple
-     * (topic, partition, offset, and message as byte array).
-     */
-    def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit
+    def commit(): Unit
 
     /** Closes consumer. */
     def close(): Unit
 
 }
+
+object MessageFeed {
+    protected sealed trait FeedState
+    protected[connector] case object Idle extends FeedState
+    protected[connector] case object FillingPipeline extends FeedState
+    protected[connector] case object DrainingPipeline extends FeedState
+
+    protected sealed trait FeedData
+    private case object NoData extends FeedData
+
+    /** Indicates the consumer is ready to accept messages from the message bus for processing. */
+    object Ready
+
+    /** Steady state message, indicates capacity in downstream process to receive more messages. */
+    object Processed
+
+    /** Indicates the fill operation has completed. */
+    private case class FillCompleted(messages: Seq[(String, Int, Long, Array[Byte])])
+}
+
+/**
+ * This actor polls the message bus for new messages and dispatches them to the given
+ * handler. The actor tracks the number of messages dispatched and will not dispatch new
+ * messages until some number of them are acknowledged.
+ *
+ * This is used by the invoker to pull messages from the message bus and apply back pressure
+ * when the invoker does not have resources to complete processing messages (i.e., no containers
+ * are available to run new actions). It is also used in the load balancer to consume active
+ * ack messages.
+ * When the invoker releases resources (by reclaiming containers) it will send a message
+ * to this actor which will then attempt to fill the pipeline with new messages.
+ *
+ * The actor tries to fill the pipeline with additional messages while the number
+ * of outstanding requests is below the pipeline fill threshold.
+ */
+@throws[IllegalArgumentException]
+class MessageFeed(
+    description: String,
+    logging: Logging,
+    consumer: MessageConsumer,
+    maximumHandlerCapacity: Int,
+    longPollDuration: FiniteDuration,
+    handler: Array[Byte] => Future[Unit],
+    autoStart: Boolean = true,
+    logHandoff: Boolean = true)
+    extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
+    import MessageFeed._
+
+    // double-buffer to make up for message bus read overhead
+    val maxPipelineDepth = maximumHandlerCapacity * 2
+    private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+
+    require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
+
+    private val outstandingMessages = mutable.Queue[(String, Int, Long, Array[Byte])]()
+    private var handlerCapacity = maximumHandlerCapacity
+
+    private implicit val tid = TransactionId.dispatcher
+
+    logging.info(this, s"handler capacity = $maximumHandlerCapacity, pipeline fill at = $pipelineFillThreshold, pipeline depth = $maxPipelineDepth")
+
+    when(Idle) {
+        case Event(Ready, _) =>
+            fillPipeline()
+            goto(FillingPipeline)
+
+        case _ => stay
+    }
+
+    // wait for fill to complete, and keep filling if there is
+    // capacity; otherwise, wait to drain
+    when(FillingPipeline) {
+        case Event(Processed, _) =>
+            updateHandlerCapacity()
+            sendOutstandingMessages()
+            stay
+
+        case Event(FillCompleted(messages), _) =>
+            outstandingMessages.enqueue(messages: _*)
+            sendOutstandingMessages()
+
+            if (shouldFillQueue()) {
+                fillPipeline()
+                stay
+            } else {
+                goto(DrainingPipeline)
+            }
+
+        case _ => stay
+    }
+
+    when(DrainingPipeline) {
+        case Event(Processed, _) =>
+            updateHandlerCapacity()
+            sendOutstandingMessages()
+            if (shouldFillQueue()) {
+                fillPipeline()
+                goto(FillingPipeline)
+            } else stay
+
+        case _ => stay
+    }
+
+    onTransition { case _ -> Idle => if (autoStart) self ! Ready }
+    startWith(Idle, MessageFeed.NoData)
+    initialize()
+
+    private implicit val ec = context.system.dispatcher
+
+    private def fillPipeline(): Unit = {
+        if (outstandingMessages.size <= pipelineFillThreshold) {
+            Future {
+                // Grab next batch of messages and commit offsets immediately
+                // essentially marking the activation as having satisfied "at most once"
+                // semantics (this is the point at which the activation is considered started).
+                // If the commit fails, then messages peeked are peeked again on the next poll.
+                // While the commit is synchronous and will block until it completes, at steady
+                // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
+                // of the commit should be masked.
+                val records = consumer.peek(longPollDuration)
+                consumer.commit()
+                FillCompleted(records.toSeq)
+            }.andThen {
+                case Failure(e: CommitFailedException) => logging.error(this, s"failed to commit $description consumer offset: $e")
+                case Failure(e: Throwable)             => logging.error(this, s"exception while pulling new $description records: $e")
+            }.recover {
+                case _ => FillCompleted(Seq.empty)
+            }.pipeTo(self)
+        } else {
+            logging.error(this, s"dropping fill request until $description feed is drained")
+        }
+    }
+
+    /** Send as many messages as possible to the handler. */
+    @tailrec
+    private def sendOutstandingMessages(): Unit = {
+        val occupancy = outstandingMessages.size
+        if (occupancy > 0 && handlerCapacity > 0) {
+            val (topic, partition, offset, bytes) = outstandingMessages.dequeue()
+
+            if (logHandoff) logging.info(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
+            handler(bytes)
+            handlerCapacity -= 1
+
+            sendOutstandingMessages()
+        }
+    }
+
+    private def shouldFillQueue(): Boolean = {
+        val occupancy = outstandingMessages.size
+        if (occupancy <= pipelineFillThreshold) {
+            logging.debug(this, s"$description pipeline has capacity: $occupancy <= $pipelineFillThreshold ($handlerCapacity)")
+            true
+        } else {
+            logging.debug(this, s"$description pipeline must drain: $occupancy > $pipelineFillThreshold")
+            false
+        }
+    }
+
+    private def updateHandlerCapacity(): Int = {
+        logging.debug(self, s"$description received processed msg, current capacity = $handlerCapacity")
+
+        if (handlerCapacity < maximumHandlerCapacity) {
+            handlerCapacity += 1
+            handlerCapacity
+        } else {
+            if (handlerCapacity > maximumHandlerCapacity) logging.error(self, s"$description capacity already at max")
+            maximumHandlerCapacity
+        }
+    }
+}
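
For orientation before the call sites below, here is a minimal, hedged wiring sketch for the `MessageFeed` just defined; the `ExampleFeedClient` name and handler body are placeholders, and with `maximumHandlerCapacity == consumer.maxPeek` the feed double-buffers (`maxPipelineDepth = 2 * maxPeek`, fill threshold `= maxPeek`):

    import scala.concurrent.Future
    import scala.concurrent.duration._

    import akka.actor.{ ActorRef, ActorSystem, Props }
    import whisk.common.Logging
    import whisk.core.connector.{ MessageConsumer, MessageFeed }

    // Hypothetical client; only the MessageFeed API shown above is assumed.
    class ExampleFeedClient(system: ActorSystem, consumer: MessageConsumer, logging: Logging) {
        private implicit val ec = system.dispatcher

        // autoStart defaults to true, so the feed begins long-polling immediately.
        val feed: ActorRef = system.actorOf(Props {
            new MessageFeed("example", logging, consumer, consumer.maxPeek, 1.second, handle)
        })

        // Every message handed off must eventually be acknowledged with Processed;
        // that acknowledgement is what lets the feed refill the pipeline.
        def handle(bytes: Array[Byte]): Future[Unit] = Future {
            // ... decode and dispatch the message downstream ...
            feed ! MessageFeed.Processed
        }
    }
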
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index d7e9025..79eb76b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -37,30 +37,20 @@ import akka.actor.FSM.Transition
 import akka.actor.Props
 import akka.pattern.pipe
 import akka.util.Timeout
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+
 import whisk.common.AkkaLogging
 import whisk.common.ConsulKV.LoadBalancerKeys
 import whisk.common.KeyValueStore
 import whisk.common.LoggingMarkers
 import whisk.common.RingBuffer
 import whisk.common.TransactionId
-import whisk.core.connector.ActivationMessage
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.PingMessage
+import whisk.core.connector._
 import whisk.core.entitlement.Privilege.Privilege
 import whisk.core.entity.ActivationId.ActivationIdGenerator
-import whisk.core.entity.AuthKey
-import whisk.core.entity.CodeExecAsString
-import whisk.core.entity.DocRevision
-import whisk.core.entity.EntityName
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.Secret
-import whisk.core.entity.Subject
-import whisk.core.entity.UUID
-import whisk.core.entity.WhiskAction
+import whisk.core.entity._
 
 // Received events
 case object GetStatus
@@ -151,13 +141,23 @@ class InvokerPool(
     }
 
     /** Receive Ping messages from invokers. */
-    pingConsumer.onMessage((topic, _, _, bytes) => {
+    val pingPollDuration = 1.second
+    val invokerPingFeed = context.system.actorOf(Props {
+        new MessageFeed("ping", logging, pingConsumer, pingConsumer.maxPeek, pingPollDuration, processInvokerPing, logHandoff = false)
+    })
+
+    def processInvokerPing(bytes: Array[Byte]): Future[Unit] = Future {
         val raw = new String(bytes, StandardCharsets.UTF_8)
         PingMessage.parse(raw) match {
-            case Success(p: PingMessage) => self ! p
-            case Failure(t)              => logging.error(this, s"failed processing message: $raw with $t")
+            case Success(p: PingMessage) =>
+                self ! p
+                invokerPingFeed ! MessageFeed.Processed
+
+            case Failure(t) =>
+                invokerPingFeed ! MessageFeed.Processed
+                logging.error(this, s"failed processing message: $raw with $t")
         }
-    })
+    }
 }
 
 object InvokerPool {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 6534307..63655c9 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -33,8 +33,10 @@ import org.apache.kafka.clients.producer.RecordMetadata
 
 import akka.actor.ActorRefFactory
 import akka.actor.ActorSystem
+import akka.actor.Props
 import akka.pattern.ask
 import akka.util.Timeout
+
 import whisk.common.ConsulClient
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
@@ -44,6 +46,7 @@ import whisk.connector.kafka.KafkaProducerConnector
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig._
 import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
 import whisk.core.database.NoDocumentException
 import whisk.core.entity.{ ActivationId, WhiskAction, WhiskActivation }
@@ -81,7 +84,13 @@ trait LoadBalancer {
 
 }
 
-class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer {
+class LoadBalancerService(
+    config: WhiskConfig,
+    instance: InstanceId,
+    entityStore: EntityStore)(
+        implicit val actorSystem: ActorSystem,
+        logging: Logging)
+    extends LoadBalancer {
 
     /** The execution context for futures */
     implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -224,9 +233,10 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
             throw new IllegalStateException("cannot create test action for invoker health because runtime manifest is not valid")
         }
 
+        val maxPingsPerPoll = 128
         val consul = new ConsulClient(config.consulServer)
         // Each controller gets its own Group Id, to receive all messages
-        val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health")
+        val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll)
         val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props(instance), name)
 
         actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv, invoker => {
@@ -235,22 +245,33 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
         }, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer))
     }
 
-    /** Subscribes to active acks (completion messages from the invokers). */
-    private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}")
+    /**
+     * Subscribes to active acks (completion messages from the invokers), and
+     * registers a handler for received active acks from invokers.
+     */
+    val maxActiveAcksPerPoll = 128
+    val activeAckPollDuration = 1.second
 
-    /** Registers a handler for received active acks from invokers. */
-    activeAckConsumer.onMessage((topic, _, _, bytes) => {
+    private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
+    val activationFeed = actorSystem.actorOf(Props {
+        new MessageFeed("activeack", logging, activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck)
+    })
+
+    def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
         val raw = new String(bytes, StandardCharsets.UTF_8)
         CompletionMessage.parse(raw) match {
             case Success(m: CompletionMessage) =>
                 processCompletion(m.response, m.transid, false)
                // treat left as success (as it is the result of the message exceeding the bus limit)
                 val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError)
+                activationFeed ! MessageFeed.Processed
                 invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess)
 
-            case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
+            case Failure(t) =>
+                activationFeed ! MessageFeed.Processed
+                logging.error(this, s"failed processing message: $raw with $t")
         }
-    })
+    }
 
     /** Return a sorted list of available invokers. */
     private def availableInvokers: Future[Seq[String]] = invokerHealth.map {
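
Both feeds above (the ping feed in InvokerSupervision and the active-ack feed here) follow the same handler contract: every message must be answered with `MessageFeed.Processed` on the success and failure paths alike, otherwise the feed's capacity accounting leaks and the pipeline stalls. A hedged sketch of that shape, with hypothetical `parse` and `work` placeholders:

    import scala.concurrent.{ ExecutionContext, Future }
    import scala.util.{ Failure, Success, Try }

    import akka.actor.ActorRef
    import whisk.core.connector.MessageFeed

    // Sketch only: `parse` and `work` are hypothetical stand-ins for
    // CompletionMessage.parse / PingMessage.parse and the downstream dispatch.
    def mkHandler[T](feed: ActorRef, parse: Array[Byte] => Try[T], work: T => Unit)(
        implicit ec: ExecutionContext): Array[Byte] => Future[Unit] = { bytes =>
        Future {
            parse(bytes) match {
                case Success(msg) =>
                    work(msg)
                    feed ! MessageFeed.Processed // release one unit of capacity
                case Failure(_) =>
                    feed ! MessageFeed.Processed // release capacity even on failure
            }
        }
    }
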
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 9a796ee..f1f51f5 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -24,12 +24,13 @@ import akka.actor.ActorRef
 import akka.actor.ActorRefFactory
 import akka.actor.Props
 import whisk.common.AkkaLogging
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
+
 import whisk.core.entity.ByteSize
 import whisk.core.entity.CodeExec
 import whisk.core.entity.EntityName
 import whisk.core.entity.ExecutableWhiskAction
 import whisk.core.entity.size._
+import whisk.core.connector.MessageFeed
 
 sealed trait WorkerState
 case object Busy extends WorkerState
@@ -124,7 +125,7 @@ class ContainerPool(
 
         // Activation completed
         case ActivationCompleted =>
-            feed ! ContainerReleased
+            feed ! MessageFeed.Processed
     }
 
     /** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
deleted file mode 100644
index 5ea54e7..0000000
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
-
-import org.apache.kafka.clients.consumer.CommitFailedException
-
-import akka.actor.Actor
-import akka.actor.actorRef2Scala
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.connector.MessageConsumer
-
-object ActivationFeed {
-    sealed class ActivationNotification
-
-    /** Pulls new messages from the message bus. */
-    case class FillQueueWithMessages()
-
-    /** Indicates resources are available because transaction completed, may cause pipeline fill. */
-    case object ContainerReleased extends ActivationNotification
-
-    /** Indicate resources are available because transaction failed, may cause pipeline fill. */
-    case class FailedActivation(tid: TransactionId) extends ActivationNotification
-}
-
-/**
- * This actor polls the message bus for new messages and dispatches them to the given
- * handler. The actor tracks the number of messages dispatched and will not dispatch new
- * messages until some number of them are acknowledged.
- *
- * This is used by the invoker to pull messages from the message bus and apply back pressure
- * when the invoker does not have resources to complete processing messages (i.e., no containers
- * are available to run new actions).
- *
- * When the invoker releases resources (by reclaiming containers) it will send a message
- * to this actor which will then attempt to fill the pipeline with new messages.
- *
- * The actor tries to fill the pipeline with additional messages while the number
- * of outstanding requests is below the pipeline fill threshold.
- */
-@throws[IllegalArgumentException]
-protected class ActivationFeed(
-    logging: Logging,
-    consumer: MessageConsumer,
-    maxPipelineDepth: Int,
-    longpollDuration: FiniteDuration,
-    handler: (String, Array[Byte]) => Any)
-    extends Actor {
-    import ActivationFeed.ActivationNotification
-    import ActivationFeed.FillQueueWithMessages
-
-    require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
-
-    private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
-    private var pipelineOccupancy = 0
-    private implicit val tid = TransactionId.dispatcher
-
-    override def receive = {
-        case FillQueueWithMessages =>
-            if (pipelineOccupancy <= pipelineFillThreshold) {
-                Try {
-                    // Grab next batch of messages and commit offsets immediately
-                    // essentially marking the activation as having satisfied "at most once"
-                    // semantics (this is the point at which the activation is considered started).
-                    // If the commit fails, then messages peeked are peeked again on the next poll.
-                    // While the commit is synchronous and will block until it completes, at steady
-                    // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
-                    // of the commit should be masked.
-                    val records = consumer.peek(longpollDuration)
-                    consumer.commit()
-                    (records, records.size)
-                } map {
-                    case (records, count) =>
-                        records foreach {
-                            case (topic, partition, offset, bytes) =>
-                                pipelineOccupancy += 1
-                                logging.info(this, s"processing $topic[$partition][$offset ($count)][pipelineOccupancy=${pipelineOccupancy} (${pipelineFillThreshold})]")
-                                handler(topic, bytes)
-                        }
-                } recover {
-                    case e: CommitFailedException => logging.error(this, s"failed to commit consumer offset: $e")
-                    case e: Throwable             => logging.error(this, s"exception while pulling new records: $e")
-                }
-                fill()
-            } else logging.debug(this, "dropping fill request until feed is drained")
-
-        case n: ActivationNotification =>
-            pipelineOccupancy -= 1
-            logging.info(this, s"received ActivationNotification: $n / pipelineOccupancy=$pipelineOccupancy / pipelineFillThreshold=$pipelineFillThreshold")
-            if (pipelineOccupancy < 0) {
-                logging.error(this, "pipelineOccupancy<0")
-            }
-            fill()
-    }
-
-    private def fill() = {
-        if (pipelineOccupancy <= pipelineFillThreshold) {
-            logging.debug(this, s"filling activation pipeline: $pipelineOccupancy <= $pipelineFillThreshold")
-            self ! FillQueueWithMessages
-        } else {
-            logging.info(this, s"waiting for activation pipeline to drain: $pipelineOccupancy > $pipelineFillThreshold")
-        }
-    }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
index 565acbc..fa0e74d 100644
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
+++ b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
@@ -17,6 +17,8 @@
 
 package whisk.core.dispatcher
 
+import java.nio.charset.StandardCharsets
+
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
@@ -28,9 +30,9 @@ import akka.actor.Props
 import akka.actor.actorRef2Scala
 import whisk.common.Counter
 import whisk.common.Logging
-import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
 import whisk.core.connector.MessageConsumer
+import whisk.core.connector.MessageFeed
 
 /**
  * Creates a dispatcher that pulls messages from the message pub/sub connector.
@@ -54,9 +56,10 @@ class Dispatcher(
         implicit logging: Logging)
     extends Registrar {
 
-    val activationFeed = actorSystem.actorOf(Props(new ActivationFeed(logging, consumer, maxPipelineDepth, pollDuration, process)))
+    // create activation request feed but do not start it, until the invoker is registered
+    val activationFeed = actorSystem.actorOf(Props(new MessageFeed("activation", logging, consumer, maxPipelineDepth, pollDuration, process, autoStart = false)))
 
-    def start() = activationFeed ! ActivationFeed.FillQueueWithMessages
+    def start() = activationFeed ! MessageFeed.Ready
     def stop() = consumer.close()
 
     /**
@@ -67,8 +70,8 @@ class Dispatcher(
      * A handler is registered via addHandler and unregistered via removeHandler.
      * There is typically only one handler.
      */
-    def process(topic: String, bytes: Array[Byte]) = {
-        val raw = new String(bytes, "utf-8")
+    def process(bytes: Array[Byte]): Future[Unit] = Future {
+        val raw = new String(bytes, StandardCharsets.UTF_8)
         ActivationMessage.parse(raw) match {
             case Success(m) =>
                 handlers foreach {
@@ -78,9 +81,8 @@ class Dispatcher(
         }
     }
 
-    private def handleMessage(handler: MessageHandler, msg: ActivationMessage) = {
+    private def handleMessage(handler: MessageHandler, msg: ActivationMessage): Unit = {
         implicit val tid = msg.transid
-        implicit val executionContext = actorSystem.dispatcher
 
         Future {
             val count = counter.next()
@@ -92,17 +94,13 @@ class Dispatcher(
         }
     }
 
-    private def inform(matchers: TrieMap[String, MessageHandler])(implicit transid: TransactionId) = {
-        val names = matchers map { _._2.name } reduce (_ + "," + _)
-        logging.debug(this, s"matching message to ${matchers.size} handlers: $names")
-        matchers
-    }
-
-    private def errorMsg(handler: MessageHandler, e: Throwable): String =
+    private def errorMsg(handler: MessageHandler, e: Throwable): String = {
         s"failed applying handler '${handler.name}': ${errorMsg(e)}"
+    }
 
-    private def errorMsg(msg: String, e: Throwable) =
+    private def errorMsg(msg: String, e: Throwable): String = {
         s"failed processing message: $msg $e${e.getStackTrace.mkString("", " ", "")}"
+    }
 
     private def errorMsg(e: Throwable): String = {
         if (e.isInstanceOf[java.util.concurrent.ExecutionException]) {
@@ -113,6 +111,7 @@ class Dispatcher(
     }
 
     private val counter = new Counter()
+    private implicit val executionContext = actorSystem.dispatcher
 }
 
 trait Registrar {
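
One detail worth noting in the Dispatcher change above: the feed is created with `autoStart = false` so it parks in `Idle` until the invoker has registered, and `start()` then opens the tap by sending `MessageFeed.Ready`. A hedged sketch of that deferred-start pattern (`system`, `logging`, `consumer`, and `process` as in the diff):

    import scala.concurrent.Future
    import scala.concurrent.duration._

    import akka.actor.{ ActorRef, ActorSystem, Props }
    import whisk.common.Logging
    import whisk.core.connector.{ MessageConsumer, MessageFeed }

    // Sketch only: the feed stays Idle (no polling) until it receives Ready.
    def deferredFeed(system: ActorSystem, consumer: MessageConsumer,
                     process: Array[Byte] => Future[Unit])(implicit logging: Logging): ActorRef =
        system.actorOf(Props(new MessageFeed(
            "activation", logging, consumer, consumer.maxPeek, 1.second, process,
            autoStart = false)))

    // Later, once registration completes (cf. Dispatcher.start()):
    //     feed ! MessageFeed.Ready
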
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 58592e7..239370f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -27,6 +27,8 @@ import scala.language.postfixOps
 import scala.util.{ Failure, Success }
 import scala.util.Try
 
+import org.apache.kafka.common.errors.RecordTooLargeException
+
 import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala }
 import akka.japi.Creator
 import spray.json._
@@ -38,24 +40,18 @@ import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector }
 import whisk.core.WhiskConfig
 import whisk.core.WhiskConfig.{ consulServer, dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, whiskVersion, invokerUseReactivePool }
 import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.PingMessage
 import whisk.core.container._
 import whisk.core.dispatcher.{ Dispatcher, MessageHandler }
-import whisk.core.dispatcher.ActivationFeed.{ ActivationNotification, ContainerReleased, FailedActivation }
 import whisk.core.entity._
 import whisk.http.BasicHttpService
 import whisk.http.Messages
 import whisk.utils.ExecutionContextFactory
-import whisk.common.Scheduler
-import whisk.core.connector.PingMessage
-import scala.util.Try
-import whisk.core.connector.MessageProducer
-import org.apache.kafka.common.errors.RecordTooLargeException
-import whisk.core.entity.TimeLimit
 
 /**
- * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke".
+ * A message handler that invokes actions as directed by messages on the topic "/actions/invoke".
  * The message path must contain a fully qualified action name and an optional revision id.
  *
  * @param config the whisk configuration
@@ -119,7 +115,7 @@ class Invoker(
                             case Success(activation) =>
                                 transactionPromise.completeWith {
                                     // this completes the successful activation case (1)
-                                    completeTransaction(tran, activation, ContainerReleased)
+                                    completeTransaction(tran, activation)
                                 }
 
                             case Failure(t) =>
@@ -164,7 +160,7 @@ class Invoker(
         // send activate ack for failed activations
         sendActiveAck(tran, activationResult)
 
-        completeTransaction(tran, activationResult, FailedActivation(transid))
+        completeTransaction(tran, activationResult)
     }
 
     /*
@@ -173,7 +169,7 @@ class Invoker(
     * Invariant: Only one call here succeeds.  Even though the sync block wraps WhiskActivation.put,
      *            it is only blocking this transaction which is finishing anyway.
      */
-    protected def completeTransaction(tran: Transaction, activation: WhiskActivation, releaseResource: ActivationNotification)(
+    protected def completeTransaction(tran: Transaction, activation: WhiskActivation)(
         implicit transid: TransactionId): Future[DocInfo] = {
         tran.synchronized {
             tran.result match {
@@ -183,7 +179,7 @@ class Invoker(
                     // Send a message to the activation feed indicating there is a free resource to handle another activation.
                     // Since all transaction completions flow through this method and the invariant is that the transaction is
                     // completed only once, there is only one completion message sent to the feed as a result.
-                    activationFeed ! releaseResource
+                    activationFeed ! MessageFeed.Processed
                    // Since the invoker takes no further action on completion, writing the activation record is the final step.
                     logging.info(this, "recording the activation result to the data store")
                     val result = WhiskActivation.put(activationStore, activation) andThen {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index d620e3d..456182b 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -36,6 +36,7 @@ import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.connector.ActivationMessage
 import whisk.core.connector.CompletionMessage
+import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessageProducer
 import whisk.core.container.{ ContainerPool => OldContainerPool }
 import whisk.core.container.Interval
@@ -47,13 +48,13 @@ import whisk.core.containerpool.docker.DockerClientWithFileAccess
 import whisk.core.containerpool.docker.DockerContainer
 import whisk.core.containerpool.docker.RuncClient
 import whisk.core.database.NoDocumentException
-import whisk.core.dispatcher.ActivationFeed.FailedActivation
 import whisk.core.dispatcher.MessageHandler
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
 import whisk.http.Messages
 
+
 class InvokerReactive(
     config: WhiskConfig,
     instance: InstanceId,
@@ -206,7 +207,7 @@ class InvokerReactive(
                         Parameters("path", msg.action.toString.toJson) ++ causedBy
                     })
 
-                activationFeed ! FailedActivation(msg.transid)
+                activationFeed ! MessageFeed.Processed
                 ack(msg.transid, activation, msg.rootControllerIndex)
                 store(msg.transid, activation)
         }
diff --git a/docs/actions.md b/docs/actions.md
index 7dcae8d..b479bc9 100644
--- a/docs/actions.md
+++ b/docs/actions.md
@@ -640,7 +640,7 @@ To avoid the cold-start delay, you can compile your Swift file into a binary and
   ```
   docker run --rm -it -v "$(pwd):/owexec" openwhisk/action-swift-v3.1.1 bash
   ```
-  This puts you in a bash shell within the Docker container. 
+  This puts you in a bash shell within the Docker container.
 
 - Copy the source code and prepare to build it.
   ```
@@ -658,7 +658,7 @@ To avoid the cold-start delay, you can compile your Swift file into a binary and
 - (Optional) Create the `Package.swift` file to add dependencies.
   ```swift
   import PackageDescription
-  
+
   let package = Package(
     name: "Action",
         dependencies: [
@@ -699,7 +699,7 @@ and so you should include them in your own `Package.swift`.
   exit
   ```
 
-  This has created hello.zip in the same directory as hello.swift. 
+  This has created hello.zip in the same directory as hello.swift.
 
 - Upload it to OpenWhisk with the action name helloSwifty:
   ```
@@ -731,6 +731,7 @@ For example, create a Java file called `Hello.java` with the following content:
 
 ```java
 import com.google.gson.JsonObject;
+
 public class Hello {
     public static JsonObject main(JsonObject args) {
         String name = "stranger";
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 60bb3d3..f73e554 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -21,10 +21,8 @@ import java.util.Calendar
 
 import scala.concurrent.Await
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
 import scala.language.postfixOps
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.kafka.clients.consumer.CommitFailedException
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
@@ -34,14 +32,12 @@ import org.scalatest.junit.JUnitRunner
 
 import common.StreamLogging
 import common.WskActorSystem
-import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.connector.kafka.KafkaConsumerConnector
 import whisk.connector.kafka.KafkaProducerConnector
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.utils.ExecutionContextFactory
-import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class KafkaConnectorTests
@@ -61,7 +57,7 @@ class KafkaConnectorTests
     val sessionTimeout = 10 seconds
     val maxPollInterval = 10 seconds
     val producer = new KafkaProducerConnector(config.kafkaHost, ec)
-    val consumer = new TestKafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval)
+    val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval)
 
     override def afterAll() {
         producer.close()
@@ -116,52 +112,4 @@ class KafkaConnectorTests
             } else consumer.commit()
         }
     }
-
-    it should "catch a failing commit" in {
-        val messageReceived = "message received"
-        consumer.onMessage((topic, partition, offset, bytes) => {
-            printstream.println(messageReceived)
-        })
-        val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-
-        // Send message while commit throws no exception -> Should be processed
-        consumer.commitFails = false
-        Await.result(producer.send(topic, message), 10 seconds)
-        retry(stream.toString should include(messageReceived), 20, Some(500 millisecond))
-
-        // Send message while commit throws exception -> Message will not be processed
-        consumer.commitFails = true
-        retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond))
-        Await.result(producer.send(topic, message), 10 seconds)
-        retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond))
-
-        // Send message again -> No commit exception -> Should work again
-        consumer.commitFails = false
-        Await.result(producer.send(topic, message), 10 seconds)
-        retry(StringUtils.countMatches(stream.toString, messageReceived) should be(2), 50, Some(100 milliseconds))
-
-        // Wait a few seconds and ensure that the message is not processed three times
-        Thread.sleep(5000)
-        StringUtils.countMatches(stream.toString, messageReceived) should be(2)
-    }
-
-}
-
-class TestKafkaConsumerConnector(
-    kafkahost: String,
-    groupid: String,
-    topic: String,
-    sessionTimeout: FiniteDuration,
-    maxPollInterval: FiniteDuration)(implicit logging: Logging) extends KafkaConsumerConnector(
-    kafkahost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval) {
-
-    override def commit() = {
-        if (commitFails) {
-            throw new CommitFailedException()
-        } else {
-            super.commit()
-        }
-    }
-
-    var commitFails = false
 }
diff --git a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
new file mode 100644
index 0000000..030a6d0
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.test
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.Buffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.FSM
+import akka.actor.FSM.CurrentState
+import akka.actor.FSM.SubscribeTransitionCallBack
+import akka.actor.FSM.Transition
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.testkit.TestKit
+import common.StreamLogging
+import whisk.core.connector._
+import whisk.core.connector.MessageFeed._
+import whisk.utils.retry
+
+@RunWith(classOf[JUnitRunner])
+class MessageFeedTests
+    extends FlatSpecLike
+    with Matchers
+    with BeforeAndAfterEach
+    with BeforeAndAfterAll
+    with MockFactory
+    with StreamLogging {
+
+    val system = ActorSystem("MessageFeedTestSystem")
+    val actorsToDestroyAfterEach: Buffer[ActorRef] = Buffer.empty
+
+    override def afterEach() = actorsToDestroyAfterEach.foreach { _ ! PoisonPill }
+    override def afterAll() = TestKit.shutdownActorSystem(system)
+
+    case class Connector(autoStart: Boolean = true) extends TestKit(system) {
+        val peekCount = new AtomicInteger()
+
+        val consumer = new TestConnector("feedtest", 4, true) {
+            override def peek(duration: Duration) = {
+                peekCount.incrementAndGet()
+                super.peek(duration)
+            }
+        }
+
+        val sentCount = new AtomicInteger()
+
+        def fill(n: Int) = {
+            val msgs = (1 to n).map { _ =>
+                new Message {
+                    override def serialize = {
+                        sentCount.incrementAndGet().toString
+                    }
+                    override def toString = {
+                        s"message${sentCount.get}"
+                    }
+                }
+            }
+            consumer.send(msgs)
+        }
+
+        val receivedCount = new AtomicInteger()
+
+        def handler(bytes: Array[Byte]): Future[Unit] = {
+            Future.successful(receivedCount.incrementAndGet())
+        }
+
+        val fsm = childActorOf(Props(new MessageFeed(
+            "test",
+            logging,
+            consumer,
+            consumer.maxPeek,
+            200.milliseconds,
+            handler,
+            autoStart)))
+
+        actorsToDestroyAfterEach += (fsm, testActor)
+
+        def monitorTransitionsAndStart() = {
+            fsm ! SubscribeTransitionCallBack(testActor)
+            expectMsg(CurrentState(fsm, Idle))
+            fsm ! Ready
+            expectMsg(Transition(fsm, Idle, FillingPipeline))
+            this
+        }
+    }
+
+    def timeout(actor: ActorRef) = actor ! FSM.StateTimeout
+
+    it should "wait for ready before accepting messages" in {
+        val connector = Connector(autoStart = false)
+        connector.fsm ! SubscribeTransitionCallBack(connector.testActor)
+
+        // start idle
+        connector.expectMsg(CurrentState(connector.fsm, Idle))
+
+        // stay until received ready
+        connector.fsm ! FSM.StateTimeout // should be ignored
+        connector.fsm ! Processed // should be ignored
+        Thread.sleep(500.milliseconds.toMillis)
+        connector.peekCount.get shouldBe 0
+
+        // start filling
+        connector.fsm ! Ready
+        connector.expectMsg(Transition(connector.fsm, Idle, FillingPipeline))
+        retry(connector.peekCount.get should be > 0)
+    }
+
+    it should "auto start and start polling for messages" in {
+        val connector = Connector(autoStart = true)
+        // automatically start filling
+        retry(connector.peekCount.get should be > 0, 5, Some(200.milliseconds))
+    }
+
+    it should "stop polling for messages when the pipeline is full" in {
+        val connector = Connector(autoStart = false).monitorTransitionsAndStart()
+        // push enough to cause pipeline to exceed fill mark
+        connector.fill(connector.consumer.maxPeek * 2 + 1)
+        retry(connector.peekCount.get should be > 0)
+        retry(connector.receivedCount.get shouldBe connector.consumer.maxPeek, 10, Some(200.milliseconds))
+
+        val peeks = connector.peekCount.get
+        connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline))
+
+        connector.peekCount.get shouldBe peeks
+        connector.expectNoMsg(500.milliseconds)
+    }
+
+    it should "transition from drain to fill mode" in {
+        val connector = Connector(autoStart = false).monitorTransitionsAndStart()
+        println(connector.fsm.toString())
+        // push enough to cause pipeline to exceed fill mark
+        val sendCount = connector.consumer.maxPeek * 2 + 2
+        connector.fill(sendCount)
+        retry(connector.peekCount.get should be > 0)
+        retry(connector.receivedCount.get shouldBe connector.consumer.maxPeek, 10, Some(200.milliseconds))
+
+        val peeks = connector.peekCount.get
+        connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline))
+
+        // stay in drain mode, no more peeking
+        timeout(connector.fsm) // should be ignored
+        connector.expectNoMsg(500.milliseconds)
+        connector.peekCount.get shouldBe peeks // no new reads
+
+        // expecting overflow of 2 in the queue, which is true if all expected messages were sent
+        retry(connector.sentCount.get shouldBe sendCount, 5, Some(200.milliseconds))
+
+        // drain one, should stay in draining state
+        connector.fsm ! Processed
+        connector.expectNoMsg(500.milliseconds)
+        connector.peekCount.get shouldBe peeks // no new reads
+
+        // back to fill mode
+        connector.fsm ! Processed
+        connector.expectMsg(Transition(connector.fsm, DrainingPipeline, FillingPipeline))
+        retry(connector.peekCount.get should be >= (peeks + 1))
+
+        // should send back to drain mode
+        connector.fill(1)
+        connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline))
+
+        connector.expectNoMsg(500.milliseconds)
+    }
+}
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
similarity index 78%
rename from tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala
rename to tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 9e7fe34..b454c7c 100644
--- a/tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package whisk.core.dispatcher.test
+package whisk.core.connector.test
 
 import java.util.ArrayList
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.JavaConversions.asScalaBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
+import scala.collection.JavaConversions._
 
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.Record
-
 import common.StreamLogging
+
 import whisk.common.Counter
 import whisk.core.connector.Message
 import whisk.core.connector.MessageConsumer
@@ -44,7 +44,6 @@ class TestConnector(
     override def peek(duration: Duration) = {
         val msgs = new ArrayList[Message]
         queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek)
-
         msgs map { m =>
             offset += 1
             (topic, -1, offset, m.serialize.getBytes)
@@ -59,23 +58,17 @@ class TestConnector(
         }
     }
 
-    override def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit = {
-        new Thread {
-            override def run() = while (!closed) {
-                val msg = queue.take()
-                logging.info(this, s"received message for '$topic'")
-                process(topic, -1, -1, msg.serialize.getBytes)
-                Thread.sleep(100) // let producer get in a send if any
-            }
-        }.start
-    }
-
     def occupancy = queue.size
 
     def send(msg: Message): Future[RecordMetadata] = {
         producer.send(topic, msg)
     }
 
+    def send(msgs: Seq[Message]): Future[RecordMetadata] = {
+        import scala.language.reflectiveCalls
+        producer.sendBulk(topic, msgs)
+    }
+
     def close() = {
         closed = true
         producer.close()
@@ -91,12 +84,23 @@ class TestConnector(
                 Future.failed(new IllegalStateException("failed to write msg"))
             }
         }
+
+        def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = {
+            if (queue.addAll(msgs)) {
+                logging.info(this, s"put: ${msgs.length} messages")
+                Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, Record.NO_TIMESTAMP, -1, -1, -1))
+            } else {
+                logging.error(this, s"put failed: ${msgs.length} messages")
+                Future.failed(new IllegalStateException("failed to write msg"))
+            }
+        }
+
         def close() = {}
         def sentCount() = counter.next()
         val counter = new Counter()
     }
 
-    protected[test] var throwCommitException = false
+    var throwCommitException = false
     private val queue = new LinkedBlockingQueue[Message]()
     @volatile private var closed = false
     private var offset = -1L
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index f7bcdf1..866d370 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -38,11 +38,11 @@ import akka.testkit.TestProbe
 import whisk.common.TransactionId
 import whisk.core.connector.ActivationMessage
 import whisk.core.containerpool._
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.RuntimeManifest
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
+import whisk.core.connector.MessageFeed
 
 /**
  * Behavior tests for the ContainerPool
@@ -114,7 +114,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
 
         val pool = system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref))
         containers(0).send(pool, ActivationCompleted)
-        feed.expectMsg(ContainerReleased)
+        feed.expectMsg(MessageFeed.Processed)
     }
 
     /*
@@ -159,7 +159,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
         containers(0).send(pool, ActivationCompleted)
-        feed.expectMsg(ContainerReleased)
+        feed.expectMsg(MessageFeed.Processed)
         pool ! runMessageDifferentEverything
         containers(0).expectMsg(Remove)
         containers(1).expectMsg(runMessageDifferentEverything)
@@ -177,14 +177,14 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData(lastUsed = Instant.EPOCH)))
         containers(0).send(pool, ActivationCompleted)
-        feed.expectMsg(ContainerReleased)
+        feed.expectMsg(MessageFeed.Processed)
 
         // Run the second container, don't remove the first one
         pool ! runMessageDifferentEverything
         containers(1).expectMsg(runMessageDifferentEverything)
         containers(1).send(pool, NeedWork(warmedData(lastUsed = Instant.now)))
         containers(1).send(pool, ActivationCompleted)
-        feed.expectMsg(ContainerReleased)
+        feed.expectMsg(MessageFeed.Processed)
 
         pool ! runMessageDifferentNamespace
         containers(2).expectMsg(runMessageDifferentNamespace)
@@ -203,7 +203,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
         containers(0).send(pool, ActivationCompleted)
-        feed.expectMsg(ContainerReleased)
+        feed.expectMsg(MessageFeed.Processed)
         pool ! runMessageDifferentNamespace
         containers(0).expectMsg(Remove)
         containers(1).expectMsg(runMessageDifferentNamespace)
@@ -219,7 +219,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
         containers(0).send(pool, ActivationCompleted)
-        feed.expectMsg(ContainerReleased)
+        feed.expectMsg(MessageFeed.Processed)
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         pool ! runMessage //expect this message to be requeued since previous is incomplete.
diff --git a/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala
index d6d74c6..6488156 100644
--- a/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala
@@ -519,7 +519,6 @@ class PackagesApiTests extends ControllerTestCommon with WhiskPackagesApi {
             deletePackage(reference.docid)
             status should be(OK)
             val response = responseAs[WhiskPackage]
-            println(responseAs[String])
             response should be {
                 WhiskPackage(reference.namespace, reference.name, reference.binding,
                     version = reference.version.upPatch,
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
index 3084270..aafd64f 100644
--- a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
+++ b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
@@ -35,7 +35,8 @@ import spray.json.JsNumber
 import spray.json.JsObject
 import whisk.common.TransactionId
 import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.dispatcher.ActivationFeed
+import whisk.core.connector.MessageFeed
+import whisk.core.connector.test.TestConnector
 import whisk.core.dispatcher.Dispatcher
 import whisk.core.dispatcher.MessageHandler
 import whisk.core.entity._
@@ -79,12 +80,12 @@ class DispatcherTests
     }
 
     it should "send and receive a message from connector bus" in {
-        val maxdepth = 8
-        val half = maxdepth / 2
-        val connector = new TestConnector("test connector", maxdepth / 2, true)
+        val capacity = 4
+        val connector = new TestConnector("test connector", capacity, false)
+
         val messagesProcessed = new AtomicInteger()
         val handler = new TestRule({ msg => messagesProcessed.incrementAndGet() })
-        val dispatcher = new Dispatcher(connector, 100 milliseconds, maxdepth, actorSystem)
+        val dispatcher = new Dispatcher(connector, 100 milliseconds, capacity, actorSystem)
         dispatcher.addHandler(handler, true)
         dispatcher.start()
 
@@ -94,72 +95,57 @@ class DispatcherTests
                 Console.withErr(stream) {
                     retry({
                         val logs = stream.toString()
-                        logs should include regex (s"exception while pulling new records *.* commit failed")
+                        logs should include regex (s"exception while pulling new activation records *.* commit failed")
                     }, 10, Some(100 milliseconds))
 
                     connector.throwCommitException = false
                 }
             }
 
-            for (i <- 0 to half) {
+            for (i <- 0 until (2 * capacity + 1)) {
                 sendMessage(connector, i + 1)
             }
 
-            // wait until all messages are received at which point the
-            // dispatcher cannot drain anymore messages
-            withClue("the queue should be empty since all messages are drained") {
-                retry({
-                    connector.occupancy shouldBe 0
-                }, 10, Some(100 milliseconds))
-            }
-
+            // only process as many messages as we have downstream capacity
             withClue("messages processed") {
                 retry({
-                    messagesProcessed.get should be(half + 1)
+                    messagesProcessed.get should be(capacity)
                 }, 20, Some(100 milliseconds))
             }
 
             withClue("confirming dispatcher is in overflow state") {
                 val logs = stream.toString()
-                logs should include regex (s"waiting for activation pipeline to drain: ${half + 1} > $half")
+                logs should include regex (s"activation pipeline must drain: ${capacity + 1} > $capacity")
             }
 
             // send one message and check later that it remains in the connector
-            // at this point, total messages sent = half + 2
-            sendMessage(connector, half + 2)
-
-            withClue("confirming dispatcher will not consume additional messages when in overflow state") {
-                stream.reset()
-                Console.withOut(stream) {
-                    dispatcher.activationFeed ! ActivationFeed.FillQueueWithMessages
-                    retry({
-                        val logs = stream.toString()
-                        logs should include regex (s"dropping fill request until feed is drained")
-                        logs should not include regex(s"waiting for activation pipeline to drain: ${messagesProcessed.get} > $half")
-                    }, 10, Some(100 milliseconds))
-                }
-            }
+            // at this point, total messages sent = 2 * capacity + 2
+            connector.occupancy shouldBe 0
+            sendMessage(connector, 2 * capacity + 2)
+            Thread.sleep(1.second.toMillis)
 
             withClue("expecting message to still be in the queue") {
-                connector.occupancy shouldBe 1
+                retry({
+                    connector.occupancy shouldBe 1
+                }, 10, Some(100 milliseconds))
             }
 
             // unblock the pipeline by draining one activation and check
             // that the dispatcher refills the pipeline
             stream.reset()
             Console.withOut(stream) {
-                dispatcher.activationFeed ! ActivationFeed.ContainerReleased
+                dispatcher.activationFeed ! MessageFeed.Processed
                 // wait until additional message is drained
                 retry({
                     withClue("additional messages processed") {
-                        messagesProcessed.get shouldBe half + 2
+                        messagesProcessed.get shouldBe capacity + 1
                     }
                 }, 10, Some(100 milliseconds))
             }
 
             withClue("confirm dispatcher tried to fill the pipeline") {
                 val logs = stream.toString()
-                logs should include regex (s"filling activation pipeline: $half <= $half")
+                logs should include regex (s"activation pipeline has capacity: $capacity <= $capacity")
             }
         } finally {
             dispatcher.stop()
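
To make the rewritten arithmetic explicit: with capacity = 4 the test sends
2 * capacity + 1 = 9 messages, expects exactly `capacity` of them to reach
the handler, then sends one more (number 2 * capacity + 2) that must stay on
the bus, and finally releases a single slot via MessageFeed.Processed to see
the processed count rise to capacity + 1. The counter below is a plain model
of that bookkeeping, assuming the feed never hands out more than `capacity`
unacknowledged messages; it is a sketch, not the FSM in MessageConsumer.scala.

    object CapacityDemo extends App {
        // Plain model: at most `capacity` messages in flight at once;
        // anything beyond that waits on the (simulated) message bus.
        class CapacityModel(capacity: Int) {
            private var inFlight = 0
            var processed = 0 // messages handed to the handler so far

            // offer one message from the bus; dispatched only if a slot is free
            def offer(): Boolean =
                if (inFlight < capacity) { inFlight += 1; processed += 1; true }
                else false // stays on the bus, i.e. connector occupancy grows

            // analogue of MessageFeed.Processed: one slot is released
            def markProcessed(): Unit = if (inFlight > 0) inFlight -= 1
        }

        val m = new CapacityModel(capacity = 4)
        (1 to 9).foreach(_ => m.offer()) // 2 * capacity + 1 sends
        assert(m.processed == 4)         // only `capacity` were dispatched
        m.markProcessed()                // one activation completes
        m.offer()                        // the feed refills from the bus
        assert(m.processed == 5)         // capacity + 1, as the test expects
    }
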
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index d3feabe..eaa0278 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -50,7 +50,6 @@ import whisk.common.KeyValueStore
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.connector.ActivationMessage
-import whisk.core.connector.MessageConsumer
 import whisk.core.connector.PingMessage
 import whisk.core.entitlement.Privilege.Privilege
 import whisk.core.entity.ActivationId.ActivationIdGenerator
@@ -75,6 +74,7 @@ import whisk.core.loadBalancer.InvokerState
 import whisk.core.loadBalancer.Offline
 import whisk.core.loadBalancer.UnHealthy
 import whisk.utils.retry
+import whisk.core.connector.test.TestConnector
 
 @RunWith(classOf[JUnitRunner])
 class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
@@ -101,6 +101,8 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
     /** Queries all invokers for their state */
     def allStates(pool: ActorRef) = Await.result(pool.ask(GetStatus).mapTo[Map[String, InvokerState]], timeout.duration)
 
+    val pC = new TestConnector("pingFeedTest", 4, false)
+
     behavior of "InvokerPool"
 
     it should "successfully create invokers in its pool on ping and keep track of statechanges" in {
@@ -114,7 +116,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
 
         val kv = stub[KeyValueStore]
         val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
-        val pC = stub[MessageConsumer]
+
         val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
 
         within(timeout.duration) {
@@ -156,7 +158,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
         val kv = stub[KeyValueStore]
         val callback = stubFunction[String, Unit]
         val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
-        val pC = stub[MessageConsumer]
         val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, callback, sendActivationToInvoker, pC))
 
         within(timeout.duration) {
@@ -186,7 +187,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
         val childFactory = (f: ActorRefFactory, name: String) => invoker.ref
         val kv = stub[KeyValueStore]
         val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
-        val pC = stub[MessageConsumer]
 
         val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
 
@@ -213,7 +213,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
 
         val kv = stub[KeyValueStore]
         val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
-        val pC = stub[MessageConsumer]
 
         val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
 

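
The recurring deletion of `val pC = stub[MessageConsumer]` in this file has a
single cause: InvokerPool now builds a MessageFeed over the ping consumer, and
a feed that actually polls needs real queue semantics, which a ScalaMock stub
cannot provide; hence the one shared TestConnector declared above the test
cases. The sketch below illustrates the kind of in-memory double this implies;
the class and its method signatures are illustrative (only `occupancy` echoes
a name used in the tests), not the project's TestConnector API.

    import java.util.concurrent.LinkedBlockingQueue
    import scala.collection.JavaConverters._

    // Illustrative in-memory connector double: `send` is the producer side,
    // `peek` drains a bounded batch (mirroring the grab-and-commit loop that
    // this commit removes from KafkaConsumerConnector), and `occupancy`
    // reports how many messages are still waiting to be consumed.
    class InMemoryConnector[T] {
        private val queue = new LinkedBlockingQueue[T]()
        def send(msg: T): Unit = queue.put(msg)
        def peek(max: Int): Seq[T] = {
            val batch = new java.util.ArrayList[T]()
            queue.drainTo(batch, max) // atomically move up to `max` messages
            batch.asScala.toList
        }
        def occupancy: Int = queue.size
    }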