You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/08/18 21:49:01 UTC

[GitHub] [openwhisk] tysonnorris commented on a change in pull request #4938: ContainerProxy - improve failure handling for concurrent activations

tysonnorris commented on a change in pull request #4938:
URL: https://github.com/apache/openwhisk/pull/4938#discussion_r472512777



##########
File path: core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
##########
@@ -617,15 +647,39 @@ class ContainerProxy(factory: (TransactionId,
    *
    * @param newData the ContainerStarted which container will be destroyed
    */
-  def destroyContainer(newData: ContainerStarted, replacePrewarm: Boolean) = {
+  def destroyContainer(newData: ContainerStarted,
+                       replacePrewarm: Boolean,
+                       abortBuffered: Boolean = false,
+                       abortResponse: Option[ActivationResponse] = None) = {
     val container = newData.container
     if (!rescheduleJob) {
       context.parent ! ContainerRemoved(replacePrewarm)
     } else {
       context.parent ! RescheduleJob
     }
-
-    rejectBuffered()
+    if (abortBuffered && runBuffer.length > 0) {
+      logging.info(this, s"aborting ${runBuffer.length} queued activations after failed init")
+      runBuffer.foreach { job =>
+        implicit val tid = job.msg.transid
+        logging.info(this, s"aborting activation ${job.msg.activationId} after failed init with ${abortResponse}")
+        val result = ContainerProxy.constructWhiskActivation(
+          job,
+          None,
+          Interval.zero,
+          false,
+          abortResponse.getOrElse(ActivationResponse.whiskError(Messages.abnormalRun)))
+        val context = UserContext(job.msg.user)
+        val msg = if (job.msg.blocking) {
+          CombinedCompletionAndResultMessage(tid, result, instance)
+        } else {
+          CompletionMessage(tid, result, instance)
+        }
+        sendActiveAck(tid, result, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+        storeActivation(tid, result, job.msg.blocking, context)
+      }
+    } else {
+      rejectBuffered()

Review comment:
       Yes - in general, It is also worth mentioning this only affects concurrent handling containers, not containers where concurrency=1. 
   If init fails, all queued activations should be aborted.  
   If init succeeds, all queued activations will be attempted, and if any fail, the container will be removed, and any activations queued during that time will be resent. Activations in flight when the first one fails currently also receive an error, if the runtime doesn't tolerate that.
   
   The only "transient" errors tolerated are handled in `Container` class, where http connections are made, and if network related errors occur there is some retry logic for /init, but not for /run.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org