You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/08 10:33:28 UTC

[GitHub] chetanmeh commented on a change in pull request #3704: Invoker graceful shutdown and drain mode

chetanmeh commented on a change in pull request #3704: Invoker graceful shutdown and drain mode
URL: https://github.com/apache/incubator-openwhisk/pull/3704#discussion_r194017675
 
 

 ##########
 File path: core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
 ##########
 @@ -284,4 +290,93 @@ class InvokerReactive(
       })
   }
 
+  val healthScheduler = Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+    producer.send("health", PingMessage(instance)).andThen {
+      case Failure(t) => logging.error(this, s"failed to ping the controller(s): $t")
+    }
+  })
+
+  /** Polls the pool's status and returns a future which completes once the pool is idle. */
+  def waitForContainerPoolIdle(pool: ActorRef): Future[Unit] = {
+    implicit val timeout = Timeout(5 seconds)
+    val delay = 1.second
+
+    (pool ? Busy)
+      .mapTo[Boolean]
+      .flatMap {
+        case true =>
+          logging.info(this, "Container pool is not idle.")
+          after(delay, actorSystem.scheduler)(waitForContainerPoolIdle(pool))
+        case false =>
+          Future.successful(())
+      }
+      .recoverWith { case _ => after(delay, actorSystem.scheduler)(waitForContainerPoolIdle(pool)) }
+  }
+
+  /** Polls the feed's status and returns a future which completes once the feed is idle. */
+  def waitForActivationFeedIdle(feed: ActorRef): Future[Unit] = {
+    implicit val timeout = Timeout(5 seconds)
+    val delay = 1.second
+
+    activationFeed ! MessageFeed.GracefulShutdown
+    (feed ? MessageFeed.Busy)
+      .mapTo[Boolean]
+      .flatMap {
+        case true =>
+          logging.info(this, "Activation feed is not idle.")
+          after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed))
+        case false =>
+          Future.successful(())
+      }
+      .recoverWith { case _ => after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed)) }
+  }
+
+  // Capture SIGTERM signals to gracefully shut down the invoker. When gracefully shutting down, the health scheduler is
+  // shut down, preventing additional actions from being scheduled to the invoker; then the invoker processes its buffered
+  // messages from the activation feed and waits for its user containers to finish running before the process exits.
+  Signal.handle(
+    new Signal("TERM"),
+    new SignalHandler() {
+      override def handle(signal: Signal) = {
+        logging.info(this, s"Starting graceful shutdown")
+
+        // Order is important here so futures are run sequentially
+        val shutdowns = for {
+          _ <- gracefulStop(healthScheduler, 5.seconds).recover {
+            case _ => logging.info(this, "Health communication failed to shutdown gracefully")
+          }
+          _ <- waitForActivationFeedIdle(activationFeed)
+          _ <- waitForContainerPoolIdle(pool)
+          _ <- gracefulStop(activationFeed, 5.seconds).recover {
+            case _ => logging.info(this, "Activation feed failed to shutdown gracefully")
+          }
+        } yield {
+          logging.info(this, "Successfully shutdown health scheduler, activation feed, and container pool")
+        }
+
+        try {
+          // Allow the shutdown to take a maximum of 3 times the maximum action runtime since the feed can be
+          // buffered and we want to allow for some grace period.
+          Await.result(shutdowns, TimeLimit.MAX_DURATION * 3)
 
 Review comment:
   Maybe we should return a different exit code in case of an unclean shutdown.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services