You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/09/21 08:10:59 UTC

[openwhisk] branch master updated: Combines active ack and slot release when both are available. (#4624)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new fac309c  Combines active ack and slot release when both are available. (#4624)
fac309c is described below

commit fac309c78a99c227978172279794ac34641cc4dc
Author: rodric rabbah <ro...@gmail.com>
AuthorDate: Sat Sep 21 01:10:45 2019 -0700

    Combines active ack and slot release when both are available. (#4624)
    
    Combine active ack and slot release when both are available. This commit changes the AcknowledegmentMessage types exchanged on `completedxxx` topics to the following three:
    
    - CombinedCompletionAndResultMessage - Sent when the resource slot and the action result are available at the same time
    - ResultMessage - Sent once an action result is available for blocking actions
    - CompletionMessage - Sent once the resource slot in the invoker is free again
    
    This ensures that the controller can quickly clean up resources for completed invocations when they result in an error
    (instead of performing slow db polling).
---
 .../apache/openwhisk/core/connector/Message.scala  | 214 +++++++++++++++------
 .../org/apache/openwhisk/core/entity/Exec.scala    |   2 +-
 .../core/loadBalancer/CommonLoadBalancer.scala     |  29 +--
 .../core/containerpool/ContainerProxy.scala        |  27 ++-
 .../openwhisk/core/invoker/InvokerReactive.scala   | 114 ++++++-----
 .../tests/AcknowledgementMessageTests.scala        | 117 ++++++-----
 .../containerpool/test/ContainerProxyTests.scala   |  70 +++++--
 .../test/ShardingContainerPoolBalancerTests.scala  |   4 +-
 8 files changed, 393 insertions(+), 184 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 13fd435..9a1a586 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -64,101 +64,189 @@ case class ActivationMessage(override val transid: TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-    AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def messageType: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some messages, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situations when the resource slot and the action
+ * result are available at the same time, and so the split-phase notification is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
+ *
+ * The constructor is private so that callers must use the more restrictive constructors which ensure the response is always
+ * Right when this message is created.
  */
-case class CompletionMessage(override val transid: TransactionId,
-                             activationId: ActivationId,
-                             isSystemError: Boolean,
-                             invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
+                                                       response: Either[ActivationId, WhiskActivation],
+                                                       override val isSystemError: Option[Boolean],
+                                                       invoker: InvokerInstanceId)
     extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-    activationId.asString
-  }
+  override def messageType = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => Left(a.activationId)))
+  override def toString = activationId.asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource slot in the invoker (used by the
+ * corresponding activation) is free again (i.e., after log collection). The `CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage private (override val transid: TransactionId,
+                                      override val activationId: ActivationId,
+                                      override val isSystemError: Option[Boolean],
+                                      invoker: InvokerInstanceId)
+    extends AcknowledegmentMessage(transid) {
+  override def messageType = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action completion if the user wants to have
- * the result immediately (blocking activation).
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * This message is sent from an invoker to the load balancer once an action result is available for blocking actions.
+ * This is part of a split phase notification, and does not indicate that the slot is available, which is indicated with
+ * a `CompletionMessage`. Note that activation record will not contain any logs from the action execution, only the result.
+ *
+ * The constructor is private so that callers must use the more restrictive constructors which ensure the response is always
+ * Right when this message is created.
  */
-case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
+case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
     extends AcknowledegmentMessage(transid) {
+  override def messageType = "result"
+  override def result = Some(response)
+  override def isSlotFree = None
+  override def isSystemError = response.fold(_ => None, a => Some(a.response.isWhiskError))
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = ResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => Left(a.activationId)))
+  override def toString = activationId.asString
+}
 
-  override def toString = {
-    response.fold(l => l, r => r.activationId).asString
+object ActivationMessage extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
+  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+  implicit val serdes = jsonFormat11(ActivationMessage.apply)
+}
+
+object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {
+  // this constructor is restricted to ensure the message is always created with certain invariants
+  private def apply(transid: TransactionId,
+                    activation: Either[ActivationId, WhiskActivation],
+                    isSystemError: Option[Boolean],
+                    invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
+    new CombinedCompletionAndResultMessage(transid, activation, isSystemError, invoker)
+
+  def apply(transid: TransactionId,
+            activation: WhiskActivation,
+            invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
+    new CombinedCompletionAndResultMessage(transid, Right(activation), Some(activation.response.isWhiskError), invoker)
+
+  implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+  implicit val serdes = jsonFormat4(
+    CombinedCompletionAndResultMessage
+      .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: Option[Boolean], _: InvokerInstanceId))
+}
+
+object CompletionMessage extends DefaultJsonProtocol {
+  // this constructor is restricted to ensure the message is always created with certain invariants
+  private def apply(transid: TransactionId,
+                    activation: WhiskActivation,
+                    isSystemError: Option[Boolean],
+                    invoker: InvokerInstanceId): CompletionMessage =
+    new CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), invoker)
+
+  def apply(transid: TransactionId, activation: WhiskActivation, invoker: InvokerInstanceId): CompletionMessage = {
+    new CompletionMessage(transid, activation.activationId, Some(activation.response.isWhiskError), invoker)
   }
+
+  implicit val serdes = jsonFormat4(
+    CompletionMessage.apply(_: TransactionId, _: ActivationId, _: Option[Boolean], _: InvokerInstanceId))
 }
 
 object ResultMessage extends DefaultJsonProtocol {
-  implicit def eitherResponse =
-    new JsonFormat[Either[ActivationId, WhiskActivation]] {
-      def write(either: Either[ActivationId, WhiskActivation]) = either match {
-        case Right(a) => a.toJson
-        case Left(b)  => b.toJson
-      }
+  // this constructor is restricted to ensure the message is always created with certain invariants
+  private def apply(transid: TransactionId, response: Either[ActivationId, WhiskActivation]): ResultMessage =
+    new ResultMessage(transid, response)
 
-      def read(value: JsValue) = value match {
-        // per the ActivationId's serializer, it is guaranteed to be a String even if it only consists of digits
-        case _: JsString => Left(value.convertTo[ActivationId])
-        case _: JsObject => Right(value.convertTo[WhiskActivation])
-        case _           => deserializationError("could not read ResultMessage")
-      }
-    }
+  def apply(transid: TransactionId, activation: WhiskActivation): ResultMessage =
+    new ResultMessage(transid, Right(activation))
 
-  def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat2(ResultMessage.apply)
+  implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+  implicit val serdes = jsonFormat2(ResultMessage.apply(_: TransactionId, _: Either[ActivationId, WhiskActivation]))
 }
 
 object AcknowledegmentMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[AcknowledegmentMessage] = {
-    Try(serdes.read(msg.parseJson))
+  def parse(msg: String): Try[AcknowledegmentMessage] = Try(serdes.read(msg.parseJson))
+
+  protected[connector] val eitherResponse = new JsonFormat[Either[ActivationId, WhiskActivation]] {
+    def write(either: Either[ActivationId, WhiskActivation]) = either.fold(_.toJson, _.toJson)
+
+    def read(value: JsValue) = value match {
+      case _: JsString =>
+        // per the ActivationId serializer, an activation id is a String even if it only consists of digits
+        Left(value.convertTo[ActivationId])
+
+      case _: JsObject => Right(value.convertTo[WhiskActivation])
+      case _           => deserializationError("could not read ResultMessage")
+    }
   }
 
   implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
-    override def write(obj: AcknowledegmentMessage): JsValue = {
-      obj match {
-        case c: CompletionMessage => c.toJson
-        case r: ResultMessage     => r.toJson
-      }
-    }
+    override def write(m: AcknowledegmentMessage): JsValue = m.toJson
 
+    // The field invoker is only part of CombinedCompletionAndResultMessage and CompletionMessage.
+    // If this field is part of the JSON, we try to deserialize into one of these two types,
+    // and otherwise to a ResultMessage. If all conversions fail, an error will be thrown that needs to be handled.
     override def read(json: JsValue): AcknowledegmentMessage = {
-      json.asJsObject
-      // The field invoker is only part of the CompletionMessage. If this field is part of the JSON, we try to convert
-      // it to a CompletionMessage. Otherwise to a ResultMessage.
-      // If both conversions fail, an error will be thrown that needs to be handled.
-        .getFields("invoker")
-        .headOption
-        .map(_ => json.convertTo[CompletionMessage])
-        .getOrElse(json.convertTo[ResultMessage])
+      val JsObject(fields) = json
+      val completion = fields.contains("invoker")
+      val result = fields.contains("response")
+      if (completion && result) {
+        json.convertTo[CombinedCompletionAndResultMessage]
+      } else if (completion) {
+        json.convertTo[CompletionMessage]
+      } else {
+        json.convertTo[ResultMessage]
+      }
     }
   }
 }
@@ -178,7 +266,7 @@ trait EventMessageBody extends Message {
 
 object EventMessageBody extends DefaultJsonProtocol {
 
-  implicit def format = new JsonFormat[EventMessageBody] {
+  implicit val format = new JsonFormat[EventMessageBody] {
     def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
       case m: Metric     => m.toJson
       case a: Activation => a.toJson
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
index e9be6cf..6830399 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
@@ -242,7 +242,7 @@ object Exec extends ArgNormalizer[Exec] with DefaultJsonProtocol {
   protected[core] val SEQUENCE = "sequence"
   protected[core] val BLACKBOX = "blackbox"
 
-  // This is for error cases while cannot get the `kind` of Exec
+  // This is for error cases where the action `kind` may not be known.
   protected[core] val UNKNOWN = "unknown"
 
   private def execManifests = ExecManifest.runtimesManifest
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 51bf4a8..2f6652e 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -204,17 +204,20 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
   protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
     val raw = new String(bytes, StandardCharsets.UTF_8)
     AcknowledegmentMessage.parse(raw) match {
-      case Success(m: CompletionMessage) =>
-        processCompletion(
-          m.activationId,
-          m.transid,
-          forced = false,
-          isSystemError = m.isSystemError,
-          invoker = m.invoker)
-        activationFeed ! MessageFeed.Processed
+      case Success(acknowledegment) =>
+        acknowledegment.isSlotFree.foreach { invoker =>
+          processCompletion(
+            acknowledegment.activationId,
+            acknowledegment.transid,
+            forced = false,
+            isSystemError = acknowledegment.isSystemError.getOrElse(false),
+            invoker)
+        }
+
+        acknowledegment.result.foreach { response =>
+          processResult(acknowledegment.activationId, acknowledegment.transid, response)
+        }
 
-      case Success(m: ResultMessage) =>
-        processResult(m.response, m.transid)
         activationFeed ! MessageFeed.Processed
 
       case Failure(t) =>
@@ -228,9 +231,9 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
   }
 
   /** 5. Process the result ack and return it to the user */
-  protected def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
-    val aid = response.fold(l => l, r => r.activationId)
-
+  protected def processResult(aid: ActivationId,
+                              tid: TransactionId,
+                              response: Either[ActivationId, WhiskActivation]): Unit = {
     // Resolve the promise to send the result back to the user.
     // The activation will be removed from the activation slots later, when the completion message
     // is received (because the slot in the invoker is not yet free for new activations).
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 47b2532..513dae8 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -18,17 +18,24 @@
 package org.apache.openwhisk.core.containerpool
 
 import java.time.Instant
+
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
 import pureconfig.loadConfigOrThrow
+
 import scala.collection.immutable
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import org.apache.openwhisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.connector.{
+  ActivationMessage,
+  CombinedCompletionAndResultMessage,
+  CompletionMessage,
+  ResultMessage
+}
 import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
 import org.apache.openwhisk.core.database.UserContext
 import org.apache.openwhisk.core.entity.ExecManifest.ImageName
@@ -36,6 +43,7 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
 import org.apache.openwhisk.http.Messages
+
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
@@ -300,7 +308,7 @@ class ContainerProxy(
               job.msg.blocking,
               job.msg.rootControllerIndex,
               job.msg.user.namespace.uuid,
-              true)
+              CombinedCompletionAndResultMessage(transid, activation, instance))
             storeActivation(transid, activation, context)
         }
         .flatMap { container =>
@@ -628,8 +636,15 @@ class ContainerProxy(
     // completion message which frees a load balancer slot is sent after the active ack future
     // completes to ensure proper ordering.
     val sendResult = if (job.msg.blocking) {
-      activation.map(
-        sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
+      activation.map { result =>
+        sendActiveAck(
+          tid,
+          result,
+          job.msg.blocking,
+          job.msg.rootControllerIndex,
+          job.msg.user.namespace.uuid,
+          ResultMessage(tid, result))
+      }
     } else {
       // For non-blocking request, do not forward the result.
       Future.successful(())
@@ -673,7 +688,7 @@ class ContainerProxy(
               job.msg.blocking,
               job.msg.rootControllerIndex,
               job.msg.user.namespace.uuid,
-              true))
+              CompletionMessage(tid, activation, instance)))
         // Storing the record. Entirely asynchronous and not waited upon.
         storeActivation(tid, activation, context)
       }
@@ -699,7 +714,7 @@ object ContainerProxy {
               ByteSize,
               Int,
               Option[ExecutableWhiskAction]) => Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
+    ack: ActiveAck,
     store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InvokerInstanceId,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 14ccce3..86a44cb 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -27,7 +27,7 @@ import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.common.tracing.WhiskTracerProvider
-import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, _}
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
 import org.apache.openwhisk.core.database.{UserContext, _}
@@ -49,14 +49,21 @@ object InvokerReactive extends InvokerProvider {
    * are either completion messages for an activation to indicate a resource slot is free, or result-forwarding
    * messages for continuations (e.g., sequences and conductor actions).
    *
-   * @param TransactionId the transaction id for the activation
-   * @param WhiskActivaiton is the activation result
-   * @param Boolean is true iff the activation was a blocking request
-   * @param ControllerInstanceId the originating controller/loadbalancer id
-   * @param UUID is the UUID for the namespace owning the activation
-   * @param Boolean is true this is resource free message and false if this is a result forwarding message
+   * @param tid the transaction id for the activation
+   * @param activation is the activation result
+   * @param blockingInvoke is true iff the activation was a blocking request
+   * @param controllerInstance the originating controller/loadbalancer id
+   * @param userId is the UUID for the namespace owning the activation
+   * @param acknowledegment the acknowledgement message to send
    */
-  type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any]
+  trait ActiveAck {
+    def apply(tid: TransactionId,
+              activation: WhiskActivation, // the activation property is used primarily for testing
+              blockingInvoke: Boolean,
+              controllerInstance: ControllerInstanceId,
+              userId: UUID,
+              acknowledegment: AcknowledegmentMessage): Future[Any]
+  }
 
   override def instance(
     config: WhiskConfig,
@@ -145,44 +152,46 @@ class InvokerReactive(
     new MessageFeed("activation", logging, consumer, maxPeek, 1.second, processActivationMessage)
   })
 
-  /** Sends an active-ack. */
-  private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
-                                                activationResult: WhiskActivation,
-                                                blockingInvoke: Boolean,
-                                                controllerInstance: ControllerInstanceId,
-                                                userId: UUID,
-                                                isSlotFree: Boolean) => {
-    implicit val transid: TransactionId = tid
-
-    def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
-      val msg = if (isSlotFree) {
-        val aid = res.fold(identity, _.activationId)
-        val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
-        CompletionMessage(transid, aid, isWhiskSystemError, instance)
-      } else {
-        ResultMessage(transid, res)
+  private val ack = new InvokerReactive.ActiveAck {
+    override def apply(tid: TransactionId,
+                       activation: WhiskActivation,
+                       blockingInvoke: Boolean,
+                       controllerInstance: ControllerInstanceId,
+                       userId: UUID,
+                       acknowledegment: AcknowledegmentMessage): Future[Any] = {
+      implicit val transid: TransactionId = tid
+
+      def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
+        producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
+          case Success(_) =>
+            val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
+            logging.info(this, s"posted $info of activation ${acknowledegment.activationId}")
+        }
       }
 
-      producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
-        case Success(_) =>
-          logging.info(
-            this,
-            s"posted ${if (recovery) "recovery" else "completion"} of activation ${activationResult.activationId}")
+      // UserMetrics are sent when the slot is free again. This ensures that all metrics are sent.
+      if (UserEvents.enabled && acknowledegment.isSlotFree.nonEmpty) {
+        acknowledegment.result match {
+          case Some(Right(activationResult: WhiskActivation)) =>
+            EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
+              case Success(msg) => UserEvents.send(producer, msg)
+              case Failure(t)   => logging.error(this, s"activation event was not sent: $t")
+            }
+          case _ =>
+            // all acknowledegment messages should have a result
+            logging.error(this, s"activation event was not sent because the result is missing")
+        }
       }
-    }
 
-    // UserMetrics are sent, when the slot is free again. This ensures, that all metrics are sent.
-    if (UserEvents.enabled && isSlotFree) {
-      EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
-        case Success(msg) => UserEvents.send(producer, msg)
-        case Failure(t)   => logging.error(this, s"activation event was not sent: $t")
+      // An acknowledgement containing the result is only needed for blocking invokes in order to further the
+      // continuation. A result message for a non-blocking activation is not actually registered in the load balancer
+      // and the container proxy should not send such an acknowledgement unless it's a blocking request. Here the code
+      // is defensive and will shrink all non-blocking acknowledgements.
+      send(if (blockingInvoke) acknowledegment else acknowledegment.shrink).recoverWith {
+        case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
+          send(acknowledegment.shrink, recovery = true)
       }
     }
-
-    send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
-      case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
-        send(Left(activationResult.activationId), recovery = true)
-    }
   }
 
   /** Stores an activation in the database. */
@@ -262,20 +271,35 @@ class InvokerReactive(
                     ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
                 }
 
-                val context = UserContext(msg.user)
-                val activation = generateFallbackActivation(msg, response)
                 activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true)
-                store(msg.transid, activation, context)
+
+                val activation = generateFallbackActivation(msg, response)
+                ack(
+                  msg.transid,
+                  activation,
+                  msg.blocking,
+                  msg.rootControllerIndex,
+                  msg.user.namespace.uuid,
+                  CombinedCompletionAndResultMessage(transid, activation, instance))
+
+                store(msg.transid, activation, UserContext(msg.user))
                 Future.successful(())
             }
         } else {
           // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol
           // Due to the protective nature of the blacklist, a database entry is not written.
           activationFeed ! MessageFeed.Processed
+
           val activation =
             generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
-          ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true)
+          ack(
+            msg.transid,
+            activation,
+            false,
+            msg.rootControllerIndex,
+            msg.user.namespace.uuid,
+            CombinedCompletionAndResultMessage(transid, activation, instance))
+
           logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
           Future.successful(())
         }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
index 602cc60..5216bb8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
@@ -22,7 +22,12 @@ import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
 import spray.json._
 import org.apache.openwhisk.common.{TransactionId, WhiskInstants}
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, CompletionMessage, ResultMessage}
+import org.apache.openwhisk.core.connector.{
+  AcknowledegmentMessage,
+  CombinedCompletionAndResultMessage,
+  CompletionMessage,
+  ResultMessage
+}
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size.SizeInt
 
@@ -35,7 +40,7 @@ import scala.util.Success
 @RunWith(classOf[JUnitRunner])
 class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInstants {
 
-  behavior of "result message"
+  behavior of "acknowledgement message"
 
   val defaultUserMemory: ByteSize = 1024.MB
   val activation = WhiskActivation(
@@ -49,59 +54,85 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
     annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
     duration = Some(123))
 
-  it should "serialize a left result message" in {
-    val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+  it should "serialize and deserialize a Result message with Left result" in {
+    val m = ResultMessage(TransactionId.testing, activation).shrink
+    m.response shouldBe 'left
+    m.isSlotFree shouldBe empty
     m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.left.get.toJson).compactPrint
+    m.serialize shouldBe m.toJson.compactPrint
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
   }
 
-  it should "serialize a right result message" in {
-    val m =
-      ResultMessage(TransactionId.testing, Right(activation))
+  it should "serialize and deserialize a Result message with Right result" in {
+    val m = ResultMessage(TransactionId.testing, activation)
+    m.response shouldBe 'right
+    m.isSlotFree shouldBe empty
     m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.right.get.toJson).compactPrint
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
   }
 
-  it should "deserialize a left result message" in {
-    val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
-    ResultMessage.parse(m.serialize) shouldBe Success(m)
-  }
-
-  it should "deserialize a right result message" in {
-    val m =
-      ResultMessage(TransactionId.testing, Right(activation))
-    ResultMessage.parse(m.serialize) shouldBe Success(m)
-  }
-
-  behavior of "acknowledgement message"
-
-  it should "serialize a Completion message" in {
-    val c = CompletionMessage(
+  it should "serialize and deserialize a Completion message" in {
+    val m = CompletionMessage(
       TransactionId.testing,
       ActivationId.generate(),
-      false,
+      Some(false),
       InvokerInstanceId(0, userMemory = defaultUserMemory))
-    val m: AcknowledegmentMessage = c
-    m.serialize shouldBe c.toJson.compactPrint
+    m.isSlotFree should not be empty
+    m.serialize shouldBe m.toJson.compactPrint
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
   }
 
-  it should "serialize a Result message" in {
-    val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
-    val m: AcknowledegmentMessage = r
-    m.serialize shouldBe r.toJson.compactPrint
-  }
+  it should "serialize and deserialize a CombinedCompletionAndResultMessage" in {
+    withClue("system error false and right") {
+      val c = CombinedCompletionAndResultMessage(
+        TransactionId.testing,
+        activation,
+        InvokerInstanceId(0, userMemory = defaultUserMemory))
+      c.response shouldBe 'right
+      c.isSlotFree should not be empty
+      c.isSystemError shouldBe Some(false)
+      c.serialize shouldBe c.toJson.compactPrint
+      AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+    }
 
-  it should "deserialize a Completion message" in {
-    val c = CompletionMessage(
-      TransactionId.testing,
-      ActivationId.generate(),
-      false,
-      InvokerInstanceId(0, userMemory = defaultUserMemory))
-    val m: AcknowledegmentMessage = c
-    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(c)
-  }
+    withClue("system error true and right") {
+      val response = ActivationResponse.whiskError(JsString("error"))
+      val someActivation = activation.copy(response = response)
+      val c = CombinedCompletionAndResultMessage(
+        TransactionId.testing,
+        someActivation,
+        InvokerInstanceId(0, userMemory = defaultUserMemory))
+      c.response shouldBe 'right
+      c.isSlotFree should not be empty
+      c.isSystemError shouldBe Some(true)
+      c.serialize shouldBe c.toJson.compactPrint
+      AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+    }
+
+    withClue("system error false and left") {
+      val c = CombinedCompletionAndResultMessage(
+        TransactionId.testing,
+        activation,
+        InvokerInstanceId(0, userMemory = defaultUserMemory)).shrink
+      c.response shouldBe 'left
+      c.isSlotFree should not be empty
+      c.isSystemError shouldBe Some(false)
+      c.serialize shouldBe c.toJson.compactPrint
+      AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+    }
 
-  it should "deserialize a Result message" in {
-    val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
-    val m: AcknowledegmentMessage = r
-    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(r)
+    withClue("system error true and left") {
+      val response = ActivationResponse.whiskError(JsString("error"))
+      val someActivation = activation.copy(response = response)
+      val c = CombinedCompletionAndResultMessage(
+        TransactionId.testing,
+        someActivation,
+        InvokerInstanceId(0, userMemory = defaultUserMemory)).shrink
+      c.response shouldBe 'left
+      c.isSlotFree should not be empty
+      c.isSystemError shouldBe Some(true)
+      c.serialize shouldBe c.toJson.compactPrint
+      AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+    }
   }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index e4d041d..af5df73 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,6 +18,7 @@
 package org.apache.openwhisk.core.containerpool.test
 
 import java.time.Instant
+
 import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
 import akka.actor.{ActorRef, ActorSystem, FSM}
 import akka.stream.scaladsl.Source
@@ -26,13 +27,14 @@ import akka.util.ByteString
 import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
 import java.time.temporal.ChronoUnit
 import java.util.concurrent.atomic.AtomicInteger
+
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import org.apache.openwhisk.common.{Logging, TransactionId}
-import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
 import org.apache.openwhisk.core.containerpool.WarmingData
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -41,6 +43,9 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.invoker.InvokerReactive
+
+import scala.collection.mutable
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -159,22 +164,65 @@ class ContainerProxyTests
     expectMsg(Transition(machine, Pausing, Paused))
   }
 
-  /** Creates an inspectable version of the ack method, which records all calls in a buffer */
-  def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) =>
+  trait LoggedAcker extends InvokerReactive.ActiveAck {
+    def calls =
+      mutable.Buffer[(TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, AcknowledegmentMessage)]()
+
+    def verifyAnnotations(activation: WhiskActivation, a: ExecutableWhiskAction) = {
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
-      Future.successful(())
+    }
+  }
+
+  /** Creates an inspectable version of the ack method, which records all calls in a buffer */
+  def createAcker(a: ExecutableWhiskAction = action) = new LoggedAcker {
+    val acker = LoggedFunction {
+      (_: TransactionId,
+       activation: WhiskActivation,
+       _: Boolean,
+       _: ControllerInstanceId,
+       _: UUID,
+       _: AcknowledegmentMessage) =>
+        Future.successful(())
+    }
+
+    override def calls = acker.calls
+
+    override def apply(tid: TransactionId,
+                       activation: WhiskActivation,
+                       blockingInvoke: Boolean,
+                       controllerInstance: ControllerInstanceId,
+                       userId: UUID,
+                       acknowledegment: AcknowledegmentMessage): Future[Any] = {
+      verifyAnnotations(activation, a)
+      acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment)
+    }
   }
 
   /** Creates a synchronized inspectable version of the ack method, which records all calls in a buffer */
-  def createSyncAcker(a: ExecutableWhiskAction = action) = SynchronizedLoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) =>
-      activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
-      activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
-      activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
-      Future.successful(())
+  def createSyncAcker(a: ExecutableWhiskAction = action) = new LoggedAcker {
+    val acker = SynchronizedLoggedFunction {
+      (_: TransactionId,
+       activation: WhiskActivation,
+       _: Boolean,
+       _: ControllerInstanceId,
+       _: UUID,
+       _: AcknowledegmentMessage) =>
+        Future.successful(())
+    }
+
+    override def calls = acker.calls
+
+    override def apply(tid: TransactionId,
+                       activation: WhiskActivation,
+                       blockingInvoke: Boolean,
+                       controllerInstance: ControllerInstanceId,
+                       userId: UUID,
+                       acknowledegment: AcknowledegmentMessage): Future[Any] = {
+      verifyAnnotations(activation, a)
+      acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment)
+    }
   }
 
   /** Creates an inspectable factory */
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 7646b49..df1cc3b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -565,8 +565,8 @@ class ShardingContainerPoolBalancerTests
 
   def completeActivation(invoker: InvokerInstanceId, balancer: ShardingContainerPoolBalancer, aid: ActivationId) = {
     //complete activation
-    val ack = CompletionMessage(TransactionId.testing, aid, false, invoker).serialize
-      .getBytes(StandardCharsets.UTF_8)
+    val ack =
+      CompletionMessage(TransactionId.testing, aid, Some(false), invoker).serialize.getBytes(StandardCharsets.UTF_8)
     balancer.processAcknowledgement(ack)
   }
 }