You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/09/08 17:53:54 UTC
[openwhisk] branch master updated: cold start container failure
should abort buffered concurrent activations (#4958)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5109942 cold start container failure should abort buffered concurrent activations (#4958)
5109942 is described below
commit 51099425f46d5bebb145f98134a2aa1bbc7e092b
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Tue Sep 8 10:53:42 2020 -0700
cold start container failure should abort buffered concurrent activations (#4958)
---
.../core/containerpool/ContainerProxy.scala | 69 +++++++++++++++-------
.../containerpool/test/ContainerProxyTests.scala | 51 ++++++++++++++++
2 files changed, 98 insertions(+), 22 deletions(-)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index b1625a3..7a255fa 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -458,7 +458,13 @@ class ContainerProxy(factory: (TransactionId,
destroyContainer(newData, true)
// Failed after /init (the first run failed) on prewarmed or cold start
+ // - container will be destroyed
+ // - buffered will be aborted (if init fails, we assume it will always fail)
case Event(f: FailureMessage, data: PreWarmedData) =>
+ logging.error(
+ this,
+ s"Failed during init of cold container ${data.getContainer}, queued activations will be aborted.")
+
activeCount -= 1
//reuse an existing init failure for any buffered activations that will be aborted
val r = f.cause match {
@@ -468,7 +474,12 @@ class ContainerProxy(factory: (TransactionId,
destroyContainer(data, true, true, r)
// Failed for a subsequent /run
+ // - container will be destroyed
+ // - buffered will be resent (at least 1 has completed, so others are given a chance to complete)
case Event(_: FailureMessage, data: WarmedData) =>
+ logging.error(
+ this,
+ s"Failed during use of warm container ${data.getContainer}, queued activations will be resent.")
activeCount -= 1
if (activeCount == 0) {
destroyContainer(data, true)
@@ -479,10 +490,13 @@ class ContainerProxy(factory: (TransactionId,
}
// Failed at getting a container for a cold-start run
+ // - container will be destroyed
+ // - buffered will be aborted (if cold start container fails to start, we assume it will continue to fail)
case Event(_: FailureMessage, _) =>
+ logging.error(this, "Failed to start cold container, queued activations will be aborted.")
activeCount -= 1
context.parent ! ContainerRemoved(true)
- rejectBuffered()
+ abortBuffered()
stop()
case _ => delay
@@ -649,7 +663,7 @@ class ContainerProxy(factory: (TransactionId,
*/
def destroyContainer(newData: ContainerStarted,
replacePrewarm: Boolean,
- abortBuffered: Boolean = false,
+ abort: Boolean = false,
abortResponse: Option[ActivationResponse] = None) = {
val container = newData.container
if (!rescheduleJob) {
@@ -657,26 +671,8 @@ class ContainerProxy(factory: (TransactionId,
} else {
context.parent ! RescheduleJob
}
- if (abortBuffered && runBuffer.length > 0) {
- logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init")
- runBuffer.foreach { job =>
- implicit val tid = job.msg.transid
- logging.info(this, s"aborting activation ${job.msg.activationId} after failed init with ${abortResponse}")
- val result = ContainerProxy.constructWhiskActivation(
- job,
- None,
- Interval.zero,
- false,
- abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
- val context = UserContext(job.msg.user)
- val msg = if (job.msg.blocking) {
- CombinedCompletionAndResultMessage(tid, result, instance)
- } else {
- CompletionMessage(tid, result, instance)
- }
- sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
- storeActivation(tid, result, job.msg.blocking, context)
- }
+ if (abort && runBuffer.nonEmpty) {
+ abortBuffered(abortResponse)
} else {
rejectBuffered()
}
@@ -697,6 +693,35 @@ class ContainerProxy(factory: (TransactionId,
}
}
+ def abortBuffered(abortResponse: Option[ActivationResponse] = None) = { // Fails (rather than resends) every queued Run: each gets an activation built from abortResponse (default whiskError(Messages.abnormalRun)), which is acked and stored.
+ logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init or failed cold start") // NOTE(review): the cold-start failure path calls this without a nonEmpty guard, so this may log a count of 0
+ runBuffer.foreach { job => // NOTE(review): runBuffer is not cleared in this method; confirm no caller later re-processes these jobs
+ implicit val tid = job.msg.transid // per-job transaction id, made implicit for any callee that takes an implicit TransactionId
+ logging.info(
+ this,
+ s"aborting activation ${job.msg.activationId} after failed init or cold start with ${abortResponse}")
+ val result = ContainerProxy.constructWhiskActivation( // NOTE(review): None / Interval.zero / false presumably mean "no init interval, zero duration, not a timeout" since the job never ran; confirm against constructWhiskActivation
+ job,
+ None,
+ Interval.zero,
+ false,
+ abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
+ val context = UserContext(job.msg.user)
+ val msg = if (job.msg.blocking) { // blocking invocations get completion and result in one message; non-blocking ones get only a completion
+ CombinedCompletionAndResultMessage(tid, result, instance)
+ } else {
+ CompletionMessage(tid, result, instance)
+ }
+ sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+ .andThen { // ack and store failures are only logged; these futures are not awaited, so the loop continues regardless
+ case Failure(e) => logging.error(this, s"failed to send abort ack $e")
+ }
+ storeActivation(tid, result, job.msg.blocking, context).andThen {
+ case Failure(e) => logging.error(this, s"failed to store aborted activation $e")
+ }
+ }
+ }
+
/**
* Return any buffered jobs to parent, in case buffer is not empty at removal/error time.
*/
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index a63e108..1c80bd6 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1273,6 +1273,57 @@ class ContainerProxyTests
}
}
+ it should "terminate buffered concurrent activations when cold init fails to launch container" in { // container creation always fails while 2 Runs are queued; both must end up acked and stored as whisk errors
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)) // skipped unless whisk.action.concurrency is true
+
+ val initPromise = Promise[Interval]()
+ val container = new TestContainer(Some(initPromise)) // never handed out by the failing factory; only backs the collector and the zero-count checks below
+ val factory = createFactory(Future.failed(new Exception("simulating a container creation failure"))) // every cold start fails
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ //no prewarming, so the first Run triggers a cold start from Uninitialized
+
+ machine ! Run(concurrentAction, message) //first in Uninitialized state
+ machine ! Run(concurrentAction, message) //second in Uninitialized or Running state
+
+ expectMsg(Transition(machine, Uninitialized, Running))
+
+ expectMsg(ContainerRemoved(true))
+ //cold-start failure: the proxy reports ContainerRemoved and stops (no Removing transition), so no further messages are expected
+ expectNoMessage(100.milliseconds)
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 0
+ container.runCount shouldBe 0
+ container.atomicLogsCount.get() shouldBe 0
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ //we should have 2 activations that are whisk error
+ acker.calls.filter(_._2.response.isWhiskError) should have size 2
+ }
+ }
+
it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
timeout) {
val container = new TestContainer {