You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2017/10/23 06:50:32 UTC

[incubator-openwhisk] branch master updated: Make Docker action container cleanup during invoker startup / shutdown more robust (#2858)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da626c  Make Docker action container cleanup during invoker startup / shutdown more robust (#2858)
0da626c is described below

commit 0da626c78a52ab0b5def2c81f2ef99954f06b554
Author: Sven Lange-Last <sv...@de.ibm.com>
AuthorDate: Mon Oct 23 08:50:29 2017 +0200

    Make Docker action container cleanup during invoker startup / shutdown more robust (#2858)
    
    Remove all Docker containers on the system that match the naming filter `wsk<invoker instance number>_` at startup and shutdown. In the past, only running containers matching the filter were removed - paused containers are considered to be running. The new strategy will also remove containers that are stopped or could never be started properly for any reason. From time to time, we see action containers that are not running but should be cleaned up.
    
    If Docker commands used to remove containers (`docker ps`, `docker-runc resume` and `docker rm`) do not return within 30 seconds during startup, abort startup and let Docker re-start the invoker container. We have seen systems with heavy load where Docker commands take too long to complete. In that situation, we need a defined behavior, i.e. re-start and re-try the cleanup. At the moment, an uncaught exception will end the main thread, leaving the invoker inoperable.
    
    Also changed the invoker initialization sequence such that Docker action container cleanup is performed as soon as possible - in particular, before starting other tasks / actors. At the moment, a cleanup timeout would end the main thread while the activation message feed is already running and consuming messages that will never be processed.
---
 .../docker/DockerContainerFactory.scala            | 64 +++++++++++++++-------
 .../main/scala/whisk/core/invoker/Invoker.scala    | 18 +++---
 .../scala/whisk/core/invoker/InvokerReactive.scala | 40 ++++++++------
 3 files changed, 79 insertions(+), 43 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index 4f5a733..af486f2 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -31,6 +31,7 @@ import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.InstanceId
 import scala.concurrent.duration._
+import java.util.concurrent.TimeoutException
 
 class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, parameters: Map[String, Set[String]])(
   implicit actorSystem: ActorSystem,
@@ -69,27 +70,52 @@ class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, paramete
   }
 
   /** Perform cleanup on init */
-  override def init(): Unit = cleanup()
+  override def init(): Unit = removeAllActionContainers()
 
-  /** Cleans up all running wsk_ containers */
+  /** Perform cleanup on exit - to be registered as shutdown hook */
   override def cleanup(): Unit = {
-    val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap {
-      containers =>
-        val removals = containers.map { id =>
-          (if (config.invokerUseRunc) {
-             runc.resume(id)(TransactionId.invokerNanny)
-           } else {
-             docker.unpause(id)(TransactionId.invokerNanny)
-           })
-            .recoverWith {
-              // Ignore resume failures and try to remove anyway
-              case _ => Future.successful(())
-            }
-            .flatMap { _ =>
-              docker.rm(id)(TransactionId.invokerNanny)
-            }
-        }
-        Future.sequence(removals)
+    implicit val transid = TransactionId.invoker
+    try {
+      removeAllActionContainers()
+    } catch {
+      case e: Exception => logging.error(this, s"Failed to remove action containers: ${e.getMessage}")
+    }
+  }
+
+  /**
+   * Removes all wsk_ containers - regardless of their state
+   *
+   * If the system in general or Docker in particular has a very
+   * high load, commands may take longer than the specified time
+   * resulting in an exception.
+   *
+   * There is no checking whether container removal was successful
+   * or not.
+   *
+   * @throws InterruptedException     if the current thread is interrupted while waiting
+   * @throws TimeoutException         if after waiting for the specified time this `Awaitable` is still not ready
+   */
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  private def removeAllActionContainers(): Unit = {
+    implicit val transid = TransactionId.invoker
+    val cleaning = docker.ps(filters = Seq("name" -> s"wsk${instance.toInt}_"), all = true).flatMap { containers =>
+      logging.info(this, s"removing ${containers.size} action containers.")
+      val removals = containers.map { id =>
+        (if (config.invokerUseRunc) {
+           runc.resume(id)
+         } else {
+           docker.unpause(id)
+         })
+          .recoverWith {
+            // Ignore resume failures and try to remove anyway
+            case _ => Future.successful(())
+          }
+          .flatMap { _ =>
+            docker.rm(id)
+          }
+      }
+      Future.sequence(removals)
     }
     Await.ready(cleaning, 30.seconds)
   }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index f08b9b4..aa2b7f3 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -38,6 +38,7 @@ import whisk.core.entity.WhiskEntityStore
 import whisk.http.BasicHttpService
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
+import whisk.common.TransactionId
 
 object Invoker {
 
@@ -70,21 +71,21 @@ object Invoker {
     // load values for the required properties from the environment
     implicit val config = new WhiskConfig(requiredProperties)
 
-    def abort() = {
-      logger.error(this, "Bad configuration, cannot start.")
+    def abort(message: String) = {
+      logger.error(this, message)(TransactionId.invoker)
       actorSystem.terminate()
       Await.result(actorSystem.whenTerminated, 30.seconds)
       sys.exit(1)
     }
 
     if (!config.isValid) {
-      abort()
+      abort("Bad configuration, cannot start.")
     }
 
     val execManifest = ExecManifest.initialize(config)
     if (execManifest.isFailure) {
       logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
-      abort()
+      abort("Bad configuration, cannot start.")
     }
 
     val proposedInvokerId: Option[Int] = args.headOption.map(_.toInt)
@@ -117,8 +118,7 @@ object Invoker {
                 id.toInt - 1
               }
               .getOrElse {
-                logger.error(this, "Failed to increment invokerId")
-                abort()
+                abort("Failed to increment invokerId")
               }
             redisClient.hset("controller:registar:idAssignments", invokerName, newId)
             logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}")
@@ -130,7 +130,11 @@ object Invoker {
     val invokerInstance = InstanceId(assignedInvokerId);
     val msgProvider = SpiLoader.get[MessagingProvider]
     val producer = msgProvider.getProducer(config, ec)
-    val invoker = new InvokerReactive(config, invokerInstance, producer)
+    val invoker = try {
+      new InvokerReactive(config, invokerInstance, producer)
+    } catch {
+      case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
+    }
 
     Scheduler.scheduleWaitAtMost(1.seconds)(() => {
       producer.send("health", PingMessage(invokerInstance)).andThen {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 0183757..2308913 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -57,6 +57,29 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   implicit val ec = actorSystem.dispatcher
   implicit val cfg = config
 
+  /**
+   * Factory used by the ContainerProxy to physically create a new container.
+   *
+   * Create and initialize the container factory before kicking off any other
+   * task or actor because further operation does not make sense if something
+   * goes wrong here. Initialization will throw an exception upon failure.
+   */
+  val containerFactory =
+    SpiLoader
+      .get[ContainerFactoryProvider]
+      .getContainerFactory(
+        actorSystem,
+        logging,
+        config,
+        instance,
+        Map(
+          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+          "--ulimit" -> Set("nofile=1024:1024"),
+          "--pids-limit" -> Set("1024"),
+          "--dns" -> config.invokerContainerDns.toSet))
+  containerFactory.init()
+  sys.addShutdownHook(containerFactory.cleanup())
+
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore(config)
   private val activationStore = WhiskActivationStore.datastore(config)
@@ -76,23 +99,6 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
   })
 
-  /** Factory used by the ContainerProxy to physically create a new container. */
-  val containerFactory =
-    SpiLoader
-      .get[ContainerFactoryProvider]
-      .getContainerFactory(
-        actorSystem,
-        logging,
-        config,
-        instance,
-        Map(
-          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
-          "--ulimit" -> Set("nofile=1024:1024"),
-          "--pids-limit" -> Set("1024"),
-          "--dns" -> config.invokerContainerDns.toSet))
-  containerFactory.init()
-  sys.addShutdownHook(containerFactory.cleanup())
-
   /** Sends an active-ack. */
   val ack = (tid: TransactionId,
              activationResult: WhiskActivation,

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].