You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2022/07/05 02:25:06 UTC

[GitHub] [openwhisk] ningyougang commented on a diff in pull request #5266: [Scheduler Enhancement] Increase the retention timeout for the blackbox action.

ningyougang commented on code in PR #5266:
URL: https://github.com/apache/openwhisk/pull/5266#discussion_r913337342


##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala:
##########
@@ -323,33 +332,50 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       goto(Running) using RunningData(schedulerActor, droppingActor)
 
     // log the failed information
-    case Event(FailedCreationJob(creationId, _, _, _, _, message), data: FlushingData) =>
+    case Event(FailedCreationJob(creationId, _, _, _, error, message), data: FlushingData) =>
       creationIds -= creationId.asString
       logging.info(
         this,
         s"[$invocationNamespace:$action:$stateName][$creationId] Failed to create a container due to $message")
 
       // keep updating the reason
-      stay using data.copy(reason = message)
+      stay using data.copy(error = error, reason = message)
 
     // since there is no container, activations cannot be handled.
     case Event(msg: ActivationMessage, data: FlushingData) =>
-      completeErrorActivation(msg, data.reason, ContainerCreationError.whiskErrors.contains(data.error))
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new activation message ${msg.activationId}")(
+        msg.transid)
+      val whiskError = isWhiskError(data.error)
+      if (whiskError)
+        queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+      else
+        completeErrorActivation(msg, data.reason, whiskError)
       stay() using data.copy(activeDuringFlush = true)
 
     // Since SchedulingDecisionMaker keep sending a message to create a container, this state is not automatically timed out.
     // Instead, StateTimeout message will be sent by a timer.
-    case Event(StateTimeout, data: FlushingData) =>
-      completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
-      if (data.activeDuringFlush)
+    case Event(StateTimeout | DropOld, data: FlushingData) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] Received StateTimeout, drop stale messages.")
+      queue =
+        MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), data.reason, completeErrorActivation)
+      if (data.activeDuringFlush || queue.nonEmpty)
         stay using data.copy(activeDuringFlush = false)
       else
         cleanUpActorsAndGotoRemoved(data)
 
-    case Event(GracefulShutdown, data: FlushingData) =>
-      completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
+      completeAllActivations(data.reason, isWhiskError(data.error))

Review Comment:
   Seems our downstream didn't have this statement
   ```
   completeAllActivations(data.reason, isWhiskError(data.error))
   ```
   So, just add `completeAllActivations`, is it  for safe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@openwhisk.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org