You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/10 20:13:39 UTC

[GitHub] tysonnorris closed pull request #3941: to address #3918, reuse a container on applicationError

tysonnorris closed pull request #3941: to address #3918, reuse a container on applicationError
URL: https://github.com/apache/incubator-openwhisk/pull/3941
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff of such a pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 7c46615d77..41d210de93 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -111,7 +111,7 @@ trait Container {
           Future.failed(
             InitializationError(
               result.interval,
-              ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true))))
+              ActivationResponse.containerError(Messages.timedoutActivation(timeout, true))))
         } else {
           Future.failed(
             InitializationError(
@@ -148,7 +148,7 @@ trait Container {
       }
       .map { result =>
         val response = if (result.interval.duration >= timeout) {
-          ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false))
+          ActivationResponse.containerError(Messages.timedoutActivation(timeout, false))
         } else {
           ActivationResponse.processRunResponseContent(result.response, logging)
         }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index d04dfb3547..566207610b 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -153,7 +153,7 @@ class ContainerProxy(
             // the failure is either the system fault, or for docker actions, the application/developer fault
             val response = t match {
               case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg)
-              case BlackboxStartupError(msg)       => ActivationResponse.applicationError(msg)
+              case BlackboxStartupError(msg)       => ActivationResponse.containerError(msg)
               case _                               => ActivationResponse.whiskError(Messages.resourceProvisionError)
             }
             // construct an appropriate activation and record it in the datastore,
@@ -419,9 +419,10 @@ class ContainerProxy(
 
     // Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
     activationWithLogs.flatMap {
-      case Right(act) if !act.response.isSuccess => Future.failed(ActivationUnsuccessfulError(act))
-      case Left(error)                           => Future.failed(error)
-      case Right(act)                            => Future.successful(act)
+      case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
+        Future.failed(ActivationUnsuccessfulError(act))
+      case Left(error) => Future.failed(error)
+      case Right(act)  => Future.successful(act)
     }
   }
 }
diff --git a/tests/src/test/scala/whisk/core/cli/test/WskRestBasicUsageTests.scala b/tests/src/test/scala/whisk/core/cli/test/WskRestBasicUsageTests.scala
index 23209087c6..c82445e53c 100644
--- a/tests/src/test/scala/whisk/core/cli/test/WskRestBasicUsageTests.scala
+++ b/tests/src/test/scala/whisk/core/cli/test/WskRestBasicUsageTests.scala
@@ -199,7 +199,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct
       withActivation(wsk.activation, wsk.action.invoke(name)) { activation =>
         val response = activation.response
         response.result.get.fields("error") shouldBe Messages.timedoutActivation(3 seconds, true).toJson
-        response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ApplicationError)
+        response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError)
       }
   }
 
@@ -285,7 +285,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct
 
       val run = wsk.action.invoke(name)
       withActivation(wsk.activation, run) { activation =>
-        activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ApplicationError)
+        activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError)
         activation.response.result.get
           .fields("error") shouldBe s"Failed to pull container image '$containerName'.".toJson
         activation.annotations shouldBe defined
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index a7080af784..077afa9369 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -426,7 +426,7 @@ class DockerContainerTests
 
     val error = the[InitializationError] thrownBy await(init, initTimeout)
     error.interval shouldBe interval
-    error.response.statusCode shouldBe ActivationResponse.ApplicationError
+    error.response.statusCode shouldBe ActivationResponse.ContainerError
 
     // assert the finish log is there
     val end = LogMarker.parse(logLines.last)
@@ -474,7 +474,7 @@ class DockerContainerTests
     }
 
     val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout)
-    await(runResult) shouldBe (interval, ActivationResponse.applicationError(
+    await(runResult) shouldBe (interval, ActivationResponse.containerError(
       Messages.timedoutActivation(runTimeout, false)))
 
     // assert the finish log is there
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index bd3c948410..2221e91e28 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -242,7 +242,7 @@ class KubernetesContainerTests
 
     val error = the[InitializationError] thrownBy await(init, initTimeout)
     error.interval shouldBe interval
-    error.response.statusCode shouldBe ActivationResponse.ApplicationError
+    error.response.statusCode shouldBe ActivationResponse.ContainerError
 
     // assert the finish log is there
     val end = LogMarker.parse(logLines.last)
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 25b0303271..4228c3dd5c 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -79,6 +79,11 @@ class ContainerProxyTests
     Interval(now, now.plusMillis(200))
   }
 
+  val errorInterval = {
+    val now = initInterval.end.plusMillis(75) // delay between init and run
+    Interval(now, now.plusMillis(150))
+  }
+
   val uuid = UUID()
 
   val message = ActivationMessage(
@@ -385,6 +390,73 @@ class ContainerProxyTests
     }
   }
 
+  it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
+    timeout) {
+    val container = new TestContainer {
+      override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
+        implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
+        runCount += 1
+        // runs with an even runCount succeed; all other runs fail with an application error
+        if (runCount % 2 == 0) {
+          Future.successful((runInterval, ActivationResponse.success()))
+        } else {
+          Future.successful((errorInterval, ActivationResponse.applicationError(("boom"))))
+        }
+      }
+    }
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+
+    val machine =
+      childActorOf(
+        ContainerProxy
+          .props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
+    registerCallback(machine)
+    preWarm(machine)
+
+    //first one will fail
+    run(machine, Started)
+
+    // Note that there are no intermediate state changes
+    //second one will succeed
+    run(machine, Ready)
+
+    // With the exception of the error on the first run, the assertions should be the same as in
+    //         `run an action and continue with a next run without pausing the container`
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 2
+      collector.calls should have size 2
+      container.suspendCount shouldBe 0
+      container.destroyCount shouldBe 0
+      acker.calls should have size 2
+      store.calls should have size 2
+
+      val initErrorActivation = acker.calls(0)._2
+      initErrorActivation.duration shouldBe Some((initInterval.duration + errorInterval.duration).toMillis)
+      initErrorActivation.annotations
+        .get(WhiskActivation.initTimeAnnotation)
+        .get
+        .convertTo[Int] shouldBe initInterval.duration.toMillis
+      initErrorActivation.annotations
+        .get(WhiskActivation.waitTimeAnnotation)
+        .get
+        .convertTo[Int] shouldBe
+        Interval(message.transid.meta.start, initInterval.start).duration.toMillis
+
+      val runOnlyActivation = acker.calls(1)._2
+      runOnlyActivation.duration shouldBe Some(runInterval.duration.toMillis)
+      runOnlyActivation.annotations.get(WhiskActivation.initTimeAnnotation) shouldBe empty
+      runOnlyActivation.annotations.get(WhiskActivation.waitTimeAnnotation).get.convertTo[Int] shouldBe {
+        Interval(message.transid.meta.start, runInterval.start).duration.toMillis
+      }
+    }
+
+  }
+
   /*
    * ERROR CASES
    */
@@ -423,7 +495,7 @@ class ContainerProxyTests
       override def initialize(initializer: JsObject,
                               timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
         initializeCount += 1
-        Future.failed(InitializationError(initInterval, ActivationResponse.applicationError("boom")))
+        Future.failed(InitializationError(initInterval, ActivationResponse.containerError("boom")))
       }
     }
     val factory = createFactory(Future.successful(container))
@@ -448,7 +520,7 @@ class ContainerProxyTests
       collector.calls should have size 1
       container.destroyCount shouldBe 1
       val activation = acker.calls(0)._2
-      activation.response shouldBe ActivationResponse.applicationError("boom")
+      activation.response shouldBe ActivationResponse.containerError("boom")
       activation.annotations
         .get(WhiskActivation.initTimeAnnotation)
         .get
@@ -458,12 +530,13 @@ class ContainerProxyTests
     }
   }
 
-  it should "complete the transaction and destroy the container on a failed run" in within(timeout) {
+  it should "complete the transaction and destroy the container on a failed run IFF failure was containerError" in within(
+    timeout) {
     val container = new TestContainer {
       override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
         implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
         runCount += 1
-        Future.successful((initInterval, ActivationResponse.applicationError("boom")))
+        Future.successful((initInterval, ActivationResponse.containerError(("boom"))))
       }
     }
     val factory = createFactory(Future.successful(container))
@@ -487,7 +560,7 @@ class ContainerProxyTests
       container.runCount shouldBe 1
       collector.calls should have size 1
       container.destroyCount shouldBe 1
-      acker.calls(0)._2.response shouldBe ActivationResponse.applicationError("boom")
+      acker.calls(0)._2.response shouldBe ActivationResponse.containerError("boom")
       store.calls should have size 1
     }
   }
diff --git a/tests/src/test/scala/whisk/core/limits/MaxActionDurationTests.scala b/tests/src/test/scala/whisk/core/limits/MaxActionDurationTests.scala
index 1a854f1684..efd431cb2b 100644
--- a/tests/src/test/scala/whisk/core/limits/MaxActionDurationTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/MaxActionDurationTests.scala
@@ -86,7 +86,7 @@ class MaxActionDurationTests extends TestHelpers with WskTestHelpers with WskAct
           pollPeriod = 1.minute,
           totalWait = TimeLimit.MAX_DURATION + 2.minutes) { activation =>
           withClue("Activation result not as expected:") {
-            activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ApplicationError)
+            activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError)
             activation.response.result shouldBe Some(
               JsObject("error" -> Messages.timedoutActivation(TimeLimit.MAX_DURATION, init = false).toJson))
             activation.duration.toInt should be >= TimeLimit.MAX_DURATION.toMillis.toInt


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services