You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/09/05 07:35:27 UTC
[incubator-openwhisk] branch master updated: Add blocking indicator for activation request. (#2677)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 9adf702 Add blocking indicator for activation request. (#2677)
9adf702 is described below
commit 9adf7026c7b8d8cb8f13997d36a704bd344b58ae
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Tue Sep 5 03:35:24 2017 -0400
Add blocking indicator for activation request. (#2677)
This allows non-blocking requests to drop their results from the active-ack response.
---
common/scala/src/main/scala/whisk/core/connector/Message.scala | 3 ++-
.../src/main/scala/whisk/core/entity/ActivationResult.scala | 1 +
.../src/main/scala/whisk/core/entity/WhiskActivation.scala | 1 +
.../src/main/scala/whisk/core/controller/RestAPIs.scala | 2 +-
.../scala/whisk/core/controller/actions/PrimitiveActions.scala | 1 +
.../scala/whisk/core/entitlement/ActivationThrottler.scala | 2 +-
.../scala/whisk/core/loadBalancer/InvokerSupervision.scala | 1 +
.../scala/whisk/core/loadBalancer/LoadBalancerService.scala | 2 +-
.../main/scala/whisk/core/containerpool/ContainerProxy.scala | 8 ++++----
.../src/main/scala/whisk/core/invoker/InvokerReactive.scala | 7 ++++---
tests/src/test/scala/common/LoggedFunction.scala | 10 ++++++++++
.../whisk/core/containerpool/test/ContainerPoolTests.scala | 3 ++-
.../whisk/core/containerpool/test/ContainerProxyTests.scala | 5 +++--
.../whisk/core/loadBalancer/test/InvokerSupervisionTests.scala | 1 +
14 files changed, 33 insertions(+), 14 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index d977a61..8da3d1a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -55,6 +55,7 @@ case class ActivationMessage(
activationId: ActivationId,
activationNamespace: EntityPath,
rootControllerIndex: InstanceId,
+ blocking: Boolean,
content: Option[JsObject],
cause: Option[ActivationId] = None)
extends Message {
@@ -82,7 +83,7 @@ object ActivationMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
- implicit val serdes = jsonFormat9(ActivationMessage.apply)
+ implicit val serdes = jsonFormat10(ActivationMessage.apply)
}
/**
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 705e722..d700062 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -46,6 +46,7 @@ protected[core] case class ActivationResponse private (
def isApplicationError = statusCode == ActivationResponse.ApplicationError
def isContainerError = statusCode == ActivationResponse.ContainerError
def isWhiskError = statusCode == ActivationResponse.WhiskError
+ def withoutResult = ActivationResponse(statusCode, None)
override def toString = toJsonObject.compactPrint
}
diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
index 0fe0dcd..1904fe1 100644
--- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala
@@ -90,6 +90,7 @@ case class WhiskActivation(
}
}
+ def withoutLogsOrResult = copy(response = response.withoutResult, logs = ActivationLogs()).revision[WhiskActivation](rev)
def withoutLogs = copy(logs = ActivationLogs()).revision[WhiskActivation](rev)
def withLogs(logs: ActivationLogs) = copy(logs = logs).revision[WhiskActivation](rev)
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index f75750e..7fd3231 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -232,7 +232,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)(
override val logging: Logging,
override val whiskConfig: WhiskConfig)
extends WhiskActionsApi with WhiskServices {
- logging.info(this, s"actionSequenceLimit '${whiskConfig.actionSequenceLimit}'")
+ logging.info(this, s"actionSequenceLimit '${whiskConfig.actionSequenceLimit}'")(TransactionId.controller)
assert(whiskConfig.actionSequenceLimit.toInt > 0)
}
diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index f07347a..8fee625 100644
--- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -109,6 +109,7 @@ protected[actions] trait PrimitiveActions {
activationIdFactory.make(), // activation id created here
activationNamespace = user.namespace.toPath,
activeAckTopicIndex,
+ waitForResponse.isDefined,
args,
cause = cause)
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index b9567c5..b184d50 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -34,7 +34,7 @@ import whisk.core.loadBalancer.LoadBalancer
*/
class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: Int, systemOverloadLimit: Int)(implicit logging: Logging) {
- logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")
+ logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, systemOverloadLimit = $systemOverloadLimit")(TransactionId.controller)
/**
* Checks whether the operation should be allowed to proceed.
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 51598d1..2ae224b 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -308,6 +308,7 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId)
activationId = new ActivationIdGenerator {}.make(),
activationNamespace = action.namespace,
rootControllerIndex = controllerInstance,
+ blocking = false,
content = None)
context.parent ! ActivationRequest(activationMessage, invokerInstance)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 1ff22d8..7684fa6 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -95,7 +95,7 @@ class LoadBalancerService(
/** How many invokers are dedicated to blackbox images. We range bound to something sensical regardless of configuration. */
private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, config.controllerBlackboxFraction))
- logging.info(this, s"blackboxFraction = $blackboxFraction")
+ logging.info(this, s"blackboxFraction = $blackboxFraction")(TransactionId.loadbalancer)
private val loadBalancerData = new LoadBalancerData()
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 1c3c4f1..a703190 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -89,7 +89,7 @@ case object ContainerRemoved
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
+ sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
storeActivation: (TransactionId, WhiskActivation) => Future[Any],
instance: InstanceId,
unusedTimeout: FiniteDuration,
@@ -147,7 +147,7 @@ class ContainerProxy(
// implicitly via a FailureMessage which will be processed later when the state
// transitions to Running
val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
- sendActiveAck(transid, activation, job.msg.rootControllerIndex)
+ sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex)
storeActivation(transid, activation)
}.flatMap {
container =>
@@ -349,7 +349,7 @@ class ContainerProxy(
// asynchronous.
activation.andThen {
// the activation future will always complete with Success
- case Success(ack) => sendActiveAck(tid, ack, job.msg.rootControllerIndex)
+ case Success(ack) => sendActiveAck(tid, ack, job.msg.blocking, job.msg.rootControllerIndex)
}.flatMap { activation =>
container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs =>
activation.withLogs(ActivationLogs(logs.toVector))
@@ -367,7 +367,7 @@ class ContainerProxy(
object ContainerProxy {
def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container],
- ack: (TransactionId, WhiskActivation, InstanceId) => Future[Any],
+ ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
store: (TransactionId, WhiskActivation) => Future[Any],
instance: InstanceId,
unusedTimeout: FiniteDuration = 10.minutes,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 3647691..7ddde48 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -119,18 +119,19 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
}
/** Sends an active-ack. */
- val ack = (tid: TransactionId, activationResult: WhiskActivation, controllerInstance: InstanceId) => {
+ val ack = (tid: TransactionId, activationResult: WhiskActivation, blockingInvoke: Boolean, controllerInstance: InstanceId) => {
implicit val transid = tid
def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
val msg = CompletionMessage(transid, res, instance)
+
producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
case Success(_) =>
logging.info(this, s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
}
}
- send(Right(activationResult)).recoverWith {
+ send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
send(Left(activationResult.activationId), recovery = true)
}
@@ -220,7 +221,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
})
activationFeed ! MessageFeed.Processed
- ack(msg.transid, activation, msg.rootControllerIndex)
+ ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex)
store(msg.transid, activation)
Future.successful(())
}
diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala
index e6c0aad..734a395 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -47,6 +47,15 @@ class LoggedFunction3[A1, A2, A3, B](body: (A1, A2, A3) => B) extends Function3[
}
}
+class LoggedFunction4[A1, A2, A3, A4, B](body: (A1, A2, A3, A4) => B) extends Function4[A1, A2, A3, A4, B] {
+ val calls = mutable.Buffer[(A1, A2, A3, A4)]()
+
+ override def apply(v1: A1, v2: A2, v3: A3, v4: A4): B = {
+ calls += ((v1, v2, v3, v4))
+ body(v1, v2, v3, v4)
+ }
+}
+
class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) extends Function5[A1, A2, A3, A4, A5, B] {
val calls = mutable.Buffer[(A1, A2, A3, A4, A5)]()
@@ -59,5 +68,6 @@ class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) ex
object LoggedFunction {
def apply[A1, A2, B](body: (A1, A2) => B) = new LoggedFunction2(body)
def apply[A1, A2, A3, B](body: (A1, A2, A3) => B) = new LoggedFunction3(body)
+ def apply[A1, A2, A3, A4, B](body: (A1, A2, A3, A4) => B) = new LoggedFunction4(body)
def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5(body)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 1988888..a631e91 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -78,7 +78,8 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool"))
ActivationId(),
invocationNamespace.toPath,
InstanceId(0),
- None)
+ blocking = false,
+ content = None)
Run(action, message)
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index e3dc148..32a340e 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -79,7 +79,8 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
ActivationId(),
invocationNamespace.toPath,
InstanceId(0),
- None)
+ blocking = false,
+ content = None)
/*
* Helpers for assertions and actor lifecycles
@@ -129,7 +130,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys"))
}
/** Creates an inspectable version of the ack method, which records all calls in a buffer */
- def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation, _: InstanceId) => Future.successful(()) }
+ def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation, _: Boolean, _: InstanceId) => Future.successful(()) }
/** Creates an inspectable factory */
def createFactory(response: Future[Container]) = LoggedFunction {
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 50db9a6..1fc7166 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -194,6 +194,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision"))
activationId = new ActivationIdGenerator {}.make(),
activationNamespace = EntityPath("guest"),
rootControllerIndex = InstanceId(0),
+ blocking = false,
content = None)
val msg = ActivationRequest(activationMessage, invokerInstance)
--
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>.