Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/08/18 11:46:57 UTC

[GitHub] [openwhisk] style95 commented on a change in pull request #4938: ContainerProxy - improve failure handling for concurrent activations

style95 commented on a change in pull request #4938:
URL: https://github.com/apache/openwhisk/pull/4938#discussion_r472116563



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
##########
@@ -617,15 +647,39 @@ class ContainerProxy(factory: (TransactionId,
    *
    * @param newData the ContainerStarted which container will be destroyed
    */
-  def destroyContainer(newData: ContainerStarted, replacePrewarm: Boolean) = {
+  def destroyContainer(newData: ContainerStarted,
+                       replacePrewarm: Boolean,
+                       abortBuffered: Boolean = false,
+                       abortResponse: Option[ActivationResponse] = None) = {
     val container = newData.container
     if (!rescheduleJob) {
       context.parent ! ContainerRemoved(replacePrewarm)
     } else {
       context.parent ! RescheduleJob
     }
-
-    rejectBuffered()
+    if (abortBuffered && runBuffer.length > 0) {
+      logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init")
+      runBuffer.foreach { job =>
+        implicit val tid = job.msg.transid
+        logging.info(this, s"aborting activation ${job.msg.activationId} after failed init with ${abortResponse}")
+        val result = ContainerProxy.constructWhiskActivation(
+          job,
+          None,
+          Interval.zero,
+          false,
+          abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
+        val context = UserContext(job.msg.user)
+        val msg = if (job.msg.blocking) {
+          CombinedCompletionAndResultMessage(tid, result, instance)
+        } else {
+          CompletionMessage(tid, result, instance)
+        }
+        sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+        storeActivation(tid, result, job.msg.blocking, context)
+      }
+    } else {
+      rejectBuffered()

Review comment:
   Let me recap my understanding.

   When a container fails to initialize, the `InitializationError` is raised in `initializeAndRun()`, and the ack for that activation is sent from there. So `destroyContainer()` sends no acks of its own unless activations are still waiting in the `runBuffer`.
   If there are such activations, they are each answered with a `whiskError` response.

   If, on the other hand, an activation fails after initialization has finished, the error may be transient for that container, so all activations in the `runBuffer` are rescheduled to the parent.

   Is this the intended change?
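
   To make the question concrete, here is a minimal sketch of the branching as I read it from the diff. It is a simplified model and not the actual `ContainerProxy` code: `Outcome`, `Aborted`, `Rescheduled`, `BufferedHandling`, `handleBuffered` and `DefaultAbortResponse` are hypothetical names, activations are reduced to plain id strings, and treating `rejectBuffered()` as "reschedule to the parent" is my assumption.

```scala
// Hypothetical, simplified model of the buffered-activation handling.
// None of these types are the real OpenWhisk classes.
sealed trait Outcome
final case class Aborted(activationId: String, response: String) extends Outcome
final case class Rescheduled(activationId: String) extends Outcome

object BufferedHandling {
  // Stand-in for ActivationResponse.whiskError(Messages.abnormalRun).
  val DefaultAbortResponse: String = "whisk error: abnormal run"

  // Mirrors the if/else in the diff:
  //  - abortBuffered with a non-empty buffer: every queued activation is
  //    completed with the abort response (or the default whisk error);
  //  - otherwise: queued activations are handed back for rescheduling
  //    (assumed here to be what rejectBuffered() does).
  def handleBuffered(runBuffer: Seq[String],
                     abortBuffered: Boolean,
                     abortResponse: Option[String]): Seq[Outcome] =
    if (abortBuffered && runBuffer.nonEmpty) {
      val response = abortResponse.getOrElse(DefaultAbortResponse)
      runBuffer.map(id => Aborted(id, response))
    } else {
      runBuffer.map(id => Rescheduled(id))
    }
}
```

   Under this reading, a failed init would call it with `abortBuffered = true`, and a failed run after a successful init would keep the default `abortBuffered = false`.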



