You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/08/10 12:14:26 UTC

[incubator-openwhisk] branch master updated: Make amount of parallel docker runs configurable. (#3698)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d20eae  Make amount of parallel docker runs configurable. (#3698)
9d20eae is described below

commit 9d20eae1b51c1599273506c85d7fe45fa4d6e0bc
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Fri Aug 10 14:14:22 2018 +0200

    Make amount of parallel docker runs configurable. (#3698)
---
 ansible/roles/invoker/tasks/deploy.yml             |  3 ++-
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  2 +-
 core/invoker/src/main/resources/application.conf   | 28 ++++++++++++++-------
 .../core/containerpool/docker/DockerClient.scala   | 29 +++++++++++++---------
 4 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 98b5c14..8fd93bf 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -206,6 +206,7 @@
       "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
       "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}"
       "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}"
+      "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
       "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
       "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
       "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
@@ -271,7 +272,7 @@
     volumes: "{{ volumes|default('') }},{{ coverage_logs_dir }}/invoker:/coverage"
   when: coverage_enabled
 
-- name: start invoker using docker cli
+- name: start invoker
   docker_container:
     userns_mode: "host"
     pid_mode: "host"
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7fdb10b..5f7a8db 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -213,7 +213,7 @@ object ConfigKeys {
   val db = "whisk.db"
 
   val docker = "whisk.docker"
-  val dockerTimeouts = s"$docker.timeouts"
+  val dockerClient = s"$docker.client"
   val dockerContainerFactory = s"${docker}.container-factory"
   val runc = "whisk.runc"
   val runcTimeouts = s"$runc.timeouts"
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 8edb386..c471d1b 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -10,15 +10,25 @@ whisk {
     poll-interval: 5 minutes
   }
 
-  # Timeouts for docker commands. Set to "Inf" to disable timeout.
-  docker.timeouts {
-    run: 1 minute
-    rm: 1 minute
-    pull: 10 minutes
-    ps: 1 minute
-    inspect: 1 minute
-    pause: 10 seconds
-    unpause: 10 seconds
+  docker.client {
+    # Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
+    # concurrently, there is a good chance that some of them will fail.
+    # See https://github.com/moby/moby/issues/29369
+    # Use a semaphore to make sure that at most 10 `docker run` commands are active
+    # at the same time.
+    # A value of 0 (or any non-positive value) disables the limit on parallel runs.
+    parallel-runs: 10
+
+    # Timeouts for docker commands. Set to "Inf" to disable timeout.
+    timeouts {
+      run: 1 minute
+      rm: 1 minute
+      pull: 10 minutes
+      ps: 1 minute
+      inspect: 1 minute
+      pause: 10 seconds
+      unpause: 10 seconds
+    }
   }
 
   docker.container-factory {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 44102b8..8a87e37 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -66,6 +66,11 @@ case class DockerClientTimeoutConfig(run: Duration,
                                      inspect: Duration)
 
 /**
+ * Configuration for the docker client: the limit on parallel `docker run` commands and the per-command timeouts.
+ */
+case class DockerClientConfig(parallelRuns: Int, timeouts: DockerClientTimeoutConfig)
+
+/**
  * Serves as interface to the docker CLI tool.
  *
  * Be cautious with the ExecutionContext passed to this, as the
@@ -74,8 +79,7 @@ case class DockerClientTimeoutConfig(run: Duration,
  * You only need one instance (and you shouldn't get more).
  */
 class DockerClient(dockerHost: Option[String] = None,
-                   timeouts: DockerClientTimeoutConfig =
-                     loadConfigOrThrow[DockerClientTimeoutConfig](ConfigKeys.dockerTimeouts))(
+                   config: DockerClientConfig = loadConfigOrThrow[DockerClientConfig](ConfigKeys.dockerClient))(
   executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
     extends DockerApi
     with ProcessRunner {
@@ -96,8 +100,9 @@ class DockerClient(dockerHost: Option[String] = None,
     Seq(dockerBin) ++ host
   }
 
-  protected val maxParallelRuns = 10
-  protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true)
+  protected val maxParallelRuns = config.parallelRuns
+  protected val runSemaphore =
+    new Semaphore( /* permits= */ if (maxParallelRuns > 0) maxParallelRuns else Int.MaxValue, /* fair= */ true)
 
   // Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
   // concurrently, there is a good chance that some of them will fail.
@@ -114,7 +119,7 @@ class DockerClient(dockerHost: Option[String] = None,
       }
     }.flatMap { _ =>
       // Iff the semaphore was acquired successfully
-      runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run)
+      runCmd(Seq("run", "-d") ++ args ++ Seq(image), config.timeouts.run)
         .andThen {
           // Release the semaphore as quick as possible regardless of the runCmd() result
           case _ => runSemaphore.release()
@@ -139,26 +144,26 @@ class DockerClient(dockerHost: Option[String] = None,
   def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] =
     runCmd(
       Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString),
-      timeouts.inspect).flatMap {
+      config.timeouts.inspect).flatMap {
       case "<no value>" => Future.failed(new NoSuchElementException)
       case stdout       => Future.successful(ContainerAddress(stdout))
     }
 
   def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
+    runCmd(Seq("pause", id.asString), config.timeouts.pause).map(_ => ())
 
   def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ())
+    runCmd(Seq("unpause", id.asString), config.timeouts.unpause).map(_ => ())
 
   def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ())
+    runCmd(Seq("rm", "-f", id.asString), config.timeouts.rm).map(_ => ())
 
   def ps(filters: Seq[(String, String)] = Seq.empty, all: Boolean = false)(
     implicit transid: TransactionId): Future[Seq[ContainerId]] = {
     val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value") }
     val allArg = if (all) Seq("--all") else Seq.empty[String]
     val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs
-    runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
+    runCmd(cmd, config.timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
   }
 
   /**
@@ -169,11 +174,11 @@ class DockerClient(dockerHost: Option[String] = None,
   private val pullsInFlight = TrieMap[String, Future[Unit]]()
   def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
     pullsInFlight.getOrElseUpdate(image, {
-      runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
+      runCmd(Seq("pull", image), config.timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
     })
 
   def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] =
-    runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean)
+    runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), config.timeouts.inspect).map(_.toBoolean)
 
   private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
     val cmd = dockerCmd ++ args