You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/10/23 06:50:33 UTC

[GitHub] markusthoemmes closed pull request #2858: Make Docker action container cleanup during invoker startup / shutdown more robust

markusthoemmes closed pull request #2858: Make Docker action container cleanup during invoker startup / shutdown more robust
URL: https://github.com/apache/incubator-openwhisk/pull/2858
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
once it has been merged, so the diff is reproduced below:

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
index 4f5a733860..af486f277e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -31,6 +31,7 @@ import whisk.core.entity.ByteSize
 import whisk.core.entity.ExecManifest
 import whisk.core.entity.InstanceId
 import scala.concurrent.duration._
+import java.util.concurrent.TimeoutException
 
 class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, parameters: Map[String, Set[String]])(
   implicit actorSystem: ActorSystem,
@@ -69,27 +70,52 @@ class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, paramete
   }
 
   /** Perform cleanup on init */
-  override def init(): Unit = cleanup()
+  override def init(): Unit = removeAllActionContainers()
 
-  /** Cleans up all running wsk_ containers */
+  /** Perform cleanup on exit - to be registered as shutdown hook */
   override def cleanup(): Unit = {
-    val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap {
-      containers =>
-        val removals = containers.map { id =>
-          (if (config.invokerUseRunc) {
-             runc.resume(id)(TransactionId.invokerNanny)
-           } else {
-             docker.unpause(id)(TransactionId.invokerNanny)
-           })
-            .recoverWith {
-              // Ignore resume failures and try to remove anyway
-              case _ => Future.successful(())
-            }
-            .flatMap { _ =>
-              docker.rm(id)(TransactionId.invokerNanny)
-            }
-        }
-        Future.sequence(removals)
+    implicit val transid = TransactionId.invoker
+    try {
+      removeAllActionContainers()
+    } catch {
+      case e: Exception => logging.error(this, s"Failed to remove action containers: ${e.getMessage}")
+    }
+  }
+
+  /**
+   * Removes all wsk_ containers - regardless of their state
+   *
+   * If the system in general or Docker in particular has a very
+   * high load, commands may take longer than the specified time
+   * resulting in an exception.
+   *
+   * There is no checking whether container removal was successful
+   * or not.
+   *
+   * @throws InterruptedException     if the current thread is interrupted while waiting
+   * @throws TimeoutException         if after waiting for the specified time this `Awaitable` is still not ready
+   */
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  private def removeAllActionContainers(): Unit = {
+    implicit val transid = TransactionId.invoker
+    val cleaning = docker.ps(filters = Seq("name" -> s"wsk${instance.toInt}_"), all = true).flatMap { containers =>
+      logging.info(this, s"removing ${containers.size} action containers.")
+      val removals = containers.map { id =>
+        (if (config.invokerUseRunc) {
+           runc.resume(id)
+         } else {
+           docker.unpause(id)
+         })
+          .recoverWith {
+            // Ignore resume failures and try to remove anyway
+            case _ => Future.successful(())
+          }
+          .flatMap { _ =>
+            docker.rm(id)
+          }
+      }
+      Future.sequence(removals)
     }
     Await.ready(cleaning, 30.seconds)
   }
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index bf6bb39aab..857396a180 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -38,6 +38,7 @@ import whisk.core.entity.WhiskEntityStore
 import whisk.http.BasicHttpService
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
+import whisk.common.TransactionId
 
 object Invoker {
 
@@ -70,21 +71,21 @@ object Invoker {
     // load values for the required properties from the environment
     implicit val config = new WhiskConfig(requiredProperties)
 
-    def abort() = {
-      logger.error(this, "Bad configuration, cannot start.")
+    def abort(message: String) = {
+      logger.error(this, message)(TransactionId.invoker)
       actorSystem.terminate()
       Await.result(actorSystem.whenTerminated, 30.seconds)
       sys.exit(1)
     }
 
     if (!config.isValid) {
-      abort()
+      abort("Bad configuration, cannot start.")
     }
 
     val execManifest = ExecManifest.initialize(config)
     if (execManifest.isFailure) {
       logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
-      abort()
+      abort("Bad configuration, cannot start.")
     }
 
     val proposedInvokerId: Option[Int] = args.headOption.map(_.toInt)
@@ -111,8 +112,7 @@ object Invoker {
                 id.toInt - 1
               }
               .getOrElse {
-                logger.error(this, "Failed to increment invokerId")
-                abort()
+                abort("Failed to increment invokerId")
               }
             redisClient.hset("controller:registar:idAssignments", invokerName, newId)
             logger.info(this, s"invokerReg: invoker ${invokerName} was assigned invokerId ${newId}")
@@ -124,7 +124,11 @@ object Invoker {
     val invokerInstance = InstanceId(assignedInvokerId);
     val msgProvider = SpiLoader.get[MessagingProvider]
     val producer = msgProvider.getProducer(config, ec)
-    val invoker = new InvokerReactive(config, invokerInstance, producer)
+    val invoker = try {
+      new InvokerReactive(config, invokerInstance, producer)
+    } catch {
+      case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
+    }
 
     Scheduler.scheduleWaitAtMost(1.seconds)(() => {
       producer.send("health", PingMessage(invokerInstance)).andThen {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 0183757c7d..2308913e0e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -57,6 +57,29 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
   implicit val ec = actorSystem.dispatcher
   implicit val cfg = config
 
+  /**
+   * Factory used by the ContainerProxy to physically create a new container.
+   *
+   * Create and initialize the container factory before kicking off any other
+   * task or actor because further operation does not make sense if something
+   * goes wrong here. Initialization will throw an exception upon failure.
+   */
+  val containerFactory =
+    SpiLoader
+      .get[ContainerFactoryProvider]
+      .getContainerFactory(
+        actorSystem,
+        logging,
+        config,
+        instance,
+        Map(
+          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+          "--ulimit" -> Set("nofile=1024:1024"),
+          "--pids-limit" -> Set("1024"),
+          "--dns" -> config.invokerContainerDns.toSet))
+  containerFactory.init()
+  sys.addShutdownHook(containerFactory.cleanup())
+
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore(config)
   private val activationStore = WhiskActivationStore.datastore(config)
@@ -76,23 +99,6 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
     new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
   })
 
-  /** Factory used by the ContainerProxy to physically create a new container. */
-  val containerFactory =
-    SpiLoader
-      .get[ContainerFactoryProvider]
-      .getContainerFactory(
-        actorSystem,
-        logging,
-        config,
-        instance,
-        Map(
-          "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
-          "--ulimit" -> Set("nofile=1024:1024"),
-          "--pids-limit" -> Set("1024"),
-          "--dns" -> config.invokerContainerDns.toSet))
-  containerFactory.init()
-  sys.addShutdownHook(containerFactory.cleanup())
-
   /** Sends an active-ack. */
   val ack = (tid: TransactionId,
              activationResult: WhiskActivation,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services