You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/10/31 13:58:21 UTC

[GitHub] vvraskin closed pull request #4041: Send active-ack after log collection for nonblocking activations.

vvraskin closed pull request #4041: Send active-ack after log collection for nonblocking activations.
URL: https://github.com/apache/incubator-openwhisk/pull/4041
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index ed677b112d..ebfb59b192 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -72,24 +72,51 @@ object ActivationMessage extends DefaultJsonProtocol {
 }
 
 /**
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * Message that is sent from the invoker to the controller after action is completed or after slot is free again for
+ * new actions.
+ */
+abstract class AcknowledegmentMessage() extends Message {
+  override val transid: TransactionId
+  override def serialize: String = {
+    AcknowledegmentMessage.serdes.write(this).compactPrint
+  }
+}
+
+/**
+ * This message is sent from the invoker to the controller, after the slot of an invoker that has been used by the
+ * current action, is free again (after log collection)
  */
 case class CompletionMessage(override val transid: TransactionId,
-                             response: Either[ActivationId, WhiskActivation],
+                             activationId: ActivationId,
+                             isSystemError: Boolean,
                              invoker: InvokerInstanceId)
-    extends Message {
+    extends AcknowledegmentMessage() {
 
-  override def serialize: String = {
-    CompletionMessage.serdes.write(this).compactPrint
+  override def toString = {
+    activationId.asString
   }
+}
+
+object CompletionMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+}
+
+/**
+ * That message will be sent from the invoker to the controller after action completion if the user wants to have
+ * the result immediately (blocking activation).
+ * When adding fields, the serdes of the companion object must be updated also.
+ * The whisk activation field will have its logs stripped.
+ */
+case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
+    extends AcknowledegmentMessage() {
 
   override def toString = {
     response.fold(l => l, r => r.activationId).asString
   }
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
+object ResultMessage extends DefaultJsonProtocol {
   implicit def eitherResponse =
     new JsonFormat[Either[ActivationId, WhiskActivation]] {
       def write(either: Either[ActivationId, WhiskActivation]) = either match {
@@ -101,12 +128,38 @@ object CompletionMessage extends DefaultJsonProtocol {
         // per the ActivationId's serializer, it is guaranteed to be a String even if it only consists of digits
         case _: JsString => Left(value.convertTo[ActivationId])
         case _: JsObject => Right(value.convertTo[WhiskActivation])
-        case _           => deserializationError("could not read CompletionMessage")
+        case _           => deserializationError("could not read ResultMessage")
       }
     }
 
-  def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
-  private val serdes = jsonFormat3(CompletionMessage.apply)
+  def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat2(ResultMessage.apply)
+}
+
+object AcknowledegmentMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[AcknowledegmentMessage] = {
+    Try(serdes.read(msg.parseJson))
+  }
+
+  implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
+    override def write(obj: AcknowledegmentMessage): JsValue = {
+      obj match {
+        case c: CompletionMessage => c.toJson
+        case r: ResultMessage     => r.toJson
+      }
+    }
+
+    override def read(json: JsValue): AcknowledegmentMessage = {
+      json.asJsObject
+      // The field invoker is only part of the CompletionMessage. If this field is part of the JSON, we try to convert
+      // it to a CompletionMessage. Otherwise to a ResultMessage.
+      // If both conversions fail, an error will be thrown that needs to be handled.
+        .getFields("invoker")
+        .headOption
+        .map(_ => json.convertTo[CompletionMessage])
+        .getOrElse(json.convertTo[ResultMessage])
+    }
+  }
 }
 
 case class PingMessage(instance: InvokerInstanceId) extends Message {
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 66f66a2fe1..1769904c72 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -279,12 +279,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
 
     // Install a timeout handler for the catastrophic case where an active ack is not received at all
     // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
-    // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
+    // the completion ack is significantly delayed (possibly due to long queues but the subject should not be penalized);
     // in this case, if the activation handler is still registered, remove it and update the books.
     activations.getOrElseUpdate(
       msg.activationId, {
         val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
-          processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance)
+          processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
         }
 
         // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
@@ -344,36 +344,61 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
       activeAckConsumer,
       maxActiveAcksPerPoll,
       activeAckPollDuration,
-      processActiveAck)
+      processAcknowledgement)
   })
 
-  /** 4. Get the active-ack message and parse it */
-  private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
+  /** 4. Get the acknowledgement message and parse it */
+  private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
     val raw = new String(bytes, StandardCharsets.UTF_8)
-    CompletionMessage.parse(raw) match {
+    AcknowledegmentMessage.parse(raw) match {
       case Success(m: CompletionMessage) =>
-        processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
+        processCompletion(
+          m.activationId,
+          m.transid,
+          forced = false,
+          isSystemError = m.isSystemError,
+          invoker = m.invoker)
+        activationFeed ! MessageFeed.Processed
+
+      case Success(m: ResultMessage) =>
+        processResult(m.response, m.transid)
         activationFeed ! MessageFeed.Processed
 
       case Failure(t) =>
         activationFeed ! MessageFeed.Processed
-        logging.error(this, s"failed processing message: $raw with $t")
+        logging.error(this, s"failed processing message: $raw")
+
+      case _ =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
+    }
+  }
+
+  /** 5. Process the result ack and return it to the user */
+  private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
+    val aid = response.fold(l => l, r => r.activationId)
+
+    // Resolve the promise to send the result back to the user
+    // The activation will be removed from `activations`-map later, when we receive the completion message, because the
+    // slot of the invoker is not yet free for new activations.
+    activations.get(aid).map { entry =>
+      entry.promise.trySuccess(response)
     }
+    logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
-  /** 5. Process the active-ack and update the state accordingly */
-  private def processCompletion(response: Either[ActivationId, WhiskActivation],
+  /** Process the completion ack and update the state */
+  private def processCompletion(aid: ActivationId,
                                 tid: TransactionId,
                                 forced: Boolean,
+                                isSystemError: Boolean,
                                 invoker: InvokerInstanceId): Unit = {
-    val aid = response.fold(l => l, r => r.activationId)
 
     val invocationResult = if (forced) {
       InvocationFinishedResult.Timeout
     } else {
       // If the response contains a system error, report that, otherwise report Success
       // Left generally is considered a Success, since that could be a message not fitting into Kafka
-      val isSystemError = response.fold(_ => false, _.response.isWhiskError)
       if (isSystemError) {
         InvocationFinishedResult.SystemError
       } else {
@@ -390,12 +415,16 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
 
         if (!forced) {
           entry.timeoutHandler.cancel()
-          entry.promise.trySuccess(response)
+          // If the action was blocking and the ResultMessage has been received before, nothing will happen here.
+          // If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
+          // the controller will get the result out of the database.
+          // If the action was non-blocking, we will close the promise here.
+          entry.promise.trySuccess(Left(aid))
         } else {
-          entry.promise.tryFailure(new Throwable("no active ack received"))
+          entry.promise.tryFailure(new Throwable("no completion ack received"))
         }
 
-        logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
+        logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
         // Active acks that are received here are strictly from user actions - health actions are not part of
         // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
         invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
@@ -403,17 +432,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
         // Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
         // is important to pass to the invokerPool because they are used to determine if the invoker can be considered
         // healthy again.
-        logging.info(this, s"received active ack for health action on $invoker")(tid)
+        logging.info(this, s"received completion ack for health action on $invoker")(tid)
         invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
       case None if !forced =>
         // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
         // The result is ignored because a timeout has already been reported to the invokerPool per the force.
-        logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
+        logging.debug(this, s"received completion ack for '$aid' which has no entry")(tid)
       case None =>
         // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
         // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
         // ack canceled the timer). As the active ack is already processed we don't have to do anything here.
-        logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
+        logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
     }
   }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b34ce587d4..57e5c4fc5e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InvokerInstanceId,
@@ -168,7 +168,8 @@ class ContainerProxy(
               activation,
               job.msg.blocking,
               job.msg.rootControllerIndex,
-              job.msg.user.namespace.uuid)
+              job.msg.user.namespace.uuid,
+              true)
             storeActivation(transid, activation, context)
         }
         .flatMap { container =>
@@ -390,8 +391,10 @@ class ContainerProxy(
       }
 
     // Sending active ack. Entirely asynchronous and not waited upon.
-    activation.foreach(
-      sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
+    if (job.msg.blocking) {
+      activation.foreach(
+        sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
+    }
 
     val context = UserContext(job.msg.user)
 
@@ -418,8 +421,14 @@ class ContainerProxy(
         }
       }
 
-    // Storing the record. Entirely asynchronous and not waited upon.
-    activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context))
+    activationWithLogs
+      .map(_.fold(_.activation, identity))
+      .foreach { activation =>
+        // Sending the completionMessage to the controller asynchronously.
+        sendActiveAck(tid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
+        // Storing the record. Entirely asynchronous and not waited upon.
+        storeActivation(tid, activation, context)
+      }
 
     // Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
     activationWithLogs.flatMap {
@@ -436,7 +445,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
 object ContainerProxy {
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
+    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
     store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InvokerInstanceId,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 28b028984d..65d696584f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -117,11 +117,19 @@ class InvokerReactive(
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
                      controllerInstance: ControllerInstanceId,
-                     userId: UUID) => {
+                     userId: UUID,
+                     isSlotFree: Boolean) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
-      val msg = CompletionMessage(transid, res, instance)
+      val msg = if (isSlotFree) {
+        val aid = res.fold(identity, _.activationId)
+        val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
+        CompletionMessage(transid, aid, isWhiskSystemError, instance)
+      } else {
+        ResultMessage(transid, res)
+      }
+
       producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
         case Success(_) =>
           logging.info(
@@ -130,7 +138,8 @@ class InvokerReactive(
       }
     }
 
-    if (UserEvents.enabled) {
+    // UserMetrics are sent when the slot is free again. This ensures that all metrics are sent.
+    if (UserEvents.enabled && isSlotFree) {
       EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
         case Success(msg) => UserEvents.send(producer, msg)
         case Failure(t)   => logging.error(this, s"activation event was not sent: $t")
@@ -223,7 +232,7 @@ class InvokerReactive(
                 val context = UserContext(msg.user)
                 val activation = generateFallbackActivation(msg, response)
                 activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid)
+                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true)
                 store(msg.transid, activation, context)
                 Future.successful(())
             }
@@ -233,7 +242,7 @@ class InvokerReactive(
           activationFeed ! MessageFeed.Processed
           val activation =
             generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
-          ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid)
+          ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true)
           logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
           Future.successful(())
         }
diff --git a/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
new file mode 100644
index 0000000000..c82fc66438
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.tests
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.connector.{AcknowledegmentMessage, CompletionMessage, ResultMessage}
+import whisk.core.entity._
+import whisk.core.entity.size.SizeInt
+
+import scala.concurrent.duration.DurationInt
+import scala.util.Success
+
+/**
+ * Unit tests for serialization and parsing of ResultMessage, CompletionMessage and AcknowledegmentMessage.
+ */
+@RunWith(classOf[JUnitRunner])
+class AcknowledgementMessageTests extends FlatSpec with Matchers {
+
+  behavior of "result message"
+
+  val defaultUserMemory: ByteSize = 1024.MB
+  val activation = WhiskActivation(
+    namespace = EntityPath("ns"),
+    name = EntityName("a"),
+    Subject(),
+    activationId = ActivationId.generate(),
+    start = Instant.now(),
+    end = Instant.now(),
+    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
+    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
+    duration = Some(123))
+
+  it should "serialize a left result message" in {
+    val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.left.get.toJson).compactPrint
+  }
+
+  it should "serialize a right result message" in {
+    val m =
+      ResultMessage(TransactionId.testing, Right(activation))
+    m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.right.get.toJson).compactPrint
+  }
+
+  it should "deserialize a left result message" in {
+    val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    ResultMessage.parse(m.serialize) shouldBe Success(m)
+  }
+
+  it should "deserialize a right result message" in {
+    val m =
+      ResultMessage(TransactionId.testing, Right(activation))
+    ResultMessage.parse(m.serialize) shouldBe Success(m)
+  }
+
+  behavior of "acknowledgement message"
+
+  it should "serialize a Completion message" in {
+    val c = CompletionMessage(
+      TransactionId.testing,
+      ActivationId.generate(),
+      false,
+      InvokerInstanceId(0, userMemory = defaultUserMemory))
+    val m: AcknowledegmentMessage = c
+    m.serialize shouldBe c.toJson.compactPrint
+  }
+
+  it should "serialize a Result message" in {
+    val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    val m: AcknowledegmentMessage = r
+    m.serialize shouldBe r.toJson.compactPrint
+  }
+
+  it should "deserialize a Completion message" in {
+    val c = CompletionMessage(
+      TransactionId.testing,
+      ActivationId.generate(),
+      false,
+      InvokerInstanceId(0, userMemory = defaultUserMemory))
+    val m: AcknowledegmentMessage = c
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(c)
+  }
+
+  it should "deserialize a Result message" in {
+    val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    val m: AcknowledegmentMessage = r
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(r)
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
deleted file mode 100644
index ed1ac37199..0000000000
--- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.connector.tests
-
-import java.time.Instant
-
-import scala.util.Success
-import scala.concurrent.duration.DurationInt
-
-import org.junit.runner.RunWith
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.connector.CompletionMessage
-import whisk.core.entity._
-import whisk.core.entity.size.SizeInt
-
-/**
- * Unit tests for the CompletionMessage object.
- */
-@RunWith(classOf[JUnitRunner])
-class CompletionMessageTests extends FlatSpec with Matchers {
-
-  behavior of "completion message"
-
-  val defaultUserMemory: ByteSize = 1024.MB
-  val activation = WhiskActivation(
-    namespace = EntityPath("ns"),
-    name = EntityName("a"),
-    Subject(),
-    activationId = ActivationId.generate(),
-    start = Instant.now(),
-    end = Instant.now(),
-    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
-    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
-    duration = Some(123))
-
-  it should "serialize a left completion message" in {
-    val m = CompletionMessage(
-      TransactionId.testing,
-      Left(ActivationId.generate()),
-      InvokerInstanceId(0, userMemory = defaultUserMemory))
-    m.serialize shouldBe JsObject(
-      "transid" -> m.transid.toJson,
-      "response" -> m.response.left.get.toJson,
-      "invoker" -> m.invoker.toJson).compactPrint
-  }
-
-  it should "serialize a right completion message" in {
-    val m =
-      CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory))
-    m.serialize shouldBe JsObject(
-      "transid" -> m.transid.toJson,
-      "response" -> m.response.right.get.toJson,
-      "invoker" -> m.invoker.toJson).compactPrint
-  }
-
-  it should "deserialize a left completion message" in {
-    val m = CompletionMessage(
-      TransactionId.testing,
-      Left(ActivationId.generate()),
-      InvokerInstanceId(0, userMemory = defaultUserMemory))
-    CompletionMessage.parse(m.serialize) shouldBe Success(m)
-  }
-
-  it should "deserialize a right completion message" in {
-    val m =
-      CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory))
-    CompletionMessage.parse(m.serialize) shouldBe Success(m)
-  }
-}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 920d5358b6..24ef264ca6 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -147,7 +147,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable version of the ack method, which records all calls in a buffer */
   def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) =>
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) =>
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services