You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2020/08/19 15:35:46 UTC
[openwhisk] branch master updated: ContainerProxy - improve failure
handling for concurrent activations (#4938)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new d4f9e8a ContainerProxy - improve failure handling for concurrent activations (#4938)
d4f9e8a is described below
commit d4f9e8abf6739bd6ee48d64f567f93a90ef91daf
Author: tysonnorris <tn...@adobe.com>
AuthorDate: Wed Aug 19 08:35:30 2020 -0700
ContainerProxy - improve failure handling for concurrent activations (#4938)
* ContainerProxy - improve failure handling for concurrent activations
---
.../core/containerpool/ContainerProxy.scala | 101 +++++-
tests/dat/actions/concurrentFail1.js | 34 ++
tests/dat/actions/concurrentFail2.js | 34 ++
tests/src/test/scala/common/WhiskProperties.java | 2 +-
.../containerpool/test/ContainerProxyTests.scala | 396 ++++++++++++++++++++-
.../openwhisk/core/limits/ConcurrencyTests.scala | 67 +++-
6 files changed, 602 insertions(+), 32 deletions(-)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 76dba7e..cf1dad6 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -457,15 +457,26 @@ class ContainerProxy(factory: (TransactionId,
rejectBuffered()
destroyContainer(newData, true)
- // Failed after /init (the first run failed)
- case Event(_: FailureMessage, data: PreWarmedData) =>
+ // Failed after /init (the first run failed) on prewarmed or cold start
+ case Event(f: FailureMessage, data: PreWarmedData) =>
activeCount -= 1
- destroyContainer(data, true)
+ //reuse an existing init failure for any buffered activations that will be aborted
+ val r = f.cause match {
+ case ActivationUnsuccessfulError(r) => Some(r.response)
+ case _ => None
+ }
+ destroyContainer(data, true, true, r)
// Failed for a subsequent /run
case Event(_: FailureMessage, data: WarmedData) =>
activeCount -= 1
- destroyContainer(data, true)
+ if (activeCount == 0) {
+ destroyContainer(data, true)
+ } else {
+ //signal that this container is going away (but don't remove it yet...)
+ rescheduleJob = true
+ goto(Removing)
+ }
// Failed at getting a container for a cold-start run
case Event(_: FailureMessage, _) =>
@@ -537,8 +548,27 @@ class ContainerProxy(factory: (TransactionId,
// Send the job back to the pool to be rescheduled
context.parent ! job
stay
- case Event(ContainerRemoved(_), _) => stop()
- case Event(_: FailureMessage, _) => stop()
+ // Run was successful, after another failed concurrent Run
+ case Event(RunCompleted, data: WarmedData) =>
+ activeCount -= 1
+ val newData = data.withoutResumeRun()
+ //once the last in-flight activation has completed, destroy the container; otherwise stay and wait for the remaining activations
+ if (activeCount == 0) {
+ destroyContainer(newData, true)
+ } else {
+ stay using newData
+ }
+ case Event(ContainerRemoved(_), _) =>
+ stop()
+ // Run failed, after another failed concurrent Run
+ case Event(_: FailureMessage, data: WarmedData) =>
+ activeCount -= 1
+ val newData = data.withoutResumeRun()
+ if (activeCount == 0) {
+ destroyContainer(newData, true)
+ } else {
+ stay using newData
+ }
}
// Unstash all messages stashed while in intermediate state
@@ -617,15 +647,39 @@ class ContainerProxy(factory: (TransactionId,
*
* @param newData the ContainerStarted which container will be destroyed
*/
- def destroyContainer(newData: ContainerStarted, replacePrewarm: Boolean) = {
+ def destroyContainer(newData: ContainerStarted,
+ replacePrewarm: Boolean,
+ abortBuffered: Boolean = false,
+ abortResponse: Option[ActivationResponse] = None) = {
val container = newData.container
if (!rescheduleJob) {
context.parent ! ContainerRemoved(replacePrewarm)
} else {
context.parent ! RescheduleJob
}
-
- rejectBuffered()
+ if (abortBuffered && runBuffer.length > 0) {
+ logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init")
+ runBuffer.foreach { job =>
+ implicit val tid = job.msg.transid
+ logging.info(this, s"aborting activation ${job.msg.activationId} after failed init with ${abortResponse}")
+ val result = ContainerProxy.constructWhiskActivation(
+ job,
+ None,
+ Interval.zero,
+ false,
+ abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
+ val context = UserContext(job.msg.user)
+ val msg = if (job.msg.blocking) {
+ CombinedCompletionAndResultMessage(tid, result, instance)
+ } else {
+ CompletionMessage(tid, result, instance)
+ }
+ sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+ storeActivation(tid, result, job.msg.blocking, context)
+ }
+ } else {
+ rejectBuffered()
+ }
val unpause = stateName match {
case Paused => container.resume()(TransactionId.invokerNanny)
@@ -636,7 +690,11 @@ class ContainerProxy(factory: (TransactionId,
.flatMap(_ => container.destroy()(TransactionId.invokerNanny))
.map(_ => ContainerRemoved(replacePrewarm))
.pipeTo(self)
- goto(Removing) using newData
+ if (stateName != Removing) {
+ goto(Removing) using newData
+ } else {
+ stay using newData
+ }
}
/**
@@ -844,9 +902,30 @@ class ContainerProxy(factory: (TransactionId,
// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs.flatMap {
case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
+ val truncatedResult = truncatedError(act)
+ logging.info(
+ this,
+ s"Activation ${act.activationId} was unsuccessful at container ${stateData.getContainer} (with ${activeCount} still active) due to ${truncatedResult}")
Future.failed(ActivationUnsuccessfulError(act))
case Left(error) => Future.failed(error)
- case Right(act) => Future.successful(act)
+ case Right(act) =>
+ if (act.response.isApplicationError) {
+ val truncatedResult = truncatedError(act)
+ logging.error(
+ this,
+ s"Activation ${act.activationId} at container ${stateData.getContainer} (with ${activeCount} still active) returned an error ${truncatedResult}")
+ }
+ Future.successful(act)
+ }
+ }
+ //to ensure we don't blow up logs with potentially large activation response error
+ private def truncatedError(act: WhiskActivation) = {
+ val truncate = 1024
+ val resultString = act.response.result.map(_.compactPrint).getOrElse("[no result]")
+ if (resultString.length > truncate) {
+ s"${resultString.take(truncate)}..."
+ } else {
+ resultString
}
}
}
diff --git a/tests/dat/actions/concurrentFail1.js b/tests/dat/actions/concurrentFail1.js
new file mode 100644
index 0000000..f15f8b9
--- /dev/null
+++ b/tests/dat/actions/concurrentFail1.js
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+let count=0;
+function main(args) {
+ console.log("sleeping for "+(args.time||1000));
+ var shouldFail = args.fail||false;
+ var sleepTime = args.time||1000;
+ count = count+1;
+ if (shouldFail) {
+ console.log("skipping the return..");
+ return new Promise();
+ } else {
+ return new Promise(function (resolve, reject) {
+ setTimeout(function () {
+ resolve({body: "done sleeping "+sleepTime, done: true});
+ }, sleepTime);
+ })
+ }
+}
diff --git a/tests/dat/actions/concurrentFail2.js b/tests/dat/actions/concurrentFail2.js
new file mode 100644
index 0000000..895349f
--- /dev/null
+++ b/tests/dat/actions/concurrentFail2.js
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+let count=0;
+function main(args) {
+ console.log("sleeping for "+(args.time||1000));
+ var sleepTime = args.time||1000;
+ var shouldFail = count % 2 === 0;
+ count = count+1;
+ if (shouldFail) {
+ console.log("a catastrophic failure..");
+ process.exit(123);
+ } else {
+ return new Promise(function (resolve, reject) {
+ setTimeout(function () {
+ resolve({body: "done sleeping "+sleepTime, done: true});
+ }, sleepTime);
+ })
+ }
+}
diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java
index 540d72c..05d8b32 100644
--- a/tests/src/test/scala/common/WhiskProperties.java
+++ b/tests/src/test/scala/common/WhiskProperties.java
@@ -408,7 +408,7 @@ public class WhiskProperties {
return getPropFromSystemOrEnv(WHISK_SERVER) == null;
}
- private static String getProperty(String key, String defaultValue) {
+ public static String getProperty(String key, String defaultValue) {
String value = getPropFromSystemOrEnv(key);
if (value == null) {
value = whiskProperties.getProperty(key, defaultValue);
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 3316dc5..f64a0f8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -52,6 +52,7 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.entity.ActivationResponse.ContainerResponse
import org.apache.openwhisk.core.invoker.Invoker
import scala.collection.mutable
@@ -85,7 +86,7 @@ class ContainerProxyTests
val invocationNamespace = EntityName("invocationSpace")
val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
- val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val concurrencyEnabled = Option(WhiskProperties.getProperty("whisk.action.concurrency", "false")).exists(_.toBoolean)
val testConcurrencyLimit = if (concurrencyEnabled) ConcurrencyLimit(2) else ConcurrencyLimit(1)
val concurrentAction = ExecutableWhiskAction(
EntityPath("actionSpace"),
@@ -565,22 +566,8 @@ class ContainerProxyTests
it should "resend a failed Run when it is first Run after Ready state" in within(timeout) {
val noLogsAction = action.copy(limits = ActionLimits(logs = LogLimit(0.MB)))
- val container = new TestContainer {
- override def run(
- parameters: JsObject,
- environment: JsObject,
- timeout: FiniteDuration,
- concurrent: Int,
- reschedule: Boolean = false)(implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
- atomicRunCount.incrementAndGet()
- //every run after first fails
- if (runCount > 1) {
- Future.failed(ContainerHealthError(messageTransId, "intentional failure"))
- } else {
- Future.successful((runInterval, ActivationResponse.success()))
- }
- }
- }
+ val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
+ val container = new TestContainer(runPromises = runPromises)
val factory = createFactory(Future.successful(container))
val acker = createAcker(noLogsAction)
val store = createStore
@@ -602,6 +589,8 @@ class ContainerProxyTests
machine ! Run(noLogsAction, message)
expectMsg(Transition(machine, Uninitialized, Running))
+ //run the first successfully
+ runPromises(0).success(runInterval, ActivationResponse.success())
expectWarmed(invocationNamespace.name, noLogsAction)
expectMsg(Transition(machine, Running, Ready))
@@ -611,6 +600,8 @@ class ContainerProxyTests
machine ! failingRun
machine ! runAfterFail //will be buffered first, and then retried
expectMsg(Transition(machine, Ready, Running))
+ //run the second as failure
+ runPromises(1).failure(ContainerHealthError(messageTransId, "intentional failure"))
//on failure, buffered are resent first
expectMsg(runAfterFail)
//resend the first run to parent, and start removal process
@@ -793,7 +784,7 @@ class ContainerProxyTests
//without waiting for the completion of the previous Run message (signaled by NeedWork message)
//Multiple messages can only be handled after Warming.
it should "stay in Running state if others are still running" in within(timeout) {
- assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+ assume(concurrencyEnabled)
val initPromise = Promise[Interval]()
val runPromises = Seq(
@@ -913,6 +904,375 @@ class ContainerProxyTests
}
+ it should "not destroy on failure during Removing state when concurrent activations are in flight" in {
+ assume(concurrencyEnabled)
+
+ val initPromise = Promise[Interval]()
+ val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
+ val container = new TestContainer(Some(initPromise), runPromises)
+ val factory = createFactory(Future.successful(container))
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ preWarm(machine) //ends in Started state
+
+ machine ! Run(concurrentAction, message) //first in Started state
+ machine ! Run(concurrentAction, message) //second in Started or Running state
+
+ //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+ //second message will be delayed until we get to Running state with WarmedData
+ // (and will produce 1 NeedWork message after run)
+ expectMsg(Transition(machine, Started, Running))
+
+ //complete the init
+ initPromise.success(initInterval)
+
+ //fail the first run
+ runPromises(0).success(runInterval, ActivationResponse.whiskError("intentional failure in test"))
+ //fail the second run
+ runPromises(1).success(runInterval, ActivationResponse.whiskError("intentional failure in test"))
+ //go to Removing state when a failure happens while others are in flight
+ expectMsg(Transition(machine, Running, Removing))
+ expectMsg(RescheduleJob)
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 2
+ container.atomicLogsCount.get() shouldBe 2
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ // As the active acks are sent asynchronously, it is possible, that the activation with the init time is not the
+ // first one in the buffer.
+ val initializedActivations =
+ acker.calls.filter(_._2.annotations.get(WhiskActivation.initTimeAnnotation).isDefined)
+ initializedActivations should have size 1
+
+ initializedActivations.head._2.annotations
+ .get(WhiskActivation.initTimeAnnotation)
+ .get
+ .convertTo[Int] shouldBe initInterval.duration.toMillis
+ }
+ }
+
+ it should "not destroy on failure during Running state when concurrent activations are in flight" in {
+ assume(concurrencyEnabled)
+
+ val initPromise = Promise[Interval]()
+ val runPromises = Seq(Promise[(Interval, ActivationResponse)](), Promise[(Interval, ActivationResponse)]())
+ val container = new TestContainer(Some(initPromise), runPromises)
+ val factory = createFactory(Future.successful(container))
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ preWarm(machine) //ends in Started state
+
+ machine ! Run(concurrentAction, message) //first in Started state
+ machine ! Run(concurrentAction, message) //second in Started or Running state
+
+ //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+ //second message will be delayed until we get to Running state with WarmedData
+ // (and will produce 1 NeedWork message after run)
+ expectMsg(Transition(machine, Started, Running))
+
+ //complete the init
+ initPromise.success(initInterval)
+
+ //fail the first run
+ runPromises(0).success(runInterval, ActivationResponse.whiskError("intentional failure in test"))
+ //succeed the second run
+ runPromises(1).success(runInterval, ActivationResponse.success())
+ //go to Removing state when a failure happens while others are in flight
+ expectMsg(Transition(machine, Running, Removing))
+ expectMsg(RescheduleJob)
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 2
+ container.atomicLogsCount.get() shouldBe 2
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ // As the active acks are sent asynchronously, it is possible, that the activation with the init time is not the
+ // first one in the buffer.
+ val initializedActivations =
+ acker.calls.filter(_._2.annotations.get(WhiskActivation.initTimeAnnotation).isDefined)
+ initializedActivations should have size 1
+
+ initializedActivations.head._2.annotations
+ .get(WhiskActivation.initTimeAnnotation)
+ .get
+ .convertTo[Int] shouldBe initInterval.duration.toMillis
+ }
+ }
+ it should "terminate buffered concurrent activations when prewarm init fails with an error" in {
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+ val initPromise = Promise[Interval]()
+ val container = new TestContainer(Some(initPromise))
+ val factory = createFactory(Future.successful(container))
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ preWarm(machine) //ends in Started state
+
+ machine ! Run(concurrentAction, message) //first in Started state
+ machine ! Run(concurrentAction, message) //second in Started or Running state
+
+ //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+ //second message will be delayed until we get to Running state with WarmedData
+ // (and will produce 1 NeedWork message after run)
+ expectMsg(Transition(machine, Started, Running))
+
+ //fail the init with an initialization error
+ initPromise.failure(
+ InitializationError(
+ initInterval,
+ ActivationResponse
+ .processInitResponseContent(Right(ContainerResponse(false, "some bad init response...")), logging)))
+
+ expectMsg(ContainerRemoved(true))
+ //go to Removing state when a failure happens while others are in flight
+ expectMsg(Transition(machine, Running, Removing))
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 0
+ container.atomicLogsCount.get() shouldBe 1
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ //we should have 2 activations that are container error
+ acker.calls.filter(_._2.response.isContainerError) should have size 2
+ }
+ }
+
+ it should "terminate buffered concurrent activations when prewarm init fails unexpectedly" in {
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+ val initPromise = Promise[Interval]()
+ val container = new TestContainer(Some(initPromise))
+ val factory = createFactory(Future.successful(container))
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ preWarm(machine) //ends in Started state
+
+ machine ! Run(concurrentAction, message) //first in Started state
+ machine ! Run(concurrentAction, message) //second in Started or Running state
+
+ //first message go from Started -> Running -> Ready, with 2 NeedWork messages (1 for init, 1 for run)
+ //second message will be delayed until we get to Running state with WarmedData
+ // (and will produce 1 NeedWork message after run)
+ expectMsg(Transition(machine, Started, Running))
+
+ //fail the init with an unexpected exception
+ initPromise.failure(new IllegalStateException("intentional failure during init test"))
+
+ expectMsg(ContainerRemoved(true))
+ //go to Removing state when a failure happens while others are in flight
+ expectMsg(Transition(machine, Running, Removing))
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 0
+ container.atomicLogsCount.get() shouldBe 1
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ //we should have 2 activations that are whisk error
+ acker.calls.filter(_._2.response.isWhiskError) should have size 2
+ }
+ }
+
+ it should "terminate buffered concurrent activations when cold init fails with an error" in {
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+ val initPromise = Promise[Interval]()
+ val container = new TestContainer(Some(initPromise))
+ val factory = createFactory(Future.successful(container))
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ //no prewarming
+
+ machine ! Run(concurrentAction, message) //first in Uninitialized state
+ machine ! Run(concurrentAction, message) //second in Uninitialized or Running state
+
+ expectMsg(Transition(machine, Uninitialized, Running))
+
+ //fail the init with an initialization error
+ initPromise.failure(
+ InitializationError(
+ initInterval,
+ ActivationResponse
+ .processInitResponseContent(Right(ContainerResponse(false, "some bad init response...")), logging)))
+
+ expectMsg(ContainerRemoved(true))
+ //go to Removing state when a failure happens while others are in flight
+ expectMsg(Transition(machine, Running, Removing))
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 0
+ container.atomicLogsCount.get() shouldBe 1
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ //we should have 2 activations that are container error
+ acker.calls.filter(_._2.response.isContainerError) should have size 2
+ }
+ }
+
+ it should "terminate buffered concurrent activations when cold init fails unexpectedly" in {
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+
+ val initPromise = Promise[Interval]()
+ val container = new TestContainer(Some(initPromise))
+ val factory = createFactory(Future.successful(container))
+ val acker = createSyncAcker(concurrentAction)
+ val store = createSyncStore
+ val collector =
+ createCollector(Future.successful(ActivationLogs()), () => container.logs(0.MB, false)(TransactionId.testing))
+
+ val machine =
+ childActorOf(
+ ContainerProxy
+ .props(
+ factory,
+ acker,
+ store,
+ collector,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ poolConfig,
+ healthchecksConfig(),
+ pauseGrace = pauseGrace)
+ .withDispatcher(CallingThreadDispatcher.Id))
+ registerCallback(machine)
+ //no prewarming
+
+ machine ! Run(concurrentAction, message) //first in Uninitialized state
+ machine ! Run(concurrentAction, message) //second in Uninitialized or Running state
+
+ expectMsg(Transition(machine, Uninitialized, Running))
+
+ //fail the init with an unexpected exception
+ initPromise.failure(new IllegalStateException("intentional failure during init test"))
+
+ expectMsg(ContainerRemoved(true))
+ //go to Removing state when a failure happens while others are in flight
+ expectMsg(Transition(machine, Running, Removing))
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 0
+ container.atomicLogsCount.get() shouldBe 1
+ container.suspendCount shouldBe 0
+ container.resumeCount shouldBe 0
+ acker.calls should have size 2
+
+ store.calls should have size 2
+
+ //we should have 2 activations that are whisk error
+ acker.calls.filter(_._2.response.isWhiskError) should have size 2
+ }
+ }
+
it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
timeout) {
val container = new TestContainer {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
index 4110d92..2ea660c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
@@ -17,6 +17,7 @@
package org.apache.openwhisk.core.limits
+import akka.http.scaladsl.model.StatusCodes
import common._
import common.rest.WskRestOperations
import org.apache.openwhisk.core.ConfigKeys
@@ -53,7 +54,7 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
//This tests generates a concurrent load against the concurrent.js action with concurrency set to 5
it should "execute activations concurrently when concurrency > 1 " in withAssetCleaner(wskprops) {
- assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
(wp, assetHelper) =>
val name = "TestConcurrentAction"
@@ -102,7 +103,7 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
//This tests generates the same load against the same action as previous test, BUT with concurrency set to 1
it should "execute activations sequentially when concurrency = 1 " in withAssetCleaner(wskprops) {
- assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
(wp, assetHelper) =>
val name = "TestNonConcurrentAction"
@@ -149,4 +150,66 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
}
}
+ it should "allow concurrent activations to gracefully complete when one fails" in withAssetCleaner(wskprops) {
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
+ (wp, assetHelper) =>
+ val name = "TestFailingConcurrentAction1"
+ assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
+ //this action fails by returning an empty promise
+ val actionName = TestUtils.getTestActionFilename("concurrentFail1.js")
+ (action, _) =>
+ //disable log collection since concurrent activation requires specialized log processing
+ // (at action runtime and using specialized LogStore)
+ action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(2))
+ }
+ //with concurrency 2, at least some of the 3 activations will fail, but not all
+ val requestCount = 3
+ println(s"executing $requestCount activations")
+ val runs = (1 to requestCount).map { i =>
+ Future {
+ //within the action, return empty promise on one specific invocation
+ val params: Map[String, JsValue] = if (i == 2) {
+ Map("fail" -> true.toJson)
+ } else {
+ Map.empty
+ }
+ val result = wsk.action.invoke(name, params, blocking = true, expectedExitCode = TestUtils.DONTCARE_EXIT)
+ result
+ }
+ }
+ val results = Await.result(Future.sequence(runs), 30.seconds)
+ //some will be 200, some will be 502, but all should be completed (no forced acks that take > 30s)
+ results.count(_.statusCode == StatusCodes.OK) should be > 0
+ results.count(_.statusCode == StatusCodes.BadGateway) should be > 0
+ }
+ it should "allow concurrent activations to gracefully complete when one fails catastrophically" in withAssetCleaner(
+ wskprops) {
+ assume(Option(WhiskProperties.getProperty("whisk.action.concurrency", "False")).exists(_.toBoolean))
+ (wp, assetHelper) =>
+ val name = "TestFailingConcurrentAction2"
+ assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
+ //this action does a process.exit() on every other activation it receives (starting with the first)
+ val actionName = TestUtils.getTestActionFilename("concurrentFail2.js")
+ (action, _) =>
+ //disable log collection since concurrent activation requires specialized log processing
+ // (at action runtime and using specialized LogStore)
+ action.create(name, Some(actionName), logsize = Some(0.bytes), concurrency = Some(2))
+ }
+ //we'll make every container fail every other activation, so with at least 2 to each container, all will fail
+ val requestCount = 4
+ println(s"executing $requestCount activations")
+ val runs = (1 to requestCount).map { i =>
+ Future {
+ //within the action, exit the nodejs process on every other invocation
+ //use default params
+ val result = wsk.action.invoke(name, blocking = true, expectedExitCode = TestUtils.DONTCARE_EXIT)
+ result
+ }
+ }
+ val results = Await.result(Future.sequence(runs), 30.seconds)
+ //none will be 200, since each container receives at least 2 activations
+ //and each container crashes on every other activation.
+ results.count(_.statusCode == StatusCodes.OK) shouldBe 0
+ results.count(_.statusCode == StatusCodes.BadGateway) shouldBe 4
+ }
}