You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/11/23 21:29:28 UTC

[GitHub] rabbah closed pull request #4115: Ensure, that Result-ack is sent before Completion-ack.

rabbah closed pull request #4115: Ensure, that Result-ack is sent before Completion-ack.
URL: https://github.com/apache/incubator-openwhisk/pull/4115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 0c126a2ecc..0fdea5eb6e 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -24,6 +24,7 @@ import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
 import pureconfig.loadConfigOrThrow
+
 import scala.collection.immutable
 import spray.json.DefaultJsonProtocol._
 import spray.json._
@@ -35,6 +36,7 @@ import org.apache.openwhisk.core.database.UserContext
 import org.apache.openwhisk.core.entity.ExecManifest.ImageName
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
 import org.apache.openwhisk.http.Messages
 
 import scala.concurrent.Future
@@ -127,7 +129,7 @@ case object RunCompleted
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
+  sendActiveAck: ActiveAck,
   storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InvokerInstanceId,
@@ -496,10 +498,16 @@ class ContainerProxy(
             ActivationResponse.whiskError(Messages.abnormalRun))
       }
 
-    // Sending active ack. Entirely asynchronous and not waited upon.
-    if (job.msg.blocking) {
-      activation.foreach(
+    // Sending an active ack is an asynchronous operation. The result is forwarded as soon as
+    // possible for blocking activations so that dependent activations can be scheduled. The
+    // completion message which frees a load balancer slot is sent after the active ack future
+    // completes to ensure proper ordering.
+    val sendResult = if (job.msg.blocking) {
+      activation.map(
         sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
+    } else {
+      // For non-blocking requests, do not forward the result.
+      Future.successful(())
     }
 
     val context = UserContext(job.msg.user)
@@ -530,8 +538,17 @@ class ContainerProxy(
     activationWithLogs
       .map(_.fold(_.activation, identity))
       .foreach { activation =>
-        // Sending the completionMessage to the controller asynchronously.
-        sendActiveAck(tid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
+        // Sending the completion message to the controller after the active ack ensures proper ordering
+        // (result is received before the completion message for blocking invokes).
+        sendResult.onComplete(
+          _ =>
+            sendActiveAck(
+              tid,
+              activation,
+              job.msg.blocking,
+              job.msg.rootControllerIndex,
+              job.msg.user.namespace.uuid,
+              true))
         // Storing the record. Entirely asynchronous and not waited upon.
         storeActivation(tid, activation, context)
       }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index da86843875..ab3051044b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -43,6 +43,23 @@ import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
+object InvokerReactive {
+
+  /**
+   * A method for sending active acknowledgement (aka "active ack") messages to the load balancer. These messages
+   * are either completion messages for an activation to indicate a resource slot is free, or result-forwarding
+   * messages for continuations (e.g., sequences and conductor actions).
+   *
+   * @param TransactionId the transaction id for the activation
+   * @param WhiskActivation is the activation result
+   * @param Boolean is true iff the activation was a blocking request
+   * @param ControllerInstanceId the originating controller/loadbalancer id
+   * @param UUID is the UUID for the namespace owning the activation
+   * @param Boolean is true if this is a resource-free message and false if this is a result-forwarding message
+   */
+  type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any]
+}
+
 class InvokerReactive(
   config: WhiskConfig,
   instance: InvokerInstanceId,
@@ -115,12 +132,12 @@ class InvokerReactive(
   })
 
   /** Sends an active-ack. */
-  private val ack = (tid: TransactionId,
-                     activationResult: WhiskActivation,
-                     blockingInvoke: Boolean,
-                     controllerInstance: ControllerInstanceId,
-                     userId: UUID,
-                     isSlotFree: Boolean) => {
+  private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
+                                                activationResult: WhiskActivation,
+                                                blockingInvoke: Boolean,
+                                                controllerInstance: ControllerInstanceId,
+                                                userId: UUID,
+                                                isSlotFree: Boolean) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services