You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/19 15:35:46 UTC

[openwhisk] branch master updated: ContainerProxy - improve failure handling for concurrent activations (#4938)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d4f9e8a  ContainerProxy - improve failure handling for concurrent activations (#4938)
d4f9e8a is described below

commit d4f9e8abf6739bd6ee48d64f567f93a90ef91daf
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Wed Aug 19 08:35:30 2020 -0700

    ContainerProxy - improve failure handling for concurrent activations (#4938)
    
    * ContainerProxy - improve failure handling for concurrent activations
---
 .../core/containerpool/ContainerProxy.scala        | 101 +++++-
 tests/dat/actions/concurrentFail1.js               |  34 ++
 tests/dat/actions/concurrentFail2.js               |  34 ++
 tests/src/test/scala/common/WhiskProperties.java   |   2 +-
 .../containerpool/test/ContainerProxyTests.scala   | 396 ++++++++++++++++++++-
 .../openwhisk/core/limits/ConcurrencyTests.scala   |  67 +++-
 6 files changed, 602 insertions(+), 32 deletions(-)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 76dba7e..cf1dad6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -457,15 +457,26 @@ class ContainerProxy(factory: (TransactionId,
       rejectBuffered()
       destroyContainer(newData, true)
 
-    // Failed after /init (the first run failed)
-    case Event(_: FailureMessage, data: PreWarmedData) =>
+    // Failed after /init (the first run failed) on prewarmed or cold start
+    case Event(f: FailureMessage, data: PreWarmedData) =>
       activeCount -= 1
-      destroyContainer(data, true)
+      //reuse an existing init failure for any buffered activations that will be aborted
+      val r = f.cause match {
+        case ActivationUnsuccessfulError(r) => Some(r.response)
+        case _                              => None
+      }
+      destroyContainer(data, true, true, r)
 
     // Failed for a subsequent /run
     case Event(_: FailureMessage, data: WarmedData) =>
       activeCount -= 1
-      destroyContainer(data, true)
+      if (activeCount == 0) {
+        destroyContainer(data, true)
+      } else {
+        //signal that this container is going away (but don't remove it yet...)
+        rescheduleJob = true
+        goto(Removing)
+      }
 
     // Failed at getting a container for a cold-start run
     case Event(_: FailureMessage, _) =>
@@ -537,8 +548,27 @@ class ContainerProxy(factory: (TransactionId,
       // Send the job back to the pool to be rescheduled
       context.parent ! job
       stay
-    case Event(ContainerRemoved(_), _) => stop()
-    case Event(_: FailureMessage, _)   => stop()
+    // Run was successful, after another failed concurrent Run
+    case Event(RunCompleted, data: WarmedData) =>
+      activeCount -= 1
+      val newData = data.withoutResumeRun()
+      //stay in Removing while other activations are still in flight; once the last one completes, destroy the container
+      if (activeCount == 0) {
+        destroyContainer(newData, true)
+      } else {
+        stay using newData
+      }
+    case Event(ContainerRemoved(_), _) =>
+      stop()
+    // Run failed, after another failed concurrent Run
+    case Event(_: FailureMessage, data: WarmedData) =>
+      activeCount -= 1
+      val newData = data.withoutResumeRun()
+      if (activeCount == 0) {
+        destroyContainer(newData, true)
+      } else {
+        stay using newData
+      }
   }
 
   // Unstash all messages stashed while in intermediate state
@@ -617,15 +647,39 @@ class ContainerProxy(factory: (TransactionId,
    *
    * @param newData the ContainerStarted which container will be destroyed
    */
-  def destroyContainer(newData: ContainerStarted, replacePrewarm: Boolean) = {
+  def destroyContainer(newData: ContainerStarted,
+                       replacePrewarm: Boolean,
+                       abortBuffered: Boolean = false,
+                       abortResponse: Option[ActivationResponse] = None) = {
     val container = newData.container
     if (!rescheduleJob) {
       context.parent ! ContainerRemoved(replacePrewarm)
     } else {
       context.parent ! RescheduleJob
     }
-
-    rejectBuffered()
+    if (abortBuffered && runBuffer.length > 0) {
+      logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init")
+      runBuffer.foreach { job =>
+        implicit val tid = job.msg.transid
+        logging.info(this, s"aborting activation ${job.msg.activationId} after failed init with ${abortResponse}")
+        val result = ContainerProxy.constructWhiskActivation(
+          job,
+          None,
+          Interval.zero,
+          false,
+          abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
+        val context = UserContext(job.msg.user)
+        val msg = if (job.msg.blocking) {
+          CombinedCompletionAndResultMessage(tid, result, instance)
+        } else {
+          CompletionMessage(tid, result, instance)
+        }
+        sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+        storeActivation(tid, result, job.msg.blocking, context)
+      }
+    } else {
+      rejectBuffered()
+    }
 
     val unpause = stateName match {
       case Paused => container.resume()(TransactionId.invokerNanny)
@@ -636,7 +690,11 @@ class ContainerProxy(factory: (TransactionId,
       .flatMap(_ => container.destroy()(TransactionId.invokerNanny))
       .map(_ => ContainerRemoved(replacePrewarm))
       .pipeTo(self)
-    goto(Removing) using newData
+    if (stateName != Removing) {
+      goto(Removing) using newData
+    } else {
+      stay using newData
+    }
   }
 
   /**
@@ -844,9 +902,30 @@ class ContainerProxy(factory: (TransactionId,
     // Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
     activationWithLogs.flatMap {
       case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
+        val truncatedResult = truncatedError(act)
+        logging.info(
+          this,
+          s"Activation ${act.activationId} was unsuccessful at container ${stateData.getContainer} (with ${activeCount} still active) due to ${truncatedResult}")
         Future.failed(ActivationUnsuccessfulError(act))
       case Left(error) => Future.failed(error)
-      case Right(act)  => Future.successful(act)
+      case Right(act) =>
+        if (act.response.isApplicationError) {
+          val truncatedResult = truncatedError(act)
+          logging.error(
+            this,
+            s"Activation ${act.activationId} at container ${stateData.getContainer} (with ${activeCount} still active) returned an error ${truncatedResult}")
+        }
+        Future.successful(act)
+    }
+  }
+  //to ensure we don't blow up logs with potentially large activation response error
+  private def truncatedError(act: WhiskActivation) = {
+    val truncate = 1024
+    val resultString = act.response.result.map(_.compactPrint).getOrElse("[no result]")
+    if (resultString.length > truncate) {
+      s"${resultString.take(truncate)}..."
+    } else {
+      resultString
     }
   }
 }
diff --git a/tests/dat/actions/concurrentFail1.js b/tests/dat/actions/concurrentFail1.js
new file mode 100644
index 0000000..f15f8b9
--- /dev/null
+++ b/tests/dat/actions/concurrentFail1.js
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+let count=0;
+function main(args) {
+    console.log("sleeping for "+(args.time||1000));
+    var shouldFail = args.fail||false;
+    var sleepTime = args.time||1000;
+    count = count+1;
+    if (shouldFail) {
+        console.log("skipping the return..");
+        return new Promise();
+    } else {
+        return new Promise(function (resolve, reject) {
+            setTimeout(function () {
+                resolve({body: "done sleeping "+sleepTime, done: true});
+            }, sleepTime);
+        })
+    }
+}
diff --git a/tests/dat/actions/concurrentFail2.js b/tests/dat/actions/concurrentFail2.js
new file mode 100644
index 0000000..895349f
--- /dev/null
+++ b/tests/dat/actions/concurrentFail2.js
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+let count=0;
+function main(args) {
+    console.log("sleeping for "+(args.time||1000));
+    var sleepTime = args.time||1000;
+    var shouldFail = count % 2 === 0;
+    count = count+1;
+    if (shouldFail) {
+        console.log("a catastrophic failure..");
+        process.exit(123);
+    } else {
+        return new Promise(function (resolve, reject) {
+            setTimeout(function () {
+                resolve({body: "done sleeping "+sleepTime, done: true});
+            }, sleepTime);
+        })
+    }
+}
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 540d72c..05d8b32 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -408,7 +408,7 @@ public class WhiskProperties {
         return getPropFromSystemOrEnv(WHISK_SERVER) == null;
     }
 
-    private static String getProperty(String key, String defaultValue) {
+    public static String getProperty(String key, String defaultValue) {
         String value = getPropFromSystemOrEnv(key);
         if (value == null) {
             value = whiskProperties.getProperty(key, defaultValue);
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 3316dc5..f64a0f8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -52,6 +52,7 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.entity.ActivationResponse.ContainerResponse
 import org.apache.openwhisk.core.invoker.Invoker
 
 import scala.collection.mutable
@@ -85,7 +86,7 @@ class ContainerProxyTests
   val invocationNamespace = EntityName("invocationSpace")
   val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
 
-  val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+  val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency", "false")).exists(_.toBoolean)
   val testConcurrencyLimit = if (concurrencyEnabled) ConcurrencyLimit(2) else ConcurrencyLimit(1)
   val concurrentAction = ExecutableWhiskAction(
     EntityPath("actionSpace"),
@@ -565,22 +566,8 @@ class ContainerProxyTests
 
   it should "resend a failed Run when it is first Run after Ready state" in within(timeout) {
     val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
-    val container = new TestContainer {
-      override def run(
-        parameters: JsObject,
-        environment: JsObject,
-        timeout: FiniteDuration,
-        concurrent: Int,
-        reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
-        atomicRunCount.incrementAndGet()
-        //every run after first fails
-        if (runCount > 1) {
-          Future.failed(ContainerHealthError(messageTransId, "intentional failure"))
-        } else {
-          Future.successful((runInterval, ActivationResponse.success()))
-        }
-      }
-    }
+    val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
+    val container = new TestContainer(runPromises = runPromises)
     val factory = createFactory(Future.successful(container))
     val acker = createAcker(noLogsAction)
     val store = createStore
@@ -602,6 +589,8 @@ class ContainerProxyTests
 
     machine ! Run(noLogsAction, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    //run the first successfully
+    runPromises(0).success(runInterval, ActivationResponse.success())
     expectWarmed(invocationNamespace.name, noLogsAction)
     expectMsg(Transition(machine, Running, Ready))
 
@@ -611,6 +600,8 @@ class ContainerProxyTests
     machine ! failingRun
     machine ! runAfterFail //will be buffered first, and then retried
     expectMsg(Transition(machine, Ready, Running))
+    //run the second as failure
+    runPromises(1).failure(ContainerHealthError(messageTransId, "intentional failure"))
     //on failure, buffered are resent first
     expectMsg(runAfterFail)
     //resend the first run to parent, and start removal process
@@ -793,7 +784,7 @@ class ContainerProxyTests
   //without waiting for the completion of the previous Run message (signaled by NeedWork message)
   //Multiple messages can only be handled after Warming.
   it should "stay in Running state if others are still running" in within(timeout) {
-    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+    assume(concurrencyEnabled)
 
     val initPromise = Promise[Interval]()
     val runPromises = Seq(
@@ -913,6 +904,375 @@ class ContainerProxyTests
 
   }
 
+  it should "not destroy on failure during Removing state when concurrent activations are in flight" in {
+    assume(concurrencyEnabled)
+
+    val initPromise = Promise[Interval]()
+    val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
+    val container = new TestContainer(Some(initPromise), runPromises)
+    val factory = createFactory(Future.successful(container))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    preWarm(machine) //ends in Started state
+
+    machine ! Run(concurrentAction, message) //first in Started state
+    machine ! Run(concurrentAction, message) //second in Started or Running state
+
+    //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+    //second message will be delayed until we get to Running state with WarmedData
+    //   (and will produce 1 NeedWork message after run)
+    expectMsg(Transition(machine, Started, Running))
+
+    //complete the init
+    initPromise.success(initInterval)
+
+    //fail the first run
+    runPromises(0).success(runInterval, ActivationResponse.whiskError("intentional failure in test"))
+    //fail the second run
+    runPromises(1).success(runInterval, ActivationResponse.whiskError("intentional failure in test"))
+    //go to Removing state when a failure happens while others are in flight
+    expectMsg(Transition(machine, Running, Removing))
+    expectMsg(RescheduleJob)
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 2
+      container.atomicLogsCount.get() shouldBe 2
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+
+      store.calls should have size 2
+
+      // As the active acks are sent asynchronously, it is possible, that the activation with the init time is not the
+      // first one in the buffer.
+      val initializedActivations =
+        acker.calls.filter(_._2.annotations.get(WhiskActivation.initTimeAnnotation).isDefined)
+      initializedActivations should have size 1
+
+      initializedActivations.head._2.annotations
+        .get(WhiskActivation.initTimeAnnotation)
+        .get
+        .convertTo[Int] shouldBe initInterval.duration.toMillis
+    }
+  }
+
+  it should "not destroy on failure during Running state when concurrent activations are in flight" in {
+    assume(concurrencyEnabled)
+
+    val initPromise = Promise[Interval]()
+    val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
+    val container = new TestContainer(Some(initPromise), runPromises)
+    val factory = createFactory(Future.successful(container))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    preWarm(machine) //ends in Started state
+
+    machine ! Run(concurrentAction, message) //first in Started state
+    machine ! Run(concurrentAction, message) //second in Started or Running state
+
+    //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+    //second message will be delayed until we get to Running state with WarmedData
+    //   (and will produce 1 NeedWork message after run)
+    expectMsg(Transition(machine, Started, Running))
+
+    //complete the init
+    initPromise.success(initInterval)
+
+    //fail the first run
+    runPromises(0).success(runInterval, ActivationResponse.whiskError("intentional failure in test"))
+    //succeed the second run
+    runPromises(1).success(runInterval, ActivationResponse.success())
+    //go to Removing state when a failure happens while others are in flight
+    expectMsg(Transition(machine, Running, Removing))
+    expectMsg(RescheduleJob)
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 2
+      container.atomicLogsCount.get() shouldBe 2
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+
+      store.calls should have size 2
+
+      // As the active acks are sent asynchronously, it is possible, that the activation with the init time is not the
+      // first one in the buffer.
+      val initializedActivations =
+        acker.calls.filter(_._2.annotations.get(WhiskActivation.initTimeAnnotation).isDefined)
+      initializedActivations should have size 1
+
+      initializedActivations.head._2.annotations
+        .get(WhiskActivation.initTimeAnnotation)
+        .get
+        .convertTo[Int] shouldBe initInterval.duration.toMillis
+    }
+  }
+  it should "terminate buffered concurrent activations when prewarm init fails with an error" in {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+    val initPromise = Promise[Interval]()
+    val container = new TestContainer(Some(initPromise))
+    val factory = createFactory(Future.successful(container))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    preWarm(machine) //ends in Started state
+
+    machine ! Run(concurrentAction, message) //first in Started state
+    machine ! Run(concurrentAction, message) //second in Started or Running state
+
+    //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+    //second message will be delayed until we get to Running state with WarmedData
+    //   (and will produce 1 NeedWork message after run)
+    expectMsg(Transition(machine, Started, Running))
+
+    //fail the init
+    initPromise.failure(
+      InitializationError(
+        initInterval,
+        ActivationResponse
+          .processInitResponseContent(Right(ContainerResponse(false, "some bad init response...")), logging)))
+
+    expectMsg(ContainerRemoved(true))
+    //go to Removing state when a failure happens while others are in flight
+    expectMsg(Transition(machine, Running, Removing))
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 0
+      container.atomicLogsCount.get() shouldBe 1
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+
+      store.calls should have size 2
+
+      //we should have 2 activations that are container error
+      acker.calls.filter(_._2.response.isContainerError) should have size 2
+    }
+  }
+
+  it should "terminate buffered concurrent activations when prewarm init fails unexpectedly" in {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+    val initPromise = Promise[Interval]()
+    val container = new TestContainer(Some(initPromise))
+    val factory = createFactory(Future.successful(container))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    preWarm(machine) //ends in Started state
+
+    machine ! Run(concurrentAction, message) //first in Started state
+    machine ! Run(concurrentAction, message) //second in Started or Running state
+
+    //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+    //second message will be delayed until we get to Running state with WarmedData
+    //   (and will produce 1 NeedWork message after run)
+    expectMsg(Transition(machine, Started, Running))
+
+    //fail the init
+    initPromise.failure(new IllegalStateException("intentional failure during init test"))
+
+    expectMsg(ContainerRemoved(true))
+    //go to Removing state when a failure happens while others are in flight
+    expectMsg(Transition(machine, Running, Removing))
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 0
+      container.atomicLogsCount.get() shouldBe 1
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+
+      store.calls should have size 2
+
+      //we should have 2 activations that are whisk error
+      acker.calls.filter(_._2.response.isWhiskError) should have size 2
+    }
+  }
+
+  it should "terminate buffered concurrent activations when cold init fails with an error" in {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+    val initPromise = Promise[Interval]()
+    val container = new TestContainer(Some(initPromise))
+    val factory = createFactory(Future.successful(container))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    //no prewarming
+
+    machine ! Run(concurrentAction, message) //first in Uninitialized state
+    machine ! Run(concurrentAction, message) //second in Uninitialized or Running state
+
+    expectMsg(Transition(machine, Uninitialized, Running))
+
+    //fail the init
+    initPromise.failure(
+      InitializationError(
+        initInterval,
+        ActivationResponse
+          .processInitResponseContent(Right(ContainerResponse(false, "some bad init response...")), logging)))
+
+    expectMsg(ContainerRemoved(true))
+    //go to Removing state when a failure happens while others are in flight
+    expectMsg(Transition(machine, Running, Removing))
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 0
+      container.atomicLogsCount.get() shouldBe 1
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+
+      store.calls should have size 2
+
+      //we should have 2 activations that are container error
+      acker.calls.filter(_._2.response.isContainerError) should have size 2
+    }
+  }
+
+  it should "terminate buffered concurrent activations when cold init fails unexpectedly" in {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+    val initPromise = Promise[Interval]()
+    val container = new TestContainer(Some(initPromise))
+    val factory = createFactory(Future.successful(container))
+    val acker = createSyncAcker(concurrentAction)
+    val store = createSyncStore
+    val collector =
+      createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(
+            factory,
+            acker,
+            store,
+            collector,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            poolConfig,
+            healthchecksConfig(),
+            pauseGrace = pauseGrace)
+          .withDispatcher(CallingThreadDispatcher.Id))
+    registerCallback(machine)
+    //no prewarming
+
+    machine ! Run(concurrentAction, message) //first in Uninitialized state
+    machine ! Run(concurrentAction, message) //second in Uninitialized or Running state
+
+    expectMsg(Transition(machine, Uninitialized, Running))
+
+    //fail the init
+    initPromise.failure(new IllegalStateException("intentional failure during init test"))
+
+    expectMsg(ContainerRemoved(true))
+    //go to Removing state when a failure happens while others are in flight
+    expectMsg(Transition(machine, Running, Removing))
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 0
+      container.atomicLogsCount.get() shouldBe 1
+      container.suspendCount shouldBe 0
+      container.resumeCount shouldBe 0
+      acker.calls should have size 2
+
+      store.calls should have size 2
+
+      //we should have 2 activations that are whisk error
+      acker.calls.filter(_._2.response.isWhiskError) should have size 2
+    }
+  }
+
   it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
     timeout) {
     val container = new TestContainer {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
index 4110d92..2ea660c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
@@ -17,6 +17,7 @@
 
 package org.apache.openwhisk.core.limits
 
+import akka.http.scaladsl.model.StatusCodes
 import common._
 import common.rest.WskRestOperations
 import org.apache.openwhisk.core.ConfigKeys
@@ -53,7 +54,7 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
 
   //This tests generates a concurrent load against the concurrent.js action with concurrency set to 5
   it should "execute activations concurrently when concurrency > 1 " in withAssetCleaner(wskprops) {
-    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
 
     (wp, assetHelper) =>
       val name = "TestConcurrentAction"
@@ -102,7 +103,7 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
 
   //This tests generates the same load against the same action as previous test, BUT with concurrency set to 1
   it should "execute activations sequentially when concurrency = 1 " in withAssetCleaner(wskprops) {
-    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
 
     (wp, assetHelper) =>
       val name = "TestNonConcurrentAction"
@@ -149,4 +150,66 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
       }
   }
 
+  it should "allow concurrent activations to gracefully complete when one fails" in withAssetCleaner(wskprops) {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
+    (wp, assetHelper) =>
+      val name = "TestFailingConcurrentAction1"
+      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
+        //this action fails by returning an empty promise
+        val actionName = TestUtils.getTestActionFilename("concurrentFail1.js")
+        (action, _) =>
+          //disable log collection since concurrent activation requires specialized log processing
+          // (at action runtime and using specialized LogStore)
+          action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(2))
+      }
+      //with concurrency 2, at least some of the 3 activations will fail, but not all
+      val requestCount = 3
+      println(s"executing $requestCount activations")
+      val runs = (1 to requestCount).map { i =>
+        Future {
+          //within the action, return empty promise on one specific invocation
+          val params: Map[String, JsValue] = if (i == 2) {
+            Map("fail" -> true.toJson)
+          } else {
+            Map.empty
+          }
+          val result = wsk.action.invoke(name, params, blocking = true, expectedExitCode = TestUtils.DONTCARE_EXIT)
+          result
+        }
+      }
+      val results = Await.result(Future.sequence(runs), 30.seconds)
+      //some will be 200, some will be 502, but all should be completed (no forced acks that take > 30s)
+      results.count(_.statusCode == StatusCodes.OK) should be > 0
+      results.count(_.statusCode == StatusCodes.BadGateway) should be > 0
+  }
+  it should "allow concurrent activations to gracefully complete when one fails catastrophically" in withAssetCleaner(
+    wskprops) {
+    assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
+    (wp, assetHelper) =>
+      val name = "TestFailingConcurrentAction2"
+      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
+        //this action does a process.exit() on every other activation, starting with the first
+        val actionName = TestUtils.getTestActionFilename("concurrentFail2.js")
+        (action, _) =>
+          //disable log collection since concurrent activation requires specialized log processing
+          // (at action runtime and using specialized LogStore)
+          action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(2))
+      }
+      //we'll make every container fail every other activation, so with at least 2 to each container, all will fail
+      val requestCount = 4
+      println(s"executing $requestCount activations")
+      val runs = (1 to requestCount).map { i =>
+        Future {
+          //within the action, the nodejs process exits on every other invocation
+          //use default params
+          val result = wsk.action.invoke(name, blocking = true, expectedExitCode = TestUtils.DONTCARE_EXIT)
+          result
+        }
+      }
+      val results = Await.result(Future.sequence(runs), 30.seconds)
+      //none will be 200, since each container gets at least 2 concurrent activations,
+      //and each container crashes on every other activation.
+      results.count(_.statusCode == StatusCodes.OK) shouldBe 0
+      results.count(_.statusCode == StatusCodes.BadGateway) shouldBe 4
+  }
 }