You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/12/06 07:22:42 UTC

[GitHub] cbickel closed pull request #4145: Record the blocking activation in the proper map before the request is sent to the invoker.

cbickel closed pull request #4145: Record the blocking activation in the proper map before the request is sent to the invoker.
URL: https://github.com/apache/incubator-openwhisk/pull/4145
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be shown, because GitHub hides the original diff when such a pull request is merged):

diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4010cc1e54..5ddcd1195d 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -174,8 +174,8 @@ class ShardingContainerPoolBalancer(
   }
 
   /** State related to invocations and throttling */
-  protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
-  protected[loadBalancer] val blockingPromises =
+  protected[loadBalancer] val activationSlots = TrieMap[ActivationId, ActivationEntry]()
+  protected[loadBalancer] val activationPromises =
     TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
@@ -264,14 +264,8 @@ class ShardingContainerPoolBalancer(
 
     chosen
       .map { invoker =>
-        setupActivation(msg, action, invoker)
-        sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
-          if (msg.blocking) {
-            blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
-          } else {
-            Future.successful(Left(msg.activationId))
-          }
-        }
+        val activationResult = setupActivation(msg, action, invoker)
+        sendActivationToInvoker(messageProducer, msg, invoker).map(_ => activationResult)
       }
       .getOrElse {
         // report the state of all invokers
@@ -286,10 +280,17 @@ class ShardingContainerPoolBalancer(
       }
   }
 
-  /** 2. Update local state with the to be executed activation */
+  /**
+   * 2. Update local state with the to be executed activation.
+   *
+   * All activations are tracked in the activationSlots map. Additionally, blocking invokes
+   * are tracked in the activation results map. When a result is received via activeack, it
+   * will cause the result to be forwarded to the caller waiting on the result, and cancel
+   * the DB poll which is also trying to do the same.
+   */
   private def setupActivation(msg: ActivationMessage,
                               action: ExecutableWhiskActionMetaData,
-                              instance: InvokerInstanceId): ActivationEntry = {
+                              instance: InvokerInstanceId): Future[Either[ActivationId, WhiskActivation]] = {
 
     totalActivations.increment()
     totalActivationMemory.add(action.limits.memory.megabytes)
@@ -301,11 +302,15 @@ class ShardingContainerPoolBalancer(
     // to allow in your topics before you start reporting failed activations.
     val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
 
+    val resultPromise = if (msg.blocking) {
+      activationPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+    } else Future.successful(Left(msg.activationId))
+
     // Install a timeout handler for the catastrophic case where an active ack is not received at all
     // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
-    // the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
+    // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
     // in this case, if the activation handler is still registered, remove it and update the books.
-    activations.getOrElseUpdate(
+    activationSlots.getOrElseUpdate(
       msg.activationId, {
         val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
           processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
@@ -321,6 +326,8 @@ class ShardingContainerPoolBalancer(
           action.fullyQualifiedName(true),
           timeoutHandler)
       })
+
+    resultPromise
   }
 
   private val messageProducer = messagingProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
@@ -389,10 +396,10 @@ class ShardingContainerPoolBalancer(
   private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
     val aid = response.fold(l => l, r => r.activationId)
 
-    // Resolve the promise to send the result back to the user
-    // The activation will be removed from `activations`-map later, when we receive the completion message, because the
-    // slot of the invoker is not yet free for new activations.
-    blockingPromises.remove(aid).map(_.trySuccess(response))
+    // Resolve the promise to send the result back to the user.
+    // The activation will be removed from the activation slots later, when the completion message
+    // is received (because the slot in the invoker is not yet free for new activations).
+    activationPromises.remove(aid).foreach(_.trySuccess(response))
     logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
@@ -415,7 +422,7 @@ class ShardingContainerPoolBalancer(
       }
     }
 
-    activations.remove(aid) match {
+    activationSlots.remove(aid) match {
       case Some(entry) =>
         totalActivations.decrement()
         totalActivationMemory.add(entry.memory.toMB * (-1))
@@ -425,9 +432,15 @@ class ShardingContainerPoolBalancer(
           .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
         if (!forced) {
           entry.timeoutHandler.cancel()
+          // notice here that the activationPromises is not touched, because the expectation is that
+          // the active ack is received as expected, and processing that message removed the promise
+          // from the corresponding map
         } else {
-          // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
-          blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no completion ack received")))
+          // the entry has timed out; if the active ack is still around, remove its entry also
+          // and complete the promise with a failure if necessary
+          activationPromises
+            .remove(aid)
+            .foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))
         }
 
         logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
@@ -710,7 +723,7 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
 case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, timeoutFactor: Int)
 
 /**
- * State kept for each activation until completion.
+ * State kept for each activation slot until completion.
  *
  * @param id id of the activation
  * @param namespaceId namespace that invoked the action
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 0c24b318da..2c34b01308 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -509,7 +509,7 @@ class ShardingContainerPoolBalancerTests
 
     //complete all
     val acks = ids.par.map { aid =>
-      val invoker = balancer.activations(aid).invokerName
+      val invoker = balancer.activationSlots(aid).invokerName
       completeActivation(invoker, balancer, aid)
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services