You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/09/08 17:53:54 UTC

[openwhisk] branch master updated: cold start container failure should abort buffered concurrent activations (#4958)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5109942  cold start container failure should abort buffered concurrent activations (#4958)
5109942 is described below

commit 51099425f46d5bebb145f98134a2aa1bbc7e092b
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Tue Sep 8 10:53:42 2020 -0700

    cold start container failure should abort buffered concurrent activations (#4958)
---
 .../core/containerpool/ContainerProxy.scala        | 69 +++++++++++++++-------
 .../containerpool/test/ContainerProxyTests.scala   | 51 ++++++++++++++++
 2 files changed, 98 insertions(+), 22 deletions(-)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index b1625a3..7a255fa 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -458,7 +458,13 @@ class ContainerProxy(factory: (TransactionId,
       destroyContainer(newData, true)
 
     // Failed after /init (the first run failed) on prewarmed or cold start
+    // - container will be destroyed
+    // - buffered will be aborted (if init fails, we assume it will always fail)
     case Event(f: FailureMessage, data: PreWarmedData) =>
+      logging.error(
+        this,
+        s"Failed during init of cold container ${data.getContainer}, queued activations will be aborted.")
+
       activeCount -= 1
       //reuse an existing init failure for any buffered activations that will be aborted
       val r = f.cause match {
@@ -468,7 +474,12 @@ class ContainerProxy(factory: (TransactionId,
       destroyContainer(data, true, true, r)
 
     // Failed for a subsequent /run
+    // - container will be destroyed
+    // - buffered will be resent (at least 1 has completed, so others are given a chance to complete)
     case Event(_: FailureMessage, data: WarmedData) =>
+      logging.error(
+        this,
+        s"Failed during use of warm container ${data.getContainer}, queued activations will be resent.")
       activeCount -= 1
       if (activeCount == 0) {
         destroyContainer(data, true)
@@ -479,10 +490,13 @@ class ContainerProxy(factory: (TransactionId,
       }
 
     // Failed at getting a container for a cold-start run
+    // - container will be destroyed
+    // - buffered will be aborted (if cold start container fails to start, we assume it will continue to fail)
     case Event(_: FailureMessage, _) =>
+      logging.error(this, "Failed to start cold container, queued activations will be aborted.")
       activeCount -= 1
       context.parent ! ContainerRemoved(true)
-      rejectBuffered()
+      abortBuffered()
       stop()
 
     case _ => delay
@@ -649,7 +663,7 @@ class ContainerProxy(factory: (TransactionId,
    */
   def destroyContainer(newData: ContainerStarted,
                        replacePrewarm: Boolean,
-                       abortBuffered: Boolean = false,
+                       abort: Boolean = false,
                        abortResponse: Option[ActivationResponse] = None) = {
     val container = newData.container
     if (!rescheduleJob) {
@@ -657,26 +671,8 @@ class ContainerProxy(factory: (TransactionId,
     } else {
       context.parent ! RescheduleJob
     }
-    if (abortBuffered && runBuffer.length > 0) {
-      logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init")
-      runBuffer.foreach { job =>
-        implicit val tid = job.msg.transid
-        logging.info(this, s"aborting activation ${job.msg.activationId} after failed init with ${abortResponse}")
-        val result = ContainerProxy.constructWhiskActivation(
-          job,
-          None,
-          Interval.zero,
-          false,
-          abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
-        val context = UserContext(job.msg.user)
-        val msg = if (job.msg.blocking) {
-          CombinedCompletionAndResultMessage(tid, result, instance)
-        } else {
-          CompletionMessage(tid, result, instance)
-        }
-        sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
-        storeActivation(tid, result, job.msg.blocking, context)
-      }
+    if (abort && runBuffer.nonEmpty) {
+      abortBuffered(abortResponse)
     } else {
       rejectBuffered()
     }
@@ -697,6 +693,35 @@ class ContainerProxy(factory: (TransactionId,
     }
   }
 
+  def abortBuffered(abortResponse: Option[ActivationResponse] = None) = { // Fails every job in runBuffer with abortResponse, defaulting to whiskError(Messages.abnormalRun).
+    logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init or failed cold start") // NOTE(review): logs even when runBuffer is empty (the cold-start failure path calls this without a nonEmpty check)
+    runBuffer.foreach { job => // NOTE(review): entries are not removed from runBuffer here; callers must not re-dispatch them afterwards
+      implicit val tid = job.msg.transid // log/ack/store calls below are attributed to the aborted job's transaction
+      logging.info(
+        this,
+        s"aborting activation ${job.msg.activationId} after failed init or cold start with ${abortResponse}")
+      val result = ContainerProxy.constructWhiskActivation( // presumably (job, initInterval, totalInterval, isTimeout, response) — confirm against constructWhiskActivation
+        job,
+        None,
+        Interval.zero,
+        false,
+        abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
+      val context = UserContext(job.msg.user)
+      val msg = if (job.msg.blocking) { // blocking invocations get a combined completion+result ack, non-blocking ones only a completion ack
+        CombinedCompletionAndResultMessage(tid, result, instance)
+      } else {
+        CompletionMessage(tid, result, instance)
+      }
+      sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg) // best effort: a failed ack is only logged
+        .andThen {
+          case Failure(e) => logging.error(this, s"failed to send abort ack $e")
+        }
+      storeActivation(tid, result, job.msg.blocking, context).andThen { // best effort: a failed store is only logged
+        case Failure(e) => logging.error(this, s"failed to store aborted activation $e")
+      }
+    }
+  }
+
   /**
    * Return any buffered jobs to parent, in case buffer is not empty at removal/error time.
    */
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index a63e108..1c80bd6 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1273,6 +1273,57 @@ class ContainerProxyTests
     }
   }
 
+  it should "terminate buffered concurrent activations when cold init fails to launch container" in {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+    // The factory below returns a failed future, so no container is ever created and neither Run reaches a container.
+    val initPromise = Promise[Interval]()
+    val container = new TestContainer(Some(initPromise))
+    val factory = createFactory(Future.failed(new Exception("simulating a container creation failure")))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+    // initPromise is never completed: init is never reached because container creation fails first.
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    //no prewarming
+
+    machine ! Run(concurrentAction, message) //first in Uninitialized state
+    machine ! Run(concurrentAction, message) //second in Uninitialized or Running state
+
+    expectMsg(Transition(machine, Uninitialized, Running))
+
+    expectMsg(ContainerRemoved(true))
+    //no transition to Removing: on cold-start failure the proxy reports ContainerRemoved(true), aborts buffered runs and stops
+    expectNoMessage(100.milliseconds)
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 0
+      container.runCount shouldBe 0
+      container.atomicLogsCount.get() shouldBe 0
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+      //presumably one ack for the failed cold-start run and one for the aborted buffered run; both are also stored
+      store.calls should have size 2
+
+      //we should have 2 activations that are whisk error
+      acker.calls.filter(_._2.response.isWhiskError) should have size 2
+    }
+  }
+
   it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
     timeout) {
     val container = new TestContainer {