You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/12/06 07:22:46 UTC

[incubator-openwhisk] branch master updated: Record the blocking activation in the proper map before the request is sent to the invoker. (#4145)

This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 54a2f22  Record the blocking activation in the proper map before the request is sent to the invoker. (#4145)
54a2f22 is described below

commit 54a2f228b744f88cfe3186b10f00e9cb80309886
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Thu Dec 6 02:22:40 2018 -0500

    Record the blocking activation in the proper map before the request is sent to the invoker. (#4145)
---
 .../ShardingContainerPoolBalancer.scala            | 57 +++++++++++++---------
 .../test/ShardingContainerPoolBalancerTests.scala  |  2 +-
 2 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4010cc1..5ddcd11 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -174,8 +174,8 @@ class ShardingContainerPoolBalancer(
   }
 
   /** State related to invocations and throttling */
-  protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
-  protected[loadBalancer] val blockingPromises =
+  protected[loadBalancer] val activationSlots = TrieMap[ActivationId, ActivationEntry]()
+  protected[loadBalancer] val activationPromises =
     TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
@@ -264,14 +264,8 @@ class ShardingContainerPoolBalancer(
 
     chosen
       .map { invoker =>
-        setupActivation(msg, action, invoker)
-        sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
-          if (msg.blocking) {
-            blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
-          } else {
-            Future.successful(Left(msg.activationId))
-          }
-        }
+        val activationResult = setupActivation(msg, action, invoker)
+        sendActivationToInvoker(messageProducer, msg, invoker).map(_ => activationResult)
       }
       .getOrElse {
         // report the state of all invokers
@@ -286,10 +280,17 @@ class ShardingContainerPoolBalancer(
       }
   }
 
-  /** 2. Update local state with the to be executed activation */
+  /**
+   * 2. Update local state with the to be executed activation.
+   *
+   * All activations are tracked in the activationSlots map. Additionally, blocking invokes
+   * are tracked in the activation results map. When a result is received via activeack, it
+   * will cause the result to be forwarded to the caller waiting on the result, and cancel
+   * the DB poll which is also trying to do the same.
+   */
   private def setupActivation(msg: ActivationMessage,
                               action: ExecutableWhiskActionMetaData,
-                              instance: InvokerInstanceId): ActivationEntry = {
+                              instance: InvokerInstanceId): Future[Either[ActivationId, WhiskActivation]] = {
 
     totalActivations.increment()
     totalActivationMemory.add(action.limits.memory.megabytes)
@@ -301,11 +302,15 @@ class ShardingContainerPoolBalancer(
     // to allow in your topics before you start reporting failed activations.
     val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
 
+    val resultPromise = if (msg.blocking) {
+      activationPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+    } else Future.successful(Left(msg.activationId))
+
     // Install a timeout handler for the catastrophic case where an active ack is not received at all
     // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
-    // the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
+    // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
     // in this case, if the activation handler is still registered, remove it and update the books.
-    activations.getOrElseUpdate(
+    activationSlots.getOrElseUpdate(
       msg.activationId, {
         val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
           processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
@@ -321,6 +326,8 @@ class ShardingContainerPoolBalancer(
           action.fullyQualifiedName(true),
           timeoutHandler)
       })
+
+    resultPromise
   }
 
   private val messageProducer = messagingProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
@@ -389,10 +396,10 @@ class ShardingContainerPoolBalancer(
   private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
     val aid = response.fold(l => l, r => r.activationId)
 
-    // Resolve the promise to send the result back to the user
-    // The activation will be removed from `activations`-map later, when we receive the completion message, because the
-    // slot of the invoker is not yet free for new activations.
-    blockingPromises.remove(aid).map(_.trySuccess(response))
+    // Resolve the promise to send the result back to the user.
+    // The activation will be removed from the activation slots later, when the completion message
+    // is received (because the slot in the invoker is not yet free for new activations).
+    activationPromises.remove(aid).foreach(_.trySuccess(response))
     logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
@@ -415,7 +422,7 @@ class ShardingContainerPoolBalancer(
       }
     }
 
-    activations.remove(aid) match {
+    activationSlots.remove(aid) match {
       case Some(entry) =>
         totalActivations.decrement()
         totalActivationMemory.add(entry.memory.toMB * (-1))
@@ -425,9 +432,15 @@ class ShardingContainerPoolBalancer(
           .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
         if (!forced) {
           entry.timeoutHandler.cancel()
+          // notice here that the activationPromises is not touched, because the expectation is that
+          // the active ack is received as expected, and processing that message removed the promise
+          // from the corresponding map
         } else {
-          // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
-          blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received")))
+          // the entry has timed out; if the active ack is still around, remove its entry also
+          // and complete the promise with a failure if necessary
+          activationPromises
+            .remove(aid)
+            .foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))
         }
 
         logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
@@ -710,7 +723,7 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
 case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeoutFactor: Int)
 
 /**
- * State kept for each activation until completion.
+ * State kept for each activation slot until completion.
  *
  * @param id id of the activation
  * @param namespaceId namespace that invoked the action
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 0c24b31..2c34b01 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -509,7 +509,7 @@ class ShardingContainerPoolBalancerTests
 
     //complete all
     val acks = ids.par.map { aid =>
-      val invoker = balancer.activations(aid).invokerName
+      val invoker = balancer.activationSlots(aid).invokerName
       completeActivation(invoker, balancer, aid)
     }