You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/10 12:14:24 UTC

[GitHub] markusthoemmes closed pull request #3698: Make amount of parallel docker runs configurable.

markusthoemmes closed pull request #3698: Make amount of parallel docker runs configurable.
URL: https://github.com/apache/incubator-openwhisk/pull/3698
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 98b5c146fc..8fd93bf704 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -206,6 +206,7 @@
       "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
       "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}"
       "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}"
+      "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
       "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
       "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
       "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
@@ -271,7 +272,7 @@
     volumes: "{{ volumes|default('') }},{{ coverage_logs_dir }}/invoker:/coverage"
   when: coverage_enabled
 
-- name: start invoker using docker cli
+- name: start invoker
   docker_container:
     userns_mode: "host"
     pid_mode: "host"
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7fdb10b621..5f7a8db5a0 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -213,7 +213,7 @@ object ConfigKeys {
   val db = "whisk.db"
 
   val docker = "whisk.docker"
-  val dockerTimeouts = s"$docker.timeouts"
+  val dockerClient = s"$docker.client"
   val dockerContainerFactory = s"${docker}.container-factory"
   val runc = "whisk.runc"
   val runcTimeouts = s"$runc.timeouts"
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 8edb3868a6..c471d1b726 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -10,15 +10,25 @@ whisk {
     poll-interval: 5 minutes
   }
 
-  # Timeouts for docker commands. Set to "Inf" to disable timeout.
-  docker.timeouts {
-    run: 1 minute
-    rm: 1 minute
-    pull: 10 minutes
-    ps: 1 minute
-    inspect: 1 minute
-    pause: 10 seconds
-    unpause: 10 seconds
+  docker.client {
+    # Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
+    # concurrently, there is a good chance that some of them will fail.
+    # See https://github.com/moby/moby/issues/29369
+    # Use a semaphore to make sure that at most 10 `docker run` commands are active
+    # at the same time.
+    # 0 means that the number of parallel runs is unlimited.
+    parallel-runs: 10
+
+    # Timeouts for docker commands. Set to "Inf" to disable timeout.
+    timeouts {
+      run: 1 minute
+      rm: 1 minute
+      pull: 10 minutes
+      ps: 1 minute
+      inspect: 1 minute
+      pause: 10 seconds
+      unpause: 10 seconds
+    }
   }
 
   docker.container-factory {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 44102b8dcb..8a87e379e8 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -65,6 +65,11 @@ case class DockerClientTimeoutConfig(run: Duration,
                                      unpause: Duration,
                                      inspect: Duration)
 
+/**
+ * Configuration for docker client
+ */
+case class DockerClientConfig(parallelRuns: Int, timeouts: DockerClientTimeoutConfig)
+
 /**
  * Serves as interface to the docker CLI tool.
  *
@@ -74,8 +79,7 @@ case class DockerClientTimeoutConfig(run: Duration,
  * You only need one instance (and you shouldn't get more).
  */
 class DockerClient(dockerHost: Option[String] = None,
-                   timeouts: DockerClientTimeoutConfig =
-                     loadConfigOrThrow[DockerClientTimeoutConfig](ConfigKeys.dockerTimeouts))(
+                   config: DockerClientConfig = loadConfigOrThrow[DockerClientConfig](ConfigKeys.dockerClient))(
   executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
     extends DockerApi
     with ProcessRunner {
@@ -96,8 +100,9 @@ class DockerClient(dockerHost: Option[String] = None,
     Seq(dockerBin) ++ host
   }
 
-  protected val maxParallelRuns = 10
-  protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true)
+  protected val maxParallelRuns = config.parallelRuns
+  protected val runSemaphore =
+    new Semaphore( /* permits= */ if (maxParallelRuns > 0) maxParallelRuns else Int.MaxValue, /* fair= */ true)
 
   // Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
   // concurrently, there is a good chance that some of them will fail.
@@ -114,7 +119,7 @@ class DockerClient(dockerHost: Option[String] = None,
       }
     }.flatMap { _ =>
       // Iff the semaphore was acquired successfully
-      runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run)
+      runCmd(Seq("run", "-d") ++ args ++ Seq(image), config.timeouts.run)
         .andThen {
           // Release the semaphore as quick as possible regardless of the runCmd() result
           case _ => runSemaphore.release()
@@ -139,26 +144,26 @@ class DockerClient(dockerHost: Option[String] = None,
   def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] =
     runCmd(
       Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString),
-      timeouts.inspect).flatMap {
+      config.timeouts.inspect).flatMap {
       case "<no value>" => Future.failed(new NoSuchElementException)
       case stdout       => Future.successful(ContainerAddress(stdout))
     }
 
   def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
+    runCmd(Seq("pause", id.asString), config.timeouts.pause).map(_ => ())
 
   def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ())
+    runCmd(Seq("unpause", id.asString), config.timeouts.unpause).map(_ => ())
 
   def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
-    runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ())
+    runCmd(Seq("rm", "-f", id.asString), config.timeouts.rm).map(_ => ())
 
   def ps(filters: Seq[(String, String)] = Seq.empty, all: Boolean = false)(
     implicit transid: TransactionId): Future[Seq[ContainerId]] = {
     val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value") }
     val allArg = if (all) Seq("--all") else Seq.empty[String]
     val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs
-    runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
+    runCmd(cmd, config.timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
   }
 
   /**
@@ -169,11 +174,11 @@ class DockerClient(dockerHost: Option[String] = None,
   private val pullsInFlight = TrieMap[String, Future[Unit]]()
   def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
     pullsInFlight.getOrElseUpdate(image, {
-      runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
+      runCmd(Seq("pull", image), config.timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
     })
 
   def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] =
-    runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean)
+    runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), config.timeouts.inspect).map(_.toBoolean)
 
   private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
     val cmd = dockerCmd ++ args


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services