You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/09/05 07:35:27 UTC

[incubator-openwhisk] branch master updated: Add blocking indicator for activation request. (#2677)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9adf702  Add blocking indicator for activation request. (#2677)
9adf702 is described below

commit 9adf7026c7b8d8cb8f13997d36a704bd344b58ae
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Tue Sep 5 03:35:24 2017 -0400

    Add blocking indicator for activation request. (#2677)
    
    This allows non-blocking requests to drop their results from the active-ack response.
---
 common/scala/src/main/scala/whisk/core/connector/Message.scala |  3 ++-
 .../src/main/scala/whisk/core/entity/ActivationResult.scala    |  1 +
 .../src/main/scala/whisk/core/entity/WhiskActivation.scala     |  1 +
 .../src/main/scala/whisk/core/controller/RestAPIs.scala        |  2 +-
 .../scala/whisk/core/controller/actions/PrimitiveActions.scala |  1 +
 .../scala/whisk/core/entitlement/ActivationThrottler.scala     |  2 +-
 .../scala/whisk/core/loadBalancer/InvokerSupervision.scala     |  1 +
 .../scala/whisk/core/loadBalancer/LoadBalancerService.scala    |  2 +-
 .../main/scala/whisk/core/containerpool/ContainerProxy.scala   |  8 ++++----
 .../src/main/scala/whisk/core/invoker/InvokerReactive.scala    |  7 ++++---
 tests/src/test/scala/common/LoggedFunction.scala               | 10 ++++++++++
 .../whisk/core/containerpool/test/ContainerPoolTests.scala     |  3 ++-
 .../whisk/core/containerpool/test/ContainerProxyTests.scala    |  5 +++--
 .../whisk/core/loadBalancer/test/InvokerSupervisionTests.scala |  1 +
 14 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index d977a61..8da3d1a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -55,6 +55,7 @@ case class ActivationMessage(
     activationId: ActivationId,
     activationNamespace: EntityPath,
     rootControllerIndex: InstanceId,
+    blocking: Boolean,
     content: Option[JsObject],
     cause: Option[ActivationId] = None)
     extends Message {
@@ -82,7 +83,7 @@ object ActivationMessage extends DefaultJsonProtocol {
     def parse(msg: String) = Try(serdes.read(msg.parseJson))
 
     private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-    implicit val serdes = jsonFormat9(ActivationMessage.apply)
+    implicit val serdes = jsonFormat10(ActivationMessage.apply)
 }
 
 /**
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 705e722..d700062 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -46,6 +46,7 @@ protected[core] case class ActivationResponse private (
     def isApplicationError = statusCode == ActivationResponse.ApplicationError
     def isContainerError = statusCode == ActivationResponse.ContainerError
     def isWhiskError = statusCode == ActivationResponse.WhiskError
+    def withoutResult = ActivationResponse(statusCode, None)
 
     override def toString = toJsonObject.compactPrint
 }
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
index 0fe0dcd..1904fe1 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
@@ -90,6 +90,7 @@ case class WhiskActivation(
         }
     }
 
+    def withoutLogsOrResult = copy(response = response.withoutResult, logs = ActivationLogs()).revision[WhiskActivation](rev)
     def withoutLogs = copy(logs = ActivationLogs()).revision[WhiskActivation](rev)
     def withLogs(logs: ActivationLogs) = copy(logs = logs).revision[WhiskActivation](rev)
 }
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index f75750e..7fd3231 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -232,7 +232,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
         override val logging: Logging,
         override val whiskConfig: WhiskConfig)
     extends WhiskActionsApi with WhiskServices {
-        logging.info(this, s"actionSequenceLimit '${whiskConfig.actionSequenceLimit}'")
+        logging.info(this, s"actionSequenceLimit '${whiskConfig.actionSequenceLimit}'")(TransactionId.controller)
         assert(whiskConfig.actionSequenceLimit.toInt > 0)
     }
 
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f07347a..8fee625 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -109,6 +109,7 @@ protected[actions] trait PrimitiveActions {
             activationIdFactory.make(), // activation id created here
             activationNamespace = user.namespace.toPath,
             activeAckTopicIndex,
+            waitForResponse.isDefined,
             args,
             cause = cause)
 
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index b9567c5..b184d50 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -34,7 +34,7 @@ import whisk.core.loadBalancer.LoadBalancer
  */
 class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(implicit logging: Logging) {
 
-    logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")
+    logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")(TransactionId.controller)
 
     /**
      * Checks whether the operation should be allowed to proceed.
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 51598d1..2ae224b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -308,6 +308,7 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
                 activationId = new ActivationIdGenerator {}.make(),
                 activationNamespace = action.namespace,
                 rootControllerIndex = controllerInstance,
+                blocking = false,
                 content = None)
 
             context.parent ! ActivationRequest(activationMessage, invokerInstance)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 1ff22d8..7684fa6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -95,7 +95,7 @@ class LoadBalancerService(
 
     /** How many invokers are dedicated to blackbox images.  We range bound to something sensical regardless of configuration. */
     private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, config.controllerBlackboxFraction))
-    logging.info(this, s"blackboxFraction = $blackboxFraction")
+    logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
 
     private val loadBalancerData = new LoadBalancerData()
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 1c3c4f1..a703190 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -89,7 +89,7 @@ case object ContainerRemoved
  */
 class ContainerProxy(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
-    sendActiveAck: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
+    sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
     storeActivation: (TransactionId, WhiskActivation) => Future[Any],
     instance: InstanceId,
     unusedTimeout: FiniteDuration,
@@ -147,7 +147,7 @@ class ContainerProxy(
                     // implicitly via a FailureMessage which will be processed later when the state
                     // transitions to Running
                     val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
-                    sendActiveAck(transid, activation, job.msg.rootControllerIndex)
+                    sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex)
                     storeActivation(transid, activation)
             }.flatMap {
                 container =>
@@ -349,7 +349,7 @@ class ContainerProxy(
         // asynchronous.
         activation.andThen {
             // the activation future will always complete with Success
-            case Success(ack) => sendActiveAck(tid, ack, job.msg.rootControllerIndex)
+            case Success(ack) => sendActiveAck(tid, ack, job.msg.blocking, job.msg.rootControllerIndex)
         }.flatMap { activation =>
             container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs =>
                 activation.withLogs(ActivationLogs(logs.toVector))
@@ -367,7 +367,7 @@ class ContainerProxy(
 
 object ContainerProxy {
     def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
-              ack: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
+              ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
               store: (TransactionId, WhiskActivation) => Future[Any],
               instance: InstanceId,
               unusedTimeout: FiniteDuration = 10.minutes,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 3647691..7ddde48 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -119,18 +119,19 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     }
 
     /** Sends an active-ack. */
-    val ack = (tid: TransactionId, activationResult: WhiskActivation, controllerInstance: InstanceId) => {
+    val ack = (tid: TransactionId, activationResult: WhiskActivation, blockingInvoke: Boolean, controllerInstance: InstanceId) => {
         implicit val transid = tid
 
         def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
             val msg = CompletionMessage(transid, res, instance)
+
             producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
                 case Success(_) =>
                     logging.info(this, s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
             }
         }
 
-        send(Right(activationResult)).recoverWith {
+        send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
             case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
                 send(Left(activationResult.activationId), recovery = true)
         }
@@ -220,7 +221,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
                             })
 
                         activationFeed ! MessageFeed.Processed
-                        ack(msg.transid, activation, msg.rootControllerIndex)
+                        ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex)
                         store(msg.transid, activation)
                         Future.successful(())
                 }
diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala
index e6c0aad..734a395 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -47,6 +47,15 @@ class LoggedFunction3[A1, A2, A3, B](body: (A1, A2, A3) => B) extends Function3[
     }
 }
 
+class LoggedFunction4[A1, A2, A3, A4, B](body: (A1, A2, A3, A4) => B) extends Function4[A1, A2, A3, A4, B] {
+    val calls = mutable.Buffer[(A1, A2, A3, A4)]()
+
+    override def apply(v1: A1, v2: A2, v3: A3, v4: A4): B = {
+        calls += ((v1, v2, v3, v4))
+        body(v1, v2, v3, v4)
+    }
+}
+
 class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) extends Function5[A1, A2, A3, A4, A5, B] {
     val calls = mutable.Buffer[(A1, A2, A3, A4, A5)]()
 
@@ -59,5 +68,6 @@ class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) ex
 object LoggedFunction {
     def apply[A1, A2, B](body: (A1, A2) => B) = new LoggedFunction2(body)
     def apply[A1, A2, A3, B](body: (A1, A2, A3) => B) = new LoggedFunction3(body)
+    def apply[A1, A2, A3, A4, B](body: (A1, A2, A3, A4) => B) = new LoggedFunction4(body)
     def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5(body)
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 1988888..a631e91 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -78,7 +78,8 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
             ActivationId(),
             invocationNamespace.toPath,
             InstanceId(0),
-            None)
+            blocking = false,
+            content = None)
         Run(action, message)
     }
 
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index e3dc148..32a340e 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -79,7 +79,8 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
         ActivationId(),
         invocationNamespace.toPath,
         InstanceId(0),
-        None)
+        blocking = false,
+        content = None)
 
     /*
      * Helpers for assertions and actor lifecycles
@@ -129,7 +130,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
     }
 
     /** Creates an inspectable version of the ack method, which records all calls in a buffer */
-    def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation, _: InstanceId) => Future.successful(()) }
+    def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation, _: Boolean, _: InstanceId) => Future.successful(()) }
 
     /** Creates an inspectable factory */
     def createFactory(response: Future[Container]) = LoggedFunction {
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 50db9a6..1fc7166 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -194,6 +194,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
             activationId = new ActivationIdGenerator {}.make(),
             activationNamespace = EntityPath("guest"),
             rootControllerIndex = InstanceId(0),
+            blocking = false,
             content = None)
         val msg = ActivationRequest(activationMessage, invokerInstance)
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].