You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/06/06 05:23:46 UTC

[incubator-openwhisk] branch master updated: Fix activation feed book-keeping in reactive pool (#2319)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ed39c03  Fix activation feed book-keeping in reactive pool (#2319)
ed39c03 is described below

commit ed39c03a0b9c1f5ab96da68cde1448e2ccfeb8d5
Author: Sven Lange-Last <sv...@de.ibm.com>
AuthorDate: Tue Jun 6 07:23:44 2017 +0200

    Fix activation feed book-keeping in reactive pool (#2319)
    
    The old mechanism piggybacked on the `NeedWork` message to also signal free resources to the ActivationFeed. This is incomplete because free resources are not signaled in various failure scenarios.
    
    This decouples the `NeedWork` signal from the new `ActivationCompleted` signal so that new ActivationMessages can be pulled in from the ActivationFeed reliably.
    
    Fixes #2285.
---
 .../whisk/core/containerpool/ContainerPool.scala   | 17 ++--
 .../whisk/core/containerpool/ContainerProxy.scala  | 91 +++++++++++++++-------
 .../whisk/core/dispatcher/ActivationFeed.scala     |  8 +-
 .../scala/whisk/core/invoker/InvokerReactive.scala |  4 +-
 .../containerpool/test/ContainerPoolTests.scala    |  7 +-
 .../containerpool/test/ContainerProxyTests.scala   |  6 +-
 6 files changed, 91 insertions(+), 42 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 2ef3bae..1af51bf 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -51,7 +51,7 @@ case class WorkerData(data: ContainerData, state: WorkerState)
  * Prewarm containers are only used, if they have matching arguments
  * (kind, memory) and there is space in the pool.
  *
- * @param childFactory method to create new containers
+ * @param childFactory method to create new container proxy actors
  * @param maxPoolSize maximum size of containers allowed in the pool
  * @param feed actor to request more work from
  * @param prewarmConfig optional settings for container prewarming
@@ -95,7 +95,7 @@ class ContainerPool(
                     pool.get(actor) match {
                         case Some(w) =>
                             pool.update(actor, WorkerData(w.data, Busy))
-                            actor ! r
+                            actor ! r // forwards the run request to the container
                         case None =>
                             logging.error(this, "actor data not found")
                             self ! r
@@ -106,17 +106,16 @@ class ContainerPool(
             }
 
         // Container is free to take more work
-        case NeedWork(data: WarmedData) =>
-            pool.update(sender(), WorkerData(data, Free))
-            feed ! ContainerReleased
+        case NeedWork(data: WarmedData)    => pool.update(sender(), WorkerData(data, Free))
 
         // Container is prewarmed and ready to take work
-        case NeedWork(data: PreWarmedData) =>
-            prewarmedPool.update(sender(), WorkerData(data, Free))
+        case NeedWork(data: PreWarmedData) => prewarmedPool.update(sender(), WorkerData(data, Free))
 
         // Container got removed
-        case ContainerRemoved =>
-            pool.remove(sender())
+        case ContainerRemoved              => pool.remove(sender())
+
+        // Activation completed
+        case ActivationCompleted           => feed ! ContainerReleased
     }
 
     /** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 025a448..4aa77a7 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -37,6 +37,8 @@ import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.common.Counter
 import whisk.core.entity.ExecManifest.ImageName
+import whisk.common.AkkaLogging
+import whisk.http.Messages
 
 // States
 sealed trait ContainerState
@@ -64,6 +66,12 @@ case object Remove
 case class NeedWork(data: ContainerData)
 case object ContainerPaused
 case object ContainerRemoved
+/**
+ * Indicates the container resource is now free to receive a new request.
+ * This message is sent to the parent which in turn notifies the feed that a
+ * resource slot is available.
+ */
+case object ActivationCompleted
 
 /**
  * A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -88,6 +96,7 @@ class ContainerProxy(
     sendActiveAck: (TransactionId, WhiskActivation) => Future[Any],
     storeActivation: (TransactionId, WhiskActivation) => Future[Any]) extends FSM[ContainerState, ContainerData] with Stash {
     implicit val ec = context.system.dispatcher
+    val logging = new AkkaLogging(context.system.log)
 
     // The container is destroyed after this period of time
     val unusedTimeout = 10.minutes
@@ -99,7 +108,7 @@ class ContainerProxy(
     startWith(Uninitialized, NoData())
 
     when(Uninitialized) {
-        // pre warm a container
+        // pre warm a container (creates a stem cell container)
         case Event(job: Start, _) =>
             factory(
                 TransactionId.invokerWarmup,
@@ -112,32 +121,49 @@ class ContainerProxy(
 
             goto(Starting)
 
-        // cold start
+        // cold start (no container to reuse or available stem cell container)
         case Event(job: Run, _) =>
             implicit val transid = job.msg.transid
-            factory(
+
+            // create a new container
+            val container = factory(
                 job.msg.transid,
                 ContainerProxy.containerName(job.msg.user.namespace.name, job.action.name.name),
                 job.action.exec.image,
                 job.action.exec.pull,
                 job.action.limits.memory.megabytes.MB)
-                .andThen {
-                    case Success(container) => self ! PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB)
-                    case Failure(t) =>
-                        val response = t match {
-                            case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg)
-                            case BlackboxStartupError(msg)       => ActivationResponse.applicationError(msg)
-                            case _                               => ActivationResponse.whiskError(t.getMessage)
-                        }
-                        val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
-                        sendActiveAck(transid, activation)
-                        storeActivation(transid, activation)
-                }
-                .flatMap {
-                    container =>
-                        initializeAndRun(container, job)
-                            .map(_ => WarmedData(container, job.msg.user.namespace, job.action, Instant.now))
-                }.pipeTo(self)
+
+            // container factory will either yield a new container ready to execute the action, or
+            // starting up the container failed; for the latter, it's either an internal error starting
+            // a container or a docker action that is not conforming to the required action API
+            container.andThen {
+                case Success(container) =>
+                    // the container is ready to accept an activation; register it as PreWarmed; this
+                    // normalizes the life cycle for containers and their cleanup when activations fail
+                    self ! PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB)
+
+                case Failure(t) =>
+                    // the container did not come up cleanly, so disambiguate the failure mode and then cleanup
+                    // the failure is either the system fault, or for docker actions, the application/developer fault
+                    val response = t match {
+                        case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg)
+                        case BlackboxStartupError(msg)       => ActivationResponse.applicationError(msg)
+                        case _                               => ActivationResponse.whiskError(t.getMessage)
+                    }
+                    // construct an appropriate activation and record it in the datastore,
+                    // also update the feed and active ack; the container cleanup is queued
+                    // implicitly via a FailureMessage which will be processed later when the state
+                    // transitions to Running
+                    val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
+                    self ! ActivationCompleted
+                    sendActiveAck(transid, activation)
+                    storeActivation(transid, activation)
+            }.flatMap {
+                container =>
+                    // now attempt to inject the user code and run the action
+                    initializeAndRun(container, job)
+                        .map(_ => WarmedData(container, job.msg.user.namespace, job.action, Instant.now))
+            }.pipeTo(self)
 
             goto(Running)
     }
@@ -189,6 +215,11 @@ class ContainerProxy(
             context.parent ! ContainerRemoved
             stop()
 
+        // Activation finished either successfully or not
+        case Event(ActivationCompleted, _) =>
+            context.parent ! ActivationCompleted
+            stay
+
         case _ => delay
     }
 
@@ -210,10 +241,7 @@ class ContainerProxy(
     }
 
     when(Pausing) {
-        case Event(ContainerPaused, data: WarmedData) =>
-            context.parent ! NeedWork(data)
-            goto(Paused)
-
+        case Event(ContainerPaused, data: WarmedData) => goto(Paused)
         case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data.container)
         case _ => delay
     }
@@ -284,6 +312,11 @@ class ContainerProxy(
 
     /**
      * Runs the job, initialize first if necessary.
+     * Completes the job by:
+     * 1. sending an activate ack,
+     * 2. fetching the logs for the run,
+     * 3. indicating the resource is free to the parent pool,
+     * 4. recording the result to the data store
      *
      * @param container the container to run the job on
      * @param job the job to run
@@ -319,13 +352,18 @@ class ContainerProxy(
         }.recover {
             case InitializationError(interval, response) =>
                 ContainerProxy.constructWhiskActivation(job, interval, response)
+            case t =>
+                // Actually, this should never happen - but we want to make sure to not miss a problem
+                logging.error(this, s"caught unexpected error while running activation: ${t}")
+                ContainerProxy.constructWhiskActivation(job, Interval.zero, ActivationResponse.whiskError(Messages.abnormalRun))
         }
 
         // Sending active ack and storing the activation are concurrent side-effects
         // and do not block further execution of the future. They are completely
         // asynchronous.
         activation.andThen {
-            case Success(activation) => sendActiveAck(tid, activation)
+            // the activation future will always complete with Success
+            case Success(ack) => sendActiveAck(tid, ack)
         }.flatMap { activation =>
             container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs =>
                 activation.withLogs(ActivationLogs(logs.toVector))
@@ -333,6 +371,7 @@ class ContainerProxy(
         }.andThen {
             case Success(activation) => storeActivation(tid, activation)
         }.flatMap { activation =>
+            self ! ActivationCompleted
             // Fail the future iff the activation was unsuccessful to facilitate
             // better cleanup logic.
             if (activation.response.isSuccess) Future.successful(activation)
@@ -353,7 +392,7 @@ object ContainerProxy {
      * Generates a unique container name.
      *
      * @param prefix the container name's prefix
-     * @param suffic the container name's suffix
+     * @param suffix the container name's suffix
      * @return a unique container name
      */
     def containerName(prefix: String, suffix: String) =
diff --git a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
index b839713..716d688 100644
--- a/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
+++ b/core/invoker/src/main/scala/whisk/core/dispatcher/ActivationFeed.scala
@@ -90,8 +90,8 @@ protected class ActivationFeed(
                     case (records, count) =>
                         records foreach {
                             case (topic, partition, offset, bytes) =>
-                                logging.info(this, s"processing $topic[$partition][$offset ($count)]")
                                 pipelineOccupancy += 1
+                                logging.info(this, s"processing $topic[$partition][$offset ($count)][pipelineOccupancy=${pipelineOccupancy} (${pipelineFillThreshold})]")
                                 handler(topic, bytes)
                         }
                 } recover {
@@ -101,8 +101,12 @@ protected class ActivationFeed(
                 fill()
             } else logging.debug(this, "dropping fill request until feed is drained")
 
-        case _: ActivationNotification =>
+        case n: ActivationNotification =>
             pipelineOccupancy -= 1
+            logging.info(this, s"received ActivationNotification: $n / pipelineOccupancy=$pipelineOccupancy / pipelineFillThreshold=$pipelineFillThreshold")
+            if (pipelineOccupancy < 0) {
+                logging.error(this, "pipelineOccupancy<0")
+            }
             fill()
     }
 
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index f6ab973..1d07de9 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -42,11 +42,11 @@ import whisk.core.containerpool.Run
 import whisk.core.containerpool.docker.DockerClientWithFileAccess
 import whisk.core.containerpool.docker.DockerContainer
 import whisk.core.containerpool.docker.RuncClient
+import whisk.core.dispatcher.ActivationFeed.FailedActivation
 import whisk.core.dispatcher.MessageHandler
 import whisk.core.entity._
 import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity.size._
-import whisk.core.dispatcher.ActivationFeed.ContainerReleased
 import whisk.core.containerpool.ContainerPool
 import whisk.core.database.NoDocumentException
 import whisk.http.Messages
@@ -192,7 +192,7 @@ class InvokerReactive(
                         Parameters("path", msg.action.toString.toJson) ++ causedBy
                     })
 
-                activationFeed ! ContainerReleased
+                activationFeed ! FailedActivation(msg.transid)
                 ack(msg.transid, activation)
                 store(msg.transid, activation)
         }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 2513f9b..567fe6d 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -106,12 +106,12 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
 
     behavior of "ContainerPool"
 
-    it should "indicate free resources to the feed only if a warm container responds" in within(timeout) {
+    it should "indicate free resources to the feed once activations finish" in within(timeout) {
         val (containers, factory) = testContainers(1)
         val feed = TestProbe()
 
         val pool = system.actorOf(ContainerPool.props(factory, 0, feed.ref))
-        containers(0).send(pool, NeedWork(warmedData()))
+        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(ContainerReleased)
     }
 
@@ -156,6 +156,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
+        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(ContainerReleased)
         pool ! runMessageDifferentEverything
         containers(0).expectMsg(Remove)
@@ -171,6 +172,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
+        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(ContainerReleased)
         pool ! runMessageDifferentNamespace
         containers(0).expectMsg(Remove)
@@ -186,6 +188,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
+        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(ContainerReleased)
         pool ! runMessage
         containers(0).expectMsg(runMessage)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 2094756..0701ef6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -104,6 +104,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
     def run(machine: ActorRef, currentState: ContainerState) = {
         machine ! Run(action, message)
         expectMsg(Transition(machine, currentState, Running))
+        expectMsg(ActivationCompleted)
         expectWarmed(invocationNamespace.name, action)
         expectMsg(Transition(machine, Running, Ready))
     }
@@ -124,7 +125,6 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
     /** Expect the container to pause successfully */
     def expectPause(machine: ActorRef) = {
         expectMsg(Transition(machine, Ready, Pausing))
-        expectWarmed(invocationNamespace.name, action)
         expectMsg(Transition(machine, Pausing, Paused))
     }
 
@@ -271,6 +271,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
+        expectMsg(ActivationCompleted)
         expectMsg(ContainerRemoved)
 
         awaitAssert {
@@ -299,6 +300,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
+        expectMsg(ActivationCompleted)
         expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
         expectMsg(Transition(machine, Running, Removing))
 
@@ -327,6 +329,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
+        expectMsg(ActivationCompleted)
         expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
         expectMsg(Transition(machine, Running, Removing))
 
@@ -426,6 +429,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
 
         // Finish /init, note that /run and log-collecting happens nonetheless
         initPromise.success(Interval.zero)
+        expectMsg(ActivationCompleted)
         expectWarmed(invocationNamespace.name, action)
         expectMsg(Transition(machine, Running, Ready))
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].