You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/12/06 07:22:46 UTC
[incubator-openwhisk] branch master updated: Record the blocking
activation in the proper map before the request is sent to the invoker.
(#4145)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 54a2f22 Record the blocking activation in the proper map before the request is sent to the invoker. (#4145)
54a2f22 is described below
commit 54a2f228b744f88cfe3186b10f00e9cb80309886
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Thu Dec 6 02:22:40 2018 -0500
Record the blocking activation in the proper map before the request is sent to the invoker. (#4145)
---
.../ShardingContainerPoolBalancer.scala | 57 +++++++++++++---------
.../test/ShardingContainerPoolBalancerTests.scala | 2 +-
2 files changed, 36 insertions(+), 23 deletions(-)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4010cc1..5ddcd11 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -174,8 +174,8 @@ class ShardingContainerPoolBalancer(
}
/** State related to invocations and throttling */
- protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
- protected[loadBalancer] val blockingPromises =
+ protected[loadBalancer] val activationSlots = TrieMap[ActivationId, ActivationEntry]()
+ protected[loadBalancer] val activationPromises =
TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
private val totalActivations = new LongAdder()
@@ -264,14 +264,8 @@ class ShardingContainerPoolBalancer(
chosen
.map { invoker =>
- setupActivation(msg, action, invoker)
- sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
- if (msg.blocking) {
- blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
- } else {
- Future.successful(Left(msg.activationId))
- }
- }
+ val activationResult = setupActivation(msg, action, invoker)
+ sendActivationToInvoker(messageProducer, msg, invoker).map(_ => activationResult)
}
.getOrElse {
// report the state of all invokers
@@ -286,10 +280,17 @@ class ShardingContainerPoolBalancer(
}
}
- /** 2. Update local state with the to be executed activation */
+ /**
+ * 2. Update local state with the to be executed activation.
+ *
+ * All activations are tracked in the activationSlots map. Additionally, blocking invokes
+ * are tracked in the activationPromises map. When a result is received via active ack, it
+ * will cause the result to be forwarded to the caller waiting on the result, and cancel
+ * the DB poll which is also trying to do the same.
+ */
private def setupActivation(msg: ActivationMessage,
action: ExecutableWhiskActionMetaData,
- instance: InvokerInstanceId): ActivationEntry = {
+ instance: InvokerInstanceId): Future[Either[ActivationId, WhiskActivation]] = {
totalActivations.increment()
totalActivationMemory.add(action.limits.memory.megabytes)
@@ -301,11 +302,15 @@ class ShardingContainerPoolBalancer(
// to allow in your topics before you start reporting failed activations.
val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
+ val resultPromise = if (msg.blocking) {
+ activationPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+ } else Future.successful(Left(msg.activationId))
+
// Install a timeout handler for the catastrophic case where an active ack is not received at all
// (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
- // the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
+ // the active ack is significantly delayed (possibly due to long queues but the subject should not be penalized);
// in this case, if the activation handler is still registered, remove it and update the books.
- activations.getOrElseUpdate(
+ activationSlots.getOrElseUpdate(
msg.activationId, {
val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
@@ -321,6 +326,8 @@ class ShardingContainerPoolBalancer(
action.fullyQualifiedName(true),
timeoutHandler)
})
+
+ resultPromise
}
private val messageProducer = messagingProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
@@ -389,10 +396,10 @@ class ShardingContainerPoolBalancer(
private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
val aid = response.fold(l => l, r => r.activationId)
- // Resolve the promise to send the result back to the user
- // The activation will be removed from `activations`-map later, when we receive the completion message, because the
- // slot of the invoker is not yet free for new activations.
- blockingPromises.remove(aid).map(_.trySuccess(response))
+ // Resolve the promise to send the result back to the user.
+ // The activation will be removed from the activation slots later, when the completion message
+ // is received (because the slot in the invoker is not yet free for new activations).
+ activationPromises.remove(aid).foreach(_.trySuccess(response))
logging.info(this, s"received result ack for '$aid'")(tid)
}
@@ -415,7 +422,7 @@ class ShardingContainerPoolBalancer(
}
}
- activations.remove(aid) match {
+ activationSlots.remove(aid) match {
case Some(entry) =>
totalActivations.decrement()
totalActivationMemory.add(entry.memory.toMB * (-1))
@@ -425,9 +432,15 @@ class ShardingContainerPoolBalancer(
.foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
if (!forced) {
entry.timeoutHandler.cancel()
+ // note that activationPromises is not touched here, because the expectation is that
+ // the active ack is received as expected, and processing that message removes the promise
+ // from the corresponding map
} else {
- // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
- blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received")))
+ // the entry has timed out; if the active ack is still around, remove its entry also
+ // and complete the promise with a failure if necessary
+ activationPromises
+ .remove(aid)
+ .foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))
}
logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
@@ -710,7 +723,7 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeoutFactor: Int)
/**
- * State kept for each activation until completion.
+ * State kept for each activation slot until completion.
*
* @param id id of the activation
* @param namespaceId namespace that invoked the action
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 0c24b31..2c34b01 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -509,7 +509,7 @@ class ShardingContainerPoolBalancerTests
//complete all
val acks = ids.par.map { aid =>
- val invoker = balancer.activations(aid).invokerName
+ val invoker = balancer.activationSlots(aid).invokerName
completeActivation(invoker, balancer, aid)
}