You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/06/06 05:23:46 UTC
[incubator-openwhisk] branch master updated: Fix activation feed
book-keeping in reactive pool (#2319)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ed39c03 Fix activation feed book-keeping in reactive pool (#2319)
ed39c03 is described below
commit ed39c03a0b9c1f5ab96da68cde1448e2ccfeb8d5
Author: Sven Lange-Last <sv...@de.ibm.com>
AuthorDate: Tue Jun 6 07:23:44 2017 +0200
Fix activation feed book-keeping in reactive pool (#2319)
The old mechanism piggybacked on the `NeedWork` message to also signal free resources to the ActivationFeed. That is incomplete, because free resources are not signaled in various failure scenarios.
This decouples the `NeedWork` signal from the new `ActivationCompleted` signal so that new ActivationMessages can be pulled in from the ActivationFeed reliably.
Fixes #2285.
---
.../whisk/core/containerpool/ContainerPool.scala | 17 ++--
.../whisk/core/containerpool/ContainerProxy.scala | 91 +++++++++++++++-------
.../whisk/core/dispatcher/ActivationFeed.scala | 8 +-
.../scala/whisk/core/invoker/InvokerReactive.scala | 4 +-
.../containerpool/test/ContainerPoolTests.scala | 7 +-
.../containerpool/test/ContainerProxyTests.scala | 6 +-
6 files changed, 91 insertions(+), 42 deletions(-)
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 2ef3bae..1af51bf 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -51,7 +51,7 @@ case class WorkerData(data: ContainerData, state: WorkerState)
* Prewarm containers are only used, if they have matching arguments
* (kind, memory) and there is space in the pool.
*
- * @param childFactory method to create new containers
+ * @param childFactory method to create new container proxy actors
* @param maxPoolSize maximum size of containers allowed in the pool
* @param feed actor to request more work from
* @param prewarmConfig optional settings for container prewarming
@@ -95,7 +95,7 @@ class ContainerPool(
pool.get(actor) match {
case Some(w) =>
pool.update(actor, WorkerData(w.data, Busy))
- actor ! r
+ actor ! r // forwards the run request to the container
case None =>
logging.error(this, "actor data not found")
self ! r
@@ -106,17 +106,16 @@ class ContainerPool(
}
// Container is free to take more work
- case NeedWork(data: WarmedData) =>
- pool.update(sender(), WorkerData(data, Free))
- feed ! ContainerReleased
+ case NeedWork(data: WarmedData) => pool.update(sender(), WorkerData(data, Free))
// Container is prewarmed and ready to take work
- case NeedWork(data: PreWarmedData) =>
- prewarmedPool.update(sender(), WorkerData(data, Free))
+ case NeedWork(data: PreWarmedData) => prewarmedPool.update(sender(), WorkerData(data, Free))
// Container got removed
- case ContainerRemoved =>
- pool.remove(sender())
+ case ContainerRemoved => pool.remove(sender())
+
+ // Activation completed
+ case ActivationCompleted => feed ! ContainerReleased
}
/** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 025a448..4aa77a7 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -37,6 +37,8 @@ import whisk.core.entity._
import whisk.core.entity.size._
import whisk.common.Counter
import whisk.core.entity.ExecManifest.ImageName
+import whisk.common.AkkaLogging
+import whisk.http.Messages
// States
sealed trait ContainerState
@@ -64,6 +66,12 @@ case object Remove
case class NeedWork(data: ContainerData)
case object ContainerPaused
case object ContainerRemoved
+/**
+ * Indicates the container resource is now free to receive a new request.
+ * This message is sent to the parent which in turn notifies the feed that a
+ * resource slot is available.
+ */
+case object ActivationCompleted
/**
* A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -88,6 +96,7 @@ class ContainerProxy(
sendActiveAck: (TransactionId, WhiskActivation) => Future[Any],
storeActivation: (TransactionId, WhiskActivation) => Future[Any]) extends FSM[ContainerState, ContainerData] with Stash {
implicit val ec = context.system.dispatcher
+ val logging = new AkkaLogging(context.system.log)
// The container is destroyed after this period of time
val unusedTimeout = 10.minutes
@@ -99,7 +108,7 @@ class ContainerProxy(
startWith(Uninitialized, NoData())
when(Uninitialized) {
- // pre warm a container
+ // pre warm a container (creates a stem cell container)
case Event(job: Start, _) =>
factory(
TransactionId.invokerWarmup,
@@ -112,32 +121,49 @@ class ContainerProxy(
goto(Starting)
- // cold start
+ // cold start (no container to reuse or available stem cell container)
case Event(job: Run, _) =>
implicit val transid = job.msg.transid
- factory(
+
+ // create a new container
+ val container = factory(
job.msg.transid,
ContainerProxy.containerName(job.msg.user.namespace.name, job.action.name.name),
job.action.exec.image,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB)
- .andThen {
- case Success(container) => self ! PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB)
- case Failure(t) =>
- val response = t match {
- case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg)
- case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg)
- case _ => ActivationResponse.whiskError(t.getMessage)
- }
- val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
- sendActiveAck(transid, activation)
- storeActivation(transid, activation)
- }
- .flatMap {
- container =>
- initializeAndRun(container, job)
- .map(_ => WarmedData(container, job.msg.user.namespace, job.action, Instant.now))
- }.pipeTo(self)
+
+ // container factory will either yield a new container ready to execute the action, or
+ // starting up the container failed; for the latter, it's either an internal error starting
+ // a container or a docker action that is not conforming to the required action API
+ container.andThen {
+ case Success(container) =>
+ // the container is ready to accept an activation; register it as PreWarmed; this
+ // normalizes the life cycle for containers and their cleanup when activations fail
+ self ! PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB)
+
+ case Failure(t) =>
+ // the container did not come up cleanly, so disambiguate the failure mode and then cleanup
+ // the failure is either the system fault, or for docker actions, the application/developer fault
+ val response = t match {
+ case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg)
+ case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg)
+ case _ => ActivationResponse.whiskError(t.getMessage)
+ }
+ // construct an appropriate activation and record it in the datastore,
+ // also update the feed and active ack; the container cleanup is queued
+ // implicitly via a FailureMessage which will be processed later when the state
+ // transitions to Running
+ val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
+ self ! ActivationCompleted
+ sendActiveAck(transid, activation)
+ storeActivation(transid, activation)
+ }.flatMap {
+ container =>
+ // now attempt to inject the user code and run the action
+ initializeAndRun(container, job)
+ .map(_ => WarmedData(container, job.msg.user.namespace, job.action, Instant.now))
+ }.pipeTo(self)
goto(Running)
}
@@ -189,6 +215,11 @@ class ContainerProxy(
context.parent ! ContainerRemoved
stop()
+ // Activation finished either successfully or not
+ case Event(ActivationCompleted, _) =>
+ context.parent ! ActivationCompleted
+ stay
+
case _ => delay
}
@@ -210,10 +241,7 @@ class ContainerProxy(
}
when(Pausing) {
- case Event(ContainerPaused, data: WarmedData) =>
- context.parent ! NeedWork(data)
- goto(Paused)
-
+ case Event(ContainerPaused, data: WarmedData) => goto(Paused)
case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data.container)
case _ => delay
}
@@ -284,6 +312,11 @@ class ContainerProxy(
/**
* Runs the job, initialize first if necessary.
+ * Completes the job by:
+ * 1. sending an activate ack,
+ * 2. fetching the logs for the run,
+ * 3. indicating the resource is free to the parent pool,
+ * 4. recording the result to the data store
*
* @param container the container to run the job on
* @param job the job to run
@@ -319,13 +352,18 @@ class ContainerProxy(
}.recover {
case InitializationError(interval, response) =>
ContainerProxy.constructWhiskActivation(job, interval, response)
+ case t =>
+ // Actually, this should never happen - but we want to make sure to not miss a problem
+ logging.error(this, s"caught unexpected error while running activation: ${t}")
+ ContainerProxy.constructWhiskActivation(job, Interval.zero, ActivationResponse.whiskError(Messages.abnormalRun))
}
// Sending active ack and storing the activation are concurrent side-effects
// and do not block further execution of the future. They are completely
// asynchronous.
activation.andThen {
- case Success(activation) => sendActiveAck(tid, activation)
+ // the activation future will always complete with Success
+ case Success(ack) => sendActiveAck(tid, ack)
}.flatMap { activation =>
container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs =>
activation.withLogs(ActivationLogs(logs.toVector))
@@ -333,6 +371,7 @@ class ContainerProxy(
}.andThen {
case Success(activation) => storeActivation(tid, activation)
}.flatMap { activation =>
+ self ! ActivationCompleted
// Fail the future iff the activation was unsuccessful to facilitate
// better cleanup logic.
if (activation.response.isSuccess) Future.successful(activation)
@@ -353,7 +392,7 @@ object ContainerProxy {
* Generates a unique container name.
*
* @param prefix the container name's prefix
- * @param suffic the container name's suffix
+ * @param suffix the container name's suffix
* @return a unique container name
*/
def containerName(prefix: String, suffix: String) =
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
index b839713..716d688 100644
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
+++ b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
@@ -90,8 +90,8 @@ protected class ActivationFeed(
case (records, count) =>
records foreach {
case (topic, partition, offset, bytes) =>
- logging.info(this, s"processing $topic[$partition][$offset ($count)]")
pipelineOccupancy += 1
+ logging.info(this, s"processing $topic[$partition][$offset ($count)][pipelineOccupancy=${pipelineOccupancy} (${pipelineFillThreshold})]")
handler(topic, bytes)
}
} recover {
@@ -101,8 +101,12 @@ protected class ActivationFeed(
fill()
} else logging.debug(this, "dropping fill request until feed is drained")
- case _: ActivationNotification =>
+ case n: ActivationNotification =>
pipelineOccupancy -= 1
+ logging.info(this, s"received ActivationNotification: $n / pipelineOccupancy=$pipelineOccupancy / pipelineFillThreshold=$pipelineFillThreshold")
+ if (pipelineOccupancy < 0) {
+ logging.error(this, "pipelineOccupancy<0")
+ }
fill()
}
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index f6ab973..1d07de9 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -42,11 +42,11 @@ import whisk.core.containerpool.Run
import whisk.core.containerpool.docker.DockerClientWithFileAccess
import whisk.core.containerpool.docker.DockerContainer
import whisk.core.containerpool.docker.RuncClient
+import whisk.core.dispatcher.ActivationFeed.FailedActivation
import whisk.core.dispatcher.MessageHandler
import whisk.core.entity._
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
import whisk.core.containerpool.ContainerPool
import whisk.core.database.NoDocumentException
import whisk.http.Messages
@@ -192,7 +192,7 @@ class InvokerReactive(
Parameters("path", msg.action.toString.toJson) ++ causedBy
})
- activationFeed ! ContainerReleased
+ activationFeed ! FailedActivation(msg.transid)
ack(msg.transid, activation)
store(msg.transid, activation)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 2513f9b..567fe6d 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -106,12 +106,12 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
behavior of "ContainerPool"
- it should "indicate free resources to the feed only if a warm container responds" in within(timeout) {
+ it should "indicate free resources to the feed once activations finish" in within(timeout) {
val (containers, factory) = testContainers(1)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, 0, feed.ref))
- containers(0).send(pool, NeedWork(warmedData()))
+ containers(0).send(pool, ActivationCompleted)
feed.expectMsg(ContainerReleased)
}
@@ -156,6 +156,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
+ containers(0).send(pool, ActivationCompleted)
feed.expectMsg(ContainerReleased)
pool ! runMessageDifferentEverything
containers(0).expectMsg(Remove)
@@ -171,6 +172,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
+ containers(0).send(pool, ActivationCompleted)
feed.expectMsg(ContainerReleased)
pool ! runMessageDifferentNamespace
containers(0).expectMsg(Remove)
@@ -186,6 +188,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
+ containers(0).send(pool, ActivationCompleted)
feed.expectMsg(ContainerReleased)
pool ! runMessage
containers(0).expectMsg(runMessage)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 2094756..0701ef6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -104,6 +104,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
def run(machine: ActorRef, currentState: ContainerState) = {
machine ! Run(action, message)
expectMsg(Transition(machine, currentState, Running))
+ expectMsg(ActivationCompleted)
expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Running, Ready))
}
@@ -124,7 +125,6 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
/** Expect the container to pause successfully */
def expectPause(machine: ActorRef) = {
expectMsg(Transition(machine, Ready, Pausing))
- expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Pausing, Paused))
}
@@ -271,6 +271,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
+ expectMsg(ActivationCompleted)
expectMsg(ContainerRemoved)
awaitAssert {
@@ -299,6 +300,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
+ expectMsg(ActivationCompleted)
expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -327,6 +329,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
+ expectMsg(ActivationCompleted)
expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -426,6 +429,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
// Finish /init, note that /run and log-collecting happens nonetheless
initPromise.success(Interval.zero)
+ expectMsg(ActivationCompleted)
expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Running, Ready))
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.