You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/27 10:17:13 UTC

[GitHub] jiangpengcheng closed pull request #4135: Ensure ResultMessage is processed

jiangpengcheng closed pull request #4135: Ensure ResultMessage is processed
URL: https://github.com/apache/incubator-openwhisk/pull/4135
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 35a45472a7..029d491fde 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -175,6 +175,8 @@ class ShardingContainerPoolBalancer(
 
   /** State related to invocations and throttling */
   protected[loadBalancer] val activations = TrieMap[ActivationId, ActivationEntry]()
+  protected[loadBalancer] val blockingPromises =
+    TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
   private val totalActivationMemory = new LongAdder()
@@ -264,7 +266,9 @@ class ShardingContainerPoolBalancer(
       .map { invoker =>
         val entry = setupActivation(msg, action, invoker)
         sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
-          entry.promise.future
+          if (msg.blocking) {
+            blockingPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+          } else entry.promise.future
         }
       }
       .getOrElse {
@@ -387,9 +391,7 @@ class ShardingContainerPoolBalancer(
     // Resolve the promise to send the result back to the user
     // The activation will be removed from `activations`-map later, when we receive the completion message, because the
     // slot of the invoker is not yet free for new activations.
-    activations.get(aid).map { entry =>
-      entry.promise.trySuccess(response)
-    }
+    blockingPromises.remove(aid).map(_.trySuccess(response))
     logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
@@ -422,13 +424,11 @@ class ShardingContainerPoolBalancer(
           .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
         if (!forced) {
           entry.timeoutHandler.cancel()
-          // If the action was blocking and the Resultmessage has been received before nothing will happen here.
-          // If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
-          // the controller will get the result out of the database.
-          // If the action was non-blocking, we will close the promise here.
           entry.promise.trySuccess(Left(aid))
         } else {
           entry.promise.tryFailure(new Throwable("no completion ack received"))
+          // remove blocking promise when timeout, if the ResultMessage is already processed, this will do nothing
+          blockingPromises.remove(aid).foreach(_.tryFailure(new Throwable("no result ack received")))
         }
 
         logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services