You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/05 08:53:21 UTC

[GitHub] markusthoemmes commented on a change in pull request #3704: Provide graceful shutdown for invoker

markusthoemmes commented on a change in pull request #3704: Provide graceful shutdown for invoker
URL: https://github.com/apache/incubator-openwhisk/pull/3704#discussion_r192994858
 
 

 ##########
 File path: core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
 ##########
 @@ -284,4 +290,50 @@ class InvokerReactive(
       })
   }
 
+  val healthScheduler = Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+    producer.send("health", PingMessage(instance)).andThen {
+      case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
+    }
+  })
+
+  private def gracefulShutdown: Unit = {
+    logging.info(this, s"Starting graceful shutdown")
+
+    try {
+      Await.result(gracefulStop(healthScheduler, 5.seconds), 6.seconds)
+    } catch {
+      case e: akka.pattern.AskTimeoutException =>
+        logging.info(this, "Health communication failed to shutdown gracefully")
+    }
+
+    try {
+      Await.result(gracefulStop(activationFeed, 5.seconds), 6.seconds)
+    } catch {
+      case e: akka.pattern.AskTimeoutException => logging.info(this, "Activation feed failed to shutdown gracefully")
+    }
+
+    implicit val timeout = Timeout(5 seconds)
+
+    while (Await.result(pool ? Busy, timeout.duration).asInstanceOf[Boolean] == true) {
+      logging.info(this, s"Container pool is busy")
+      Thread.sleep(1000)
+    }
 
 Review comment:
   WDYT about applying some Future composition here to simplify the flow, like
   
   ```scala
   /** Polls the pools status and returns a future which completes once the pool is idle. */
   def waitForContainerPoolIdle(pool: ActorRef): Future[Unit] = {
     (pool ? Busy).mapTo[Boolean].flatMap {
       case false => akka.pattern.after(1.second, system.scheduler)(waitForContainerPoolIdle(pool))
       case true => Future.successful(())
     }.recoverWith { case _ => akka.pattern.after(1.second, system.scheduler) }
   }
   
   val shutdowns = Seq(
     gracefulStop(healthScheduler, 5.seconds).recover { case _ => logging.info(this, "Health communication failed to shutdown gracefully")}, 
     gracefulStop(activationFeed, 5.seconds).recover { case _ => logging.info(this, "Activation feed failed to shutdown gracefully")}, 
     waitForContainerPoolIdle(pool))
   
   // Allow the shutdown to take a maximum of 3 times the maximum action runtime since the 
   // feed can be buffered and we want to allow for some grace period.
   Await.result(shutdowns, TimeLimit.MAX_DURATION * 3)
   ```
   
   Does that make sense?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services