You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/07/13 13:47:52 UTC
[incubator-openwhisk] branch master updated: Throttle message bus
consumption. (#2425)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 0f33993 Throttle message bus consumption. (#2425)
0f33993 is described below
commit 0f3399372c6d7f047983707d8249707668daa746
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Thu Jul 13 06:47:49 2017 -0700
Throttle message bus consumption. (#2425)
Replaces unthrottled Kafka consumer with FSM actor that can throttle the consumption.
Applied for active active acks and invoker health pings.
Implement capacity based sends.
---
.../connector/kafka/KafkaConsumerConnector.scala | 29 ----
.../main/scala/whisk/core/connector/Message.scala | 2 +-
.../whisk/core/connector/MessageConsumer.scala | 192 ++++++++++++++++++++-
.../core/loadBalancer/InvokerSupervision.scala | 36 ++--
.../core/loadBalancer/LoadBalancerService.scala | 37 +++-
.../whisk/core/containerpool/ContainerPool.scala | 5 +-
.../whisk/core/dispatcher/ActivationFeed.scala | 122 -------------
.../scala/whisk/core/dispatcher/Dispatcher.scala | 29 ++--
.../main/scala/whisk/core/invoker/Invoker.scala | 20 +--
.../scala/whisk/core/invoker/InvokerReactive.scala | 5 +-
docs/actions.md | 7 +-
.../test/scala/services/KafkaConnectorTests.scala | 54 +-----
.../core/connector/test/MessageFeedTests.scala | 191 ++++++++++++++++++++
.../test/TestConnector.scala | 36 ++--
.../containerpool/test/ContainerPoolTests.scala | 14 +-
.../core/controller/test/PackagesApiTests.scala | 1 -
.../core/dispatcher/test/DispatcherTests.scala | 56 +++---
.../test/InvokerSupervisionTests.scala | 9 +-
18 files changed, 508 insertions(+), 337 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 1042030..a138781 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -24,9 +24,7 @@ import scala.collection.JavaConversions.seqAsJavaList
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
-import org.apache.kafka.clients.consumer.CommitFailedException
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -62,34 +60,8 @@ class KafkaConsumerConnector(
*/
def commit() = consumer.commitSync()
- override def onMessage(process: (String, Int, Long, Array[Byte]) => Unit) = {
- val self = this
- val thread = new Thread() {
- override def run() = {
- while (!disconnect) {
- Try {
- // Grab next batch of messages and commit offsets immediately
- // It won't be processed twice (tested in "KafkaConnectorTests")
- val messages = peek()
- commit()
- messages
- } map {
- _.foreach { process.tupled(_) }
- } recover {
- case e: CommitFailedException => logging.error(self, s"failed to commit to kafka: ${e.getMessage}")
- case e: Throwable => logging.error(self, s"exception while pulling new records: ${e.getMessage}")
- }
- }
- logging.warn(self, "consumer stream terminated")
- consumer.close()
- }
- }
- thread.start()
- }
-
override def close() = {
logging.info(this, s"closing '$topic' consumer")
- disconnect = true
}
private def getProps: Properties = {
@@ -122,5 +94,4 @@ class KafkaConsumerConnector(
}
private val consumer = getConsumer(getProps, Some(List(topic)))
- private var disconnect = false
}
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 3491c21..82c797d 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -32,7 +32,7 @@ import whisk.core.entity.WhiskActivation
/** Basic trait for messages that are sent on a message bus connector. */
trait Message {
/**
- * A transaction id to attach to the message. If not defined, defaults to 'dontcare' value.
+ * A transaction id to attach to the message.
*/
val transid = TransactionId.unknown
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 359cddb..87353df 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -17,7 +17,18 @@
package whisk.core.connector
-import scala.concurrent.duration.Duration
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.Failure
+
+import org.apache.kafka.clients.consumer.CommitFailedException
+
+import akka.actor.FSM
+import akka.pattern.pipe
+import whisk.common.Logging
+import whisk.common.TransactionId
trait MessageConsumer {
@@ -38,15 +49,180 @@ trait MessageConsumer {
* Commits offsets from last peek operation to ensure they are removed
* from the connector.
*/
- def commit()
-
- /**
- * Calls process for every message received. Process receives a tuple
- * (topic, partition, offset, and message as byte array).
- */
- def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit
+ def commit(): Unit
/** Closes consumer. */
def close(): Unit
}
+
+object MessageFeed {
+ protected sealed trait FeedState
+ protected[connector] case object Idle extends FeedState
+ protected[connector] case object FillingPipeline extends FeedState
+ protected[connector] case object DrainingPipeline extends FeedState
+
+ protected sealed trait FeedData
+ private case object NoData extends FeedData
+
+ /** Indicates the consumer is ready to accept messages from the message bus for processing. */
+ object Ready
+
+ /** Steady state message, indicates capacity in downstream process to receive more messages. */
+ object Processed
+
+ /** Indicates the fill operation has completed. */
+ private case class FillCompleted(messages: Seq[(String, Int, Long, Array[Byte])])
+}
+
+/**
+ * This actor polls the message bus for new messages and dispatches them to the given
+ * handler. The actor tracks the number of messages dispatched and will not dispatch new
+ * messages until some number of them are acknowledged.
+ *
+ * This is used by the invoker to pull messages from the message bus and apply back pressure
+ * when the invoker does not have resources to complete processing messages (i.e., no containers
+ * are available to run new actions). It is also used in the load balancer to consume active
+ * ack messages.
+ * When the invoker releases resources (by reclaiming containers) it will send a message
+ * to this actor which will then attempt to fill the pipeline with new messages.
+ *
+ * The actor tries to fill the pipeline with additional messages while the number
+ * of outstanding requests is below the pipeline fill threshold.
+ */
+@throws[IllegalArgumentException]
+class MessageFeed(
+ description: String,
+ logging: Logging,
+ consumer: MessageConsumer,
+ maximumHandlerCapacity: Int,
+ longPollDuration: FiniteDuration,
+ handler: Array[Byte] => Future[Unit],
+ autoStart: Boolean = true,
+ logHandoff: Boolean = true)
+ extends FSM[MessageFeed.FeedState, MessageFeed.FeedData] {
+ import MessageFeed._
+
+ // double-buffer to make up for message bus read overhead
+ val maxPipelineDepth = maximumHandlerCapacity * 2
+ private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
+
+ require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
+
+ private val outstandingMessages = mutable.Queue[(String, Int, Long, Array[Byte])]()
+ private var handlerCapacity = maximumHandlerCapacity
+
+ private implicit val tid = TransactionId.dispatcher
+
+ logging.info(this, s"handler capacity = $maximumHandlerCapacity, pipeline fill at = $pipelineFillThreshold, pipeline depth = $maxPipelineDepth")
+
+ when(Idle) {
+ case Event(Ready, _) =>
+ fillPipeline()
+ goto(FillingPipeline)
+
+ case _ => stay
+ }
+
+ // wait for fill to complete, and keep filling if there is
+ // capacity otherwise wait to drain
+ when(FillingPipeline) {
+ case Event(Processed, _) =>
+ updateHandlerCapacity()
+ sendOutstandingMessages()
+ stay
+
+ case Event(FillCompleted(messages), _) =>
+ outstandingMessages.enqueue(messages: _*)
+ sendOutstandingMessages()
+
+ if (shouldFillQueue()) {
+ fillPipeline()
+ stay
+ } else {
+ goto(DrainingPipeline)
+ }
+
+ case _ => stay
+ }
+
+ when(DrainingPipeline) {
+ case Event(Processed, _) =>
+ updateHandlerCapacity()
+ sendOutstandingMessages()
+ if (shouldFillQueue()) {
+ fillPipeline()
+ goto(FillingPipeline)
+ } else stay
+
+ case _ => stay
+ }
+
+ onTransition { case _ -> Idle => if (autoStart) self ! Ready }
+ startWith(Idle, MessageFeed.NoData)
+ initialize()
+
+ private implicit val ec = context.system.dispatcher
+
+ private def fillPipeline(): Unit = {
+ if (outstandingMessages.size <= pipelineFillThreshold) {
+ Future {
+ // Grab next batch of messages and commit offsets immediately
+ // essentially marking the activation as having satisfied "at most once"
+ // semantics (this is the point at which the activation is considered started).
+ // If the commit fails, then messages peeked are peeked again on the next poll.
+ // While the commit is synchronous and will block until it completes, at steady
+ // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
+ // of the commit should be masked.
+ val records = consumer.peek(longPollDuration)
+ consumer.commit()
+ FillCompleted(records.toSeq)
+ }.andThen {
+ case Failure(e: CommitFailedException) => logging.error(this, s"failed to commit $description consumer offset: $e")
+ case Failure(e: Throwable) => logging.error(this, s"exception while pulling new $description records: $e")
+ }.recover {
+ case _ => FillCompleted(Seq.empty)
+ }.pipeTo(self)
+ } else {
+ logging.error(this, s"dropping fill request until $description feed is drained")
+ }
+ }
+
+ /** Send as many messages as possible to the handler. */
+ @tailrec
+ private def sendOutstandingMessages(): Unit = {
+ val occupancy = outstandingMessages.size
+ if (occupancy > 0 && handlerCapacity > 0) {
+ val (topic, partition, offset, bytes) = outstandingMessages.dequeue()
+
+ if (logHandoff) logging.info(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
+ handler(bytes)
+ handlerCapacity -= 1
+
+ sendOutstandingMessages()
+ }
+ }
+
+ private def shouldFillQueue(): Boolean = {
+ val occupancy = outstandingMessages.size
+ if (occupancy <= pipelineFillThreshold) {
+ logging.debug(this, s"$description pipeline has capacity: $occupancy <= $pipelineFillThreshold ($handlerCapacity)")
+ true
+ } else {
+ logging.debug(this, s"$description pipeline must drain: $occupancy > $pipelineFillThreshold")
+ false
+ }
+ }
+
+ private def updateHandlerCapacity(): Int = {
+ logging.debug(self, s"$description received processed msg, current capacity = $handlerCapacity")
+
+ if (handlerCapacity < maximumHandlerCapacity) {
+ handlerCapacity += 1
+ handlerCapacity
+ } else {
+ if (handlerCapacity > maximumHandlerCapacity) logging.error(self, s"$description capacity already at max")
+ maximumHandlerCapacity
+ }
+ }
+}
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index d7e9025..79eb76b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -37,30 +37,20 @@ import akka.actor.FSM.Transition
import akka.actor.Props
import akka.pattern.pipe
import akka.util.Timeout
+
import spray.json._
import spray.json.DefaultJsonProtocol._
+
import whisk.common.AkkaLogging
import whisk.common.ConsulKV.LoadBalancerKeys
import whisk.common.KeyValueStore
import whisk.common.LoggingMarkers
import whisk.common.RingBuffer
import whisk.common.TransactionId
-import whisk.core.connector.ActivationMessage
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.PingMessage
+import whisk.core.connector._
import whisk.core.entitlement.Privilege.Privilege
import whisk.core.entity.ActivationId.ActivationIdGenerator
-import whisk.core.entity.AuthKey
-import whisk.core.entity.CodeExecAsString
-import whisk.core.entity.DocRevision
-import whisk.core.entity.EntityName
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.Secret
-import whisk.core.entity.Subject
-import whisk.core.entity.UUID
-import whisk.core.entity.WhiskAction
+import whisk.core.entity._
// Received events
case object GetStatus
@@ -151,13 +141,23 @@ class InvokerPool(
}
/** Receive Ping messages from invokers. */
- pingConsumer.onMessage((topic, _, _, bytes) => {
+ val pingPollDuration = 1.second
+ val invokerPingFeed = context.system.actorOf(Props {
+ new MessageFeed("ping", logging, pingConsumer, pingConsumer.maxPeek, pingPollDuration, processInvokerPing, logHandoff = false)
+ })
+
+ def processInvokerPing(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
PingMessage.parse(raw) match {
- case Success(p: PingMessage) => self ! p
- case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
+ case Success(p: PingMessage) =>
+ self ! p
+ invokerPingFeed ! MessageFeed.Processed
+
+ case Failure(t) =>
+ invokerPingFeed ! MessageFeed.Processed
+ logging.error(this, s"failed processing message: $raw with $t")
}
- })
+ }
}
object InvokerPool {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 6534307..63655c9 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -33,8 +33,10 @@ import org.apache.kafka.clients.producer.RecordMetadata
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
+import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
+
import whisk.common.ConsulClient
import whisk.common.Logging
import whisk.common.LoggingMarkers
@@ -44,6 +46,7 @@ import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
import whisk.core.database.NoDocumentException
import whisk.core.entity.{ ActivationId, WhiskAction, WhiskActivation }
@@ -81,7 +84,13 @@ trait LoadBalancer {
}
-class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer {
+class LoadBalancerService(
+ config: WhiskConfig,
+ instance: InstanceId,
+ entityStore: EntityStore)(
+ implicit val actorSystem: ActorSystem,
+ logging: Logging)
+ extends LoadBalancer {
/** The execution context for futures */
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -224,9 +233,10 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
throw new IllegalStateException("cannot create test action for invoker health because runtime manifest is not valid")
}
+ val maxPingsPerPoll = 128
val consul = new ConsulClient(config.consulServer)
// Each controller gets its own Group Id, to receive all messages
- val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health")
+ val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health", maxPeek = maxPingsPerPoll)
val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props(instance), name)
actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv, invoker => {
@@ -235,22 +245,33 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore
}, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer))
}
- /** Subscribes to active acks (completion messages from the invokers). */
- private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}")
+ /**
+ * Subscribes to active acks (completion messages from the invokers), and
+ * registers a handler for received active acks from invokers.
+ */
+ val maxActiveAcksPerPoll = 128
+ val activeAckPollDuration = 1.second
- /** Registers a handler for received active acks from invokers. */
- activeAckConsumer.onMessage((topic, _, _, bytes) => {
+ private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}", maxPeek = maxActiveAcksPerPoll)
+ val activationFeed = actorSystem.actorOf(Props {
+ new MessageFeed("activeack", logging, activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, processActiveAck)
+ })
+
+ def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
CompletionMessage.parse(raw) match {
case Success(m: CompletionMessage) =>
processCompletion(m.response, m.transid, false)
// treat left as success (as it is the result a the message exceeding the bus limit)
val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError)
+ activationFeed ! MessageFeed.Processed
invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess)
- case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
+ case Failure(t) =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"failed processing message: $raw with $t")
}
- })
+ }
/** Return a sorted list of available invokers. */
private def availableInvokers: Future[Seq[String]] = invokerHealth.map {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 9a796ee..f1f51f5 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -24,12 +24,13 @@ import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.Props
import whisk.common.AkkaLogging
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
+
import whisk.core.entity.ByteSize
import whisk.core.entity.CodeExec
import whisk.core.entity.EntityName
import whisk.core.entity.ExecutableWhiskAction
import whisk.core.entity.size._
+import whisk.core.connector.MessageFeed
sealed trait WorkerState
case object Busy extends WorkerState
@@ -124,7 +125,7 @@ class ContainerPool(
// Activation completed
case ActivationCompleted =>
- feed ! ContainerReleased
+ feed ! MessageFeed.Processed
}
/** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
deleted file mode 100644
index 5ea54e7..0000000
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.dispatcher
-
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Try
-
-import org.apache.kafka.clients.consumer.CommitFailedException
-
-import akka.actor.Actor
-import akka.actor.actorRef2Scala
-import whisk.common.Logging
-import whisk.common.TransactionId
-import whisk.core.connector.MessageConsumer
-
-object ActivationFeed {
- sealed class ActivationNotification
-
- /** Pulls new messages from the message bus. */
- case class FillQueueWithMessages()
-
- /** Indicates resources are available because transaction completed, may cause pipeline fill. */
- case object ContainerReleased extends ActivationNotification
-
- /** Indicate resources are available because transaction failed, may cause pipeline fill. */
- case class FailedActivation(tid: TransactionId) extends ActivationNotification
-}
-
-/**
- * This actor polls the message bus for new messages and dispatches them to the given
- * handler. The actor tracks the number of messages dispatched and will not dispatch new
- * messages until some number of them are acknowledged.
- *
- * This is used by the invoker to pull messages from the message bus and apply back pressure
- * when the invoker does not have resources to complete processing messages (i.e., no containers
- * are available to run new actions).
- *
- * When the invoker releases resources (by reclaiming containers) it will send a message
- * to this actor which will then attempt to fill the pipeline with new messages.
- *
- * The actor tries to fill the pipeline with additional messages while the number
- * of outstanding requests is below the pipeline fill threshold.
- */
-@throws[IllegalArgumentException]
-protected class ActivationFeed(
- logging: Logging,
- consumer: MessageConsumer,
- maxPipelineDepth: Int,
- longpollDuration: FiniteDuration,
- handler: (String, Array[Byte]) => Any)
- extends Actor {
- import ActivationFeed.ActivationNotification
- import ActivationFeed.FillQueueWithMessages
-
- require(consumer.maxPeek <= maxPipelineDepth, "consumer may not yield more messages per peek than permitted by max depth")
-
- private val pipelineFillThreshold = maxPipelineDepth - consumer.maxPeek
- private var pipelineOccupancy = 0
- private implicit val tid = TransactionId.dispatcher
-
- override def receive = {
- case FillQueueWithMessages =>
- if (pipelineOccupancy <= pipelineFillThreshold) {
- Try {
- // Grab next batch of messages and commit offsets immediately
- // essentially marking the activation as having satisfied "at most once"
- // semantics (this is the point at which the activation is considered started).
- // If the commit fails, then messages peeked are peeked again on the next poll.
- // While the commit is synchronous and will block until it completes, at steady
- // state with enough buffering (i.e., maxPipelineDepth > maxPeek), the latency
- // of the commit should be masked.
- val records = consumer.peek(longpollDuration)
- consumer.commit()
- (records, records.size)
- } map {
- case (records, count) =>
- records foreach {
- case (topic, partition, offset, bytes) =>
- pipelineOccupancy += 1
- logging.info(this, s"processing $topic[$partition][$offset ($count)][pipelineOccupancy=${pipelineOccupancy} (${pipelineFillThreshold})]")
- handler(topic, bytes)
- }
- } recover {
- case e: CommitFailedException => logging.error(this, s"failed to commit consumer offset: $e")
- case e: Throwable => logging.error(this, s"exception while pulling new records: $e")
- }
- fill()
- } else logging.debug(this, "dropping fill request until feed is drained")
-
- case n: ActivationNotification =>
- pipelineOccupancy -= 1
- logging.info(this, s"received ActivationNotification: $n / pipelineOccupancy=$pipelineOccupancy / pipelineFillThreshold=$pipelineFillThreshold")
- if (pipelineOccupancy < 0) {
- logging.error(this, "pipelineOccupancy<0")
- }
- fill()
- }
-
- private def fill() = {
- if (pipelineOccupancy <= pipelineFillThreshold) {
- logging.debug(this, s"filling activation pipeline: $pipelineOccupancy <= $pipelineFillThreshold")
- self ! FillQueueWithMessages
- } else {
- logging.info(this, s"waiting for activation pipeline to drain: $pipelineOccupancy > $pipelineFillThreshold")
- }
- }
-}
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
index 565acbc..fa0e74d 100644
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
+++ b/core/invoker/src/main/scala/whisk/core/dispatcher/Dispatcher.scala
@@ -17,6 +17,8 @@
package whisk.core.dispatcher
+import java.nio.charset.StandardCharsets
+
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
@@ -28,9 +30,9 @@ import akka.actor.Props
import akka.actor.actorRef2Scala
import whisk.common.Counter
import whisk.common.Logging
-import whisk.common.TransactionId
import whisk.core.connector.ActivationMessage
import whisk.core.connector.MessageConsumer
+import whisk.core.connector.MessageFeed
/**
* Creates a dispatcher that pulls messages from the message pub/sub connector.
@@ -54,9 +56,10 @@ class Dispatcher(
implicit logging: Logging)
extends Registrar {
- val activationFeed = actorSystem.actorOf(Props(new ActivationFeed(logging, consumer, maxPipelineDepth, pollDuration, process)))
+ // create activation request feed but do not start it, until the invoker is registered
+ val activationFeed = actorSystem.actorOf(Props(new MessageFeed("activation", logging, consumer, maxPipelineDepth, pollDuration, process, autoStart = false)))
- def start() = activationFeed ! ActivationFeed.FillQueueWithMessages
+ def start() = activationFeed ! MessageFeed.Ready
def stop() = consumer.close()
/**
@@ -67,8 +70,8 @@ class Dispatcher(
* A handler is registered via addHandler and unregistered via removeHandler.
* There is typically only one handler.
*/
- def process(topic: String, bytes: Array[Byte]) = {
- val raw = new String(bytes, "utf-8")
+ def process(bytes: Array[Byte]): Future[Unit] = Future {
+ val raw = new String(bytes, StandardCharsets.UTF_8)
ActivationMessage.parse(raw) match {
case Success(m) =>
handlers foreach {
@@ -78,9 +81,8 @@ class Dispatcher(
}
}
- private def handleMessage(handler: MessageHandler, msg: ActivationMessage) = {
+ private def handleMessage(handler: MessageHandler, msg: ActivationMessage): Unit = {
implicit val tid = msg.transid
- implicit val executionContext = actorSystem.dispatcher
Future {
val count = counter.next()
@@ -92,17 +94,13 @@ class Dispatcher(
}
}
- private def inform(matchers: TrieMap[String, MessageHandler])(implicit transid: TransactionId) = {
- val names = matchers map { _._2.name } reduce (_ + "," + _)
- logging.debug(this, s"matching message to ${matchers.size} handlers: $names")
- matchers
- }
-
- private def errorMsg(handler: MessageHandler, e: Throwable): String =
+ private def errorMsg(handler: MessageHandler, e: Throwable): String = {
s"failed applying handler '${handler.name}': ${errorMsg(e)}"
+ }
- private def errorMsg(msg: String, e: Throwable) =
+ private def errorMsg(msg: String, e: Throwable): String = {
s"failed processing message: $msg $e${e.getStackTrace.mkString("", " ", "")}"
+ }
private def errorMsg(e: Throwable): String = {
if (e.isInstanceOf[java.util.concurrent.ExecutionException]) {
@@ -113,6 +111,7 @@ class Dispatcher(
}
private val counter = new Counter()
+ private implicit val executionContext = actorSystem.dispatcher
}
trait Registrar {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 58592e7..239370f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -27,6 +27,8 @@ import scala.language.postfixOps
import scala.util.{ Failure, Success }
import scala.util.Try
+import org.apache.kafka.common.errors.RecordTooLargeException
+
import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala }
import akka.japi.Creator
import spray.json._
@@ -38,24 +40,18 @@ import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector }
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig.{ consulServer, dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, whiskVersion, invokerUseReactivePool }
import whisk.core.connector.{ ActivationMessage, CompletionMessage }
+import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
import whisk.core.connector.PingMessage
import whisk.core.container._
import whisk.core.dispatcher.{ Dispatcher, MessageHandler }
-import whisk.core.dispatcher.ActivationFeed.{ ActivationNotification, ContainerReleased, FailedActivation }
import whisk.core.entity._
import whisk.http.BasicHttpService
import whisk.http.Messages
import whisk.utils.ExecutionContextFactory
-import whisk.common.Scheduler
-import whisk.core.connector.PingMessage
-import scala.util.Try
-import whisk.core.connector.MessageProducer
-import org.apache.kafka.common.errors.RecordTooLargeException
-import whisk.core.entity.TimeLimit
/**
- * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke".
+ * A message handler that invokes actions as directed by message on topic "/actions/invoke".
* The message path must contain a fully qualified action name and an optional revision id.
*
* @param config the whisk configuration
@@ -119,7 +115,7 @@ class Invoker(
case Success(activation) =>
transactionPromise.completeWith {
// this completes the successful activation case (1)
- completeTransaction(tran, activation, ContainerReleased)
+ completeTransaction(tran, activation)
}
case Failure(t) =>
@@ -164,7 +160,7 @@ class Invoker(
// send activate ack for failed activations
sendActiveAck(tran, activationResult)
- completeTransaction(tran, activationResult, FailedActivation(transid))
+ completeTransaction(tran, activationResult)
}
/*
@@ -173,7 +169,7 @@ class Invoker(
* Invariant: Only one call to here succeeds. Even though the sync block wrap WhiskActivation.put,
* it is only blocking this transaction which is finishing anyway.
*/
- protected def completeTransaction(tran: Transaction, activation: WhiskActivation, releaseResource: ActivationNotification)(
+ protected def completeTransaction(tran: Transaction, activation: WhiskActivation)(
implicit transid: TransactionId): Future[DocInfo] = {
tran.synchronized {
tran.result match {
@@ -183,7 +179,7 @@ class Invoker(
// Send a message to the activation feed indicating there is a free resource to handle another activation.
// Since all transaction completions flow through this method and the invariant is that the transaction is
// completed only once, there is only one completion message sent to the feed as a result.
- activationFeed ! releaseResource
+ activationFeed ! MessageFeed.Processed
// Since there is no active action taken for completion from the invoker, writing activation record is it.
logging.info(this, "recording the activation result to the data store")
val result = WhiskActivation.put(activationStore, activation) andThen {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index d620e3d..456182b 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -36,6 +36,7 @@ import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.connector.ActivationMessage
import whisk.core.connector.CompletionMessage
+import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
import whisk.core.container.{ ContainerPool => OldContainerPool }
import whisk.core.container.Interval
@@ -47,13 +48,13 @@ import whisk.core.containerpool.docker.DockerClientWithFileAccess
import whisk.core.containerpool.docker.DockerContainer
import whisk.core.containerpool.docker.RuncClient
import whisk.core.database.NoDocumentException
-import whisk.core.dispatcher.ActivationFeed.FailedActivation
import whisk.core.dispatcher.MessageHandler
import whisk.core.entity._
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
import whisk.http.Messages
+
class InvokerReactive(
config: WhiskConfig,
instance: InstanceId,
@@ -206,7 +207,7 @@ class InvokerReactive(
Parameters("path", msg.action.toString.toJson) ++ causedBy
})
- activationFeed ! FailedActivation(msg.transid)
+ activationFeed ! MessageFeed.Processed
ack(msg.transid, activation, msg.rootControllerIndex)
store(msg.transid, activation)
}
diff --git a/docs/actions.md b/docs/actions.md
index 7dcae8d..b479bc9 100644
--- a/docs/actions.md
+++ b/docs/actions.md
@@ -640,7 +640,7 @@ To avoid the cold-start delay, you can compile your Swift file into a binary and
```
docker run --rm -it -v "$(pwd):/owexec" openwhisk/action-swift-v3.1.1 bash
```
- This puts you in a bash shell within the Docker container.
+ This puts you in a bash shell within the Docker container.
- Copy the source code and prepare to build it.
```
@@ -658,7 +658,7 @@ To avoid the cold-start delay, you can compile your Swift file into a binary and
- (Optional) Create the `Package.swift` file to add dependencies.
```swift
import PackageDescription
-
+
let package = Package(
name: "Action",
dependencies: [
@@ -699,7 +699,7 @@ and so you should include them in your own `Package.swift`.
exit
```
- This has created hello.zip in the same directory as hello.swift.
+ This has created hello.zip in the same directory as hello.swift.
- Upload it to OpenWhisk with the action name helloSwifty:
```
@@ -731,6 +731,7 @@ For example, create a Java file called `Hello.java` with the following content:
```java
import com.google.gson.JsonObject;
+
public class Hello {
public static JsonObject main(JsonObject args) {
String name = "stranger";
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 60bb3d3..f73e554 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -21,10 +21,8 @@ import java.util.Calendar
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps
-import org.apache.commons.lang3.StringUtils
import org.apache.kafka.clients.consumer.CommitFailedException
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
@@ -34,14 +32,12 @@ import org.scalatest.junit.JUnitRunner
import common.StreamLogging
import common.WskActorSystem
-import whisk.common.Logging
import whisk.common.TransactionId
import whisk.connector.kafka.KafkaConsumerConnector
import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.connector.Message
import whisk.utils.ExecutionContextFactory
-import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class KafkaConnectorTests
@@ -61,7 +57,7 @@ class KafkaConnectorTests
val sessionTimeout = 10 seconds
val maxPollInterval = 10 seconds
val producer = new KafkaProducerConnector(config.kafkaHost, ec)
- val consumer = new TestKafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval)
+ val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval)
override def afterAll() {
producer.close()
@@ -116,52 +112,4 @@ class KafkaConnectorTests
} else consumer.commit()
}
}
-
- it should "catch a failing commit" in {
- val messageReceived = "message received"
- consumer.onMessage((topic, partition, offset, bytes) => {
- printstream.println(messageReceived)
- })
- val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-
- // Send message while commit throws no exception -> Should be processed
- consumer.commitFails = false
- Await.result(producer.send(topic, message), 10 seconds)
- retry(stream.toString should include(messageReceived), 20, Some(500 millisecond))
-
- // Send message while commit throws exception -> Message will not be processed
- consumer.commitFails = true
- retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond))
- Await.result(producer.send(topic, message), 10 seconds)
- retry(stream.toString should include("failed to commit to kafka:"), 50, Some(100 millisecond))
-
- // Send message again -> No commit exception -> Should work again
- consumer.commitFails = false
- Await.result(producer.send(topic, message), 10 seconds)
- retry(StringUtils.countMatches(stream.toString, messageReceived) should be(2), 50, Some(100 milliseconds))
-
- // Wait a few seconds and ensure that the message is not processed three times
- Thread.sleep(5000)
- StringUtils.countMatches(stream.toString, messageReceived) should be(2)
- }
-
-}
-
-class TestKafkaConsumerConnector(
- kafkahost: String,
- groupid: String,
- topic: String,
- sessionTimeout: FiniteDuration,
- maxPollInterval: FiniteDuration)(implicit logging: Logging) extends KafkaConsumerConnector(
- kafkahost, groupid, topic, sessionTimeout = sessionTimeout, maxPollInterval = maxPollInterval) {
-
- override def commit() = {
- if (commitFails) {
- throw new CommitFailedException()
- } else {
- super.commit()
- }
- }
-
- var commitFails = false
}
diff --git a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
new file mode 100644
index 0000000..030a6d0
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.test
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.Buffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import org.scalatest.junit.JUnitRunner
+
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.FSM
+import akka.actor.FSM.CurrentState
+import akka.actor.FSM.SubscribeTransitionCallBack
+import akka.actor.FSM.Transition
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.testkit.TestKit
+import common.StreamLogging
+import whisk.core.connector._
+import whisk.core.connector.MessageFeed._
+import whisk.utils.retry
+
+@RunWith(classOf[JUnitRunner])
+class MessageFeedTests
+ extends FlatSpecLike
+ with Matchers
+ with BeforeAndAfterEach
+ with BeforeAndAfterAll
+ with MockFactory
+ with StreamLogging {
+
+ val system = ActorSystem("MessageFeedTestSystem")
+ val actorsToDestroyAfterEach: Buffer[ActorRef] = Buffer.empty
+
+ override def afterEach() = actorsToDestroyAfterEach.foreach { _ ! PoisonPill }
+ override def afterAll() = TestKit.shutdownActorSystem(system)
+
+ case class Connector(autoStart: Boolean = true) extends TestKit(system) {
+ val peekCount = new AtomicInteger()
+
+ val consumer = new TestConnector("feedtest", 4, true) {
+ override def peek(duration: Duration) = {
+ peekCount.incrementAndGet()
+ super.peek(duration)
+ }
+ }
+
+ val sentCount = new AtomicInteger()
+
+ def fill(n: Int) = {
+ val msgs = (1 to n).map { _ =>
+ new Message {
+ override def serialize = {
+ sentCount.incrementAndGet().toString
+ }
+ override def toString = {
+ s"message${sentCount.get}"
+ }
+ }
+ }
+ consumer.send(msgs)
+ }
+
+ val receivedCount = new AtomicInteger()
+
+ def handler(bytes: Array[Byte]): Future[Unit] = {
+ Future.successful(receivedCount.incrementAndGet())
+ }
+
+ val fsm = childActorOf(Props(new MessageFeed(
+ "test",
+ logging,
+ consumer,
+ consumer.maxPeek,
+ 200.milliseconds,
+ handler,
+ autoStart)))
+
+ actorsToDestroyAfterEach += (fsm, testActor)
+
+ def monitorTransitionsAndStart() = {
+ fsm ! SubscribeTransitionCallBack(testActor)
+ expectMsg(CurrentState(fsm, Idle))
+ fsm ! Ready
+ expectMsg(Transition(fsm, Idle, FillingPipeline))
+ this
+ }
+ }
+
+ def timeout(actor: ActorRef) = actor ! FSM.StateTimeout
+
+ it should "wait for ready before accepting messages" in {
+ val connector = Connector(autoStart = false)
+ connector.fsm ! SubscribeTransitionCallBack(connector.testActor)
+
+ // start idle
+ connector.expectMsg(CurrentState(connector.fsm, Idle))
+
+ // stay until received ready
+ connector.fsm ! FSM.StateTimeout // should be ignored
+ connector.fsm ! Processed // should be ignored
+ Thread.sleep(500.milliseconds.toMillis)
+ connector.peekCount.get shouldBe 0
+
+ // start filling
+ connector.fsm ! Ready
+ connector.expectMsg(Transition(connector.fsm, Idle, FillingPipeline))
+ retry(connector.peekCount.get should be > 0)
+ }
+
+ it should "auto start and start polling for messages" in {
+ val connector = Connector(autoStart = true)
+ // automatically start filling
+ retry(connector.peekCount.get should be > 0, 5, Some(200.milliseconds))
+ }
+
+ it should "stop polling for messages when the pipeline is full" in {
+ val connector = Connector(autoStart = false).monitorTransitionsAndStart()
+ // push enough to cause pipeline to exceed fill mark
+ connector.fill(connector.consumer.maxPeek * 2 + 1)
+ retry(connector.peekCount.get should be > 0)
+ retry(connector.receivedCount.get shouldBe connector.consumer.maxPeek, 10, Some(200.milliseconds))
+
+ val peeks = connector.peekCount.get
+ connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline))
+
+ connector.peekCount.get shouldBe peeks
+ connector.expectNoMsg(500.milliseconds)
+ }
+
+ it should "transition from drain to fill mode" in {
+ val connector = Connector(autoStart = false).monitorTransitionsAndStart()
+ println(connector.fsm.toString())
+ // push enough to cause pipeline to exceed fill mark
+ val sendCount = connector.consumer.maxPeek * 2 + 2
+ connector.fill(sendCount)
+ retry(connector.peekCount.get should be > 0)
+ retry(connector.receivedCount.get shouldBe connector.consumer.maxPeek, 10, Some(200.milliseconds))
+
+ val peeks = connector.peekCount.get
+ connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline))
+
+ // stay in drain mode, no more peeking
+ timeout(connector.fsm) // should be ignored
+ connector.expectNoMsg(500.milliseconds)
+ connector.peekCount.get shouldBe peeks // no new reads
+
+ // expecting overflow of 2 in the queue, which is true if all expected messages were sent
+ retry(connector.sentCount.get shouldBe sendCount, 5, Some(200.milliseconds))
+
+ // drain one, should stay in draining state
+ connector.fsm ! Processed
+ connector.expectNoMsg(500.milliseconds)
+ connector.peekCount.get shouldBe peeks // no new reads
+
+ // back to fill mode
+ connector.fsm ! Processed
+ connector.expectMsg(Transition(connector.fsm, DrainingPipeline, FillingPipeline))
+ retry(connector.peekCount.get should be >= (peeks + 1))
+
+ // should send back to drain mode
+ connector.fill(1)
+ connector.expectMsg(Transition(connector.fsm, FillingPipeline, DrainingPipeline))
+
+ connector.expectNoMsg(500.milliseconds)
+ }
+}
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
similarity index 78%
rename from tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala
rename to tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 9e7fe34..b454c7c 100644
--- a/tests/src/test/scala/whisk/core/dispatcher/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package whisk.core.dispatcher.test
+package whisk.core.connector.test
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
-import scala.collection.JavaConversions.asScalaBuffer
import scala.concurrent.Future
import scala.concurrent.duration.Duration
+import scala.collection.JavaConversions._
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.Record
-
import common.StreamLogging
+
import whisk.common.Counter
import whisk.core.connector.Message
import whisk.core.connector.MessageConsumer
@@ -44,7 +44,6 @@ class TestConnector(
override def peek(duration: Duration) = {
val msgs = new ArrayList[Message]
queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek)
-
msgs map { m =>
offset += 1
(topic, -1, offset, m.serialize.getBytes)
@@ -59,23 +58,17 @@ class TestConnector(
}
}
- override def onMessage(process: (String, Int, Long, Array[Byte]) => Unit): Unit = {
- new Thread {
- override def run() = while (!closed) {
- val msg = queue.take()
- logging.info(this, s"received message for '$topic'")
- process(topic, -1, -1, msg.serialize.getBytes)
- Thread.sleep(100) // let producer get in a send if any
- }
- }.start
- }
-
def occupancy = queue.size
def send(msg: Message): Future[RecordMetadata] = {
producer.send(topic, msg)
}
+ def send(msgs: Seq[Message]): Future[RecordMetadata] = {
+ import scala.language.reflectiveCalls
+ producer.sendBulk(topic, msgs)
+ }
+
def close() = {
closed = true
producer.close()
@@ -91,12 +84,23 @@ class TestConnector(
Future.failed(new IllegalStateException("failed to write msg"))
}
}
+
+ def sendBulk(topic: String, msgs: Seq[Message]): Future[RecordMetadata] = {
+ if (queue.addAll(msgs)) {
+ logging.info(this, s"put: ${msgs.length} messages")
+ Future.successful(new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size, Record.NO_TIMESTAMP, -1, -1, -1))
+ } else {
+ logging.error(this, s"put failed: ${msgs.length} messages")
+ Future.failed(new IllegalStateException("failed to write msg"))
+ }
+ }
+
def close() = {}
def sentCount() = counter.next()
val counter = new Counter()
}
- protected[test] var throwCommitException = false
+ var throwCommitException = false
private val queue = new LinkedBlockingQueue[Message]()
@volatile private var closed = false
private var offset = -1L
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index f7bcdf1..866d370 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -38,11 +38,11 @@ import akka.testkit.TestProbe
import whisk.common.TransactionId
import whisk.core.connector.ActivationMessage
import whisk.core.containerpool._
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
import whisk.core.entity._
import whisk.core.entity.ExecManifest.RuntimeManifest
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
+import whisk.core.connector.MessageFeed
/**
* Behavior tests for the ContainerPool
@@ -114,7 +114,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
val pool = system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref))
containers(0).send(pool, ActivationCompleted)
- feed.expectMsg(ContainerReleased)
+ feed.expectMsg(MessageFeed.Processed)
}
/*
@@ -159,7 +159,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
containers(0).send(pool, ActivationCompleted)
- feed.expectMsg(ContainerReleased)
+ feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentEverything
containers(0).expectMsg(Remove)
containers(1).expectMsg(runMessageDifferentEverything)
@@ -177,14 +177,14 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(lastUsed = Instant.EPOCH)))
containers(0).send(pool, ActivationCompleted)
- feed.expectMsg(ContainerReleased)
+ feed.expectMsg(MessageFeed.Processed)
// Run the second container, don't remove the first one
pool ! runMessageDifferentEverything
containers(1).expectMsg(runMessageDifferentEverything)
containers(1).send(pool, NeedWork(warmedData(lastUsed = Instant.now)))
containers(1).send(pool, ActivationCompleted)
- feed.expectMsg(ContainerReleased)
+ feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentNamespace
containers(2).expectMsg(runMessageDifferentNamespace)
@@ -203,7 +203,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
containers(0).send(pool, ActivationCompleted)
- feed.expectMsg(ContainerReleased)
+ feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentNamespace
containers(0).expectMsg(Remove)
containers(1).expectMsg(runMessageDifferentNamespace)
@@ -219,7 +219,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
containers(0).send(pool, ActivationCompleted)
- feed.expectMsg(ContainerReleased)
+ feed.expectMsg(MessageFeed.Processed)
pool ! runMessage
containers(0).expectMsg(runMessage)
pool ! runMessage //expect this message to be requeued since previous is incomplete.
diff --git a/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala
index d6d74c6..6488156 100644
--- a/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/PackagesApiTests.scala
@@ -519,7 +519,6 @@ class PackagesApiTests extends ControllerTestCommon with WhiskPackagesApi {
deletePackage(reference.docid)
status should be(OK)
val response = responseAs[WhiskPackage]
- println(responseAs[String])
response should be {
WhiskPackage(reference.namespace, reference.name, reference.binding,
version = reference.version.upPatch,
diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
index 3084270..aafd64f 100644
--- a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
+++ b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala
@@ -35,7 +35,8 @@ import spray.json.JsNumber
import spray.json.JsObject
import whisk.common.TransactionId
import whisk.core.connector.{ ActivationMessage => Message }
-import whisk.core.dispatcher.ActivationFeed
+import whisk.core.connector.MessageFeed
+import whisk.core.connector.test.TestConnector
import whisk.core.dispatcher.Dispatcher
import whisk.core.dispatcher.MessageHandler
import whisk.core.entity._
@@ -79,12 +80,12 @@ class DispatcherTests
}
it should "send and receive a message from connector bus" in {
- val maxdepth = 8
- val half = maxdepth / 2
- val connector = new TestConnector("test connector", maxdepth / 2, true)
+ val capacity = 4
+ val connector = new TestConnector("test connector", capacity, false)
+
val messagesProcessed = new AtomicInteger()
val handler = new TestRule({ msg => messagesProcessed.incrementAndGet() })
- val dispatcher = new Dispatcher(connector, 100 milliseconds, maxdepth, actorSystem)
+ val dispatcher = new Dispatcher(connector, 100 milliseconds, capacity, actorSystem)
dispatcher.addHandler(handler, true)
dispatcher.start()
@@ -94,72 +95,57 @@ class DispatcherTests
Console.withErr(stream) {
retry({
val logs = stream.toString()
- logs should include regex (s"exception while pulling new records *.* commit failed")
+ logs should include regex (s"exception while pulling new activation records *.* commit failed")
}, 10, Some(100 milliseconds))
connector.throwCommitException = false
}
}
- for (i <- 0 to half) {
+ for (i <- 0 until (2 * capacity + 1)) {
sendMessage(connector, i + 1)
}
- // wait until all messages are received at which point the
- // dispatcher cannot drain anymore messages
- withClue("the queue should be empty since all messages are drained") {
- retry({
- connector.occupancy shouldBe 0
- }, 10, Some(100 milliseconds))
- }
-
+ // only process as many messages as we have downstream capacity
withClue("messages processed") {
retry({
- messagesProcessed.get should be(half + 1)
+ messagesProcessed.get should be(capacity)
}, 20, Some(100 milliseconds))
}
withClue("confirming dispatcher is in overflow state") {
val logs = stream.toString()
- logs should include regex (s"waiting for activation pipeline to drain: ${half + 1} > $half")
+ logs should include regex (s"activation pipeline must drain: ${capacity + 1} > $capacity")
}
// send one message and check later that it remains in the connector
- // at this point, total messages sent = half + 2
- sendMessage(connector, half + 2)
-
- withClue("confirming dispatcher will not consume additional messages when in overflow state") {
- stream.reset()
- Console.withOut(stream) {
- dispatcher.activationFeed ! ActivationFeed.FillQueueWithMessages
- retry({
- val logs = stream.toString()
- logs should include regex (s"dropping fill request until feed is drained")
- logs should not include regex(s"waiting for activation pipeline to drain: ${messagesProcessed.get} > $half")
- }, 10, Some(100 milliseconds))
- }
- }
+ // at this point, total messages sent = 2 * capacity + 2
+ connector.occupancy shouldBe 0
+ sendMessage(connector, 2 * capacity + 2)
+ Thread.sleep(1.second.toMillis)
withClue("expecting message to still be in the queue") {
- connector.occupancy shouldBe 1
+ retry({
+ connector.occupancy shouldBe 1
+ }, 10, Some(100 milliseconds))
}
// unblock the pipeline by draining 1 activations and check
// that dispatcher refilled the pipeline
stream.reset()
Console.withOut(stream) {
- dispatcher.activationFeed ! ActivationFeed.ContainerReleased
+ dispatcher.activationFeed ! MessageFeed.Processed
// wait until additional message is drained
retry({
withClue("additional messages processed") {
- messagesProcessed.get shouldBe half + 2
+ messagesProcessed.get shouldBe capacity + 1
}
}, 10, Some(100 milliseconds))
}
withClue("confirm dispatcher tried to fill the pipeline") {
val logs = stream.toString()
- logs should include regex (s"filling activation pipeline: $half <= $half")
+ logs should include regex (s"activation pipeline has capacity: $capacity <= $capacity")
}
} finally {
dispatcher.stop()
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index d3feabe..eaa0278 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -50,7 +50,6 @@ import whisk.common.KeyValueStore
import whisk.common.TransactionId
import whisk.core.WhiskConfig
import whisk.core.connector.ActivationMessage
-import whisk.core.connector.MessageConsumer
import whisk.core.connector.PingMessage
import whisk.core.entitlement.Privilege.Privilege
import whisk.core.entity.ActivationId.ActivationIdGenerator
@@ -75,6 +74,7 @@ import whisk.core.loadBalancer.InvokerState
import whisk.core.loadBalancer.Offline
import whisk.core.loadBalancer.UnHealthy
import whisk.utils.retry
+import whisk.core.connector.test.TestConnector
@RunWith(classOf[JUnitRunner])
class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
@@ -101,6 +101,8 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
/** Queries all invokers for their state */
def allStates(pool: ActorRef) = Await.result(pool.ask(GetStatus).mapTo[Map[String, InvokerState]], timeout.duration)
+ val pC = new TestConnector("pingFeedTtest", 4, false) {}
+
behavior of "InvokerPool"
it should "successfully create invokers in its pool on ping and keep track of statechanges" in {
@@ -114,7 +116,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
val kv = stub[KeyValueStore]
val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
- val pC = stub[MessageConsumer]
+
val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
within(timeout.duration) {
@@ -156,7 +158,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
val kv = stub[KeyValueStore]
val callback = stubFunction[String, Unit]
val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
- val pC = stub[MessageConsumer]
val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, callback, sendActivationToInvoker, pC))
within(timeout.duration) {
@@ -186,7 +187,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
val childFactory = (f: ActorRefFactory, name: String) => invoker.ref
val kv = stub[KeyValueStore]
val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
- val pC = stub[MessageConsumer]
val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
@@ -213,7 +213,6 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
val kv = stub[KeyValueStore]
val sendActivationToInvoker = stubFunction[ActivationMessage, String, Future[RecordMetadata]]
- val pC = stub[MessageConsumer]
val supervisor = system.actorOf(InvokerPool.props(childFactory, kv, () => _, sendActivationToInvoker, pC))
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].