You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/08/10 12:14:26 UTC
[incubator-openwhisk] branch master updated: Make amount of
parallel docker runs configurable. (#3698)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 9d20eae Make amount of parallel docker runs configurable. (#3698)
9d20eae is described below
commit 9d20eae1b51c1599273506c85d7fe45fa4d6e0bc
Author: Christian Bickel <gi...@cbickel.de>
AuthorDate: Fri Aug 10 14:14:22 2018 +0200
Make amount of parallel docker runs configurable. (#3698)
---
ansible/roles/invoker/tasks/deploy.yml | 3 ++-
.../src/main/scala/whisk/core/WhiskConfig.scala | 2 +-
core/invoker/src/main/resources/application.conf | 28 ++++++++++++++-------
.../core/containerpool/docker/DockerClient.scala | 29 +++++++++++++---------
4 files changed, 39 insertions(+), 23 deletions(-)
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 98b5c14..8fd93bf 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -206,6 +206,7 @@
"INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
"CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}"
"CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}"
+ "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}"
"CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
"WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
"METRICS_KAMON": "{{ metrics.kamon.enabled }}"
@@ -271,7 +272,7 @@
volumes: "{{ volumes|default('') }},{{ coverage_logs_dir }}/invoker:/coverage"
when: coverage_enabled
-- name: start invoker using docker cli
+- name: start invoker
docker_container:
userns_mode: "host"
pid_mode: "host"
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7fdb10b..5f7a8db 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -213,7 +213,7 @@ object ConfigKeys {
val db = "whisk.db"
val docker = "whisk.docker"
- val dockerTimeouts = s"$docker.timeouts"
+ val dockerClient = s"$docker.client"
val dockerContainerFactory = s"${docker}.container-factory"
val runc = "whisk.runc"
val runcTimeouts = s"$runc.timeouts"
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 8edb386..c471d1b 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -10,15 +10,25 @@ whisk {
poll-interval: 5 minutes
}
- # Timeouts for docker commands. Set to "Inf" to disable timeout.
- docker.timeouts {
- run: 1 minute
- rm: 1 minute
- pull: 10 minutes
- ps: 1 minute
- inspect: 1 minute
- pause: 10 seconds
- unpause: 10 seconds
+ docker.client {
+ # Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
+ # concurrently, there is a good chance that some of them will fail.
+ # See https://github.com/moby/moby/issues/29369
+ # Use a semaphore to make sure that at most 10 `docker run` commands are active
+ # at the same time.
+ # 0 (or any non-positive value) removes the limit, allowing an unbounded number of parallel runs.
+ parallel-runs: 10
+
+ # Timeouts for docker commands. Set to "Inf" to disable timeout.
+ timeouts {
+ run: 1 minute
+ rm: 1 minute
+ pull: 10 minutes
+ ps: 1 minute
+ inspect: 1 minute
+ pause: 10 seconds
+ unpause: 10 seconds
+ }
}
docker.container-factory {
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 44102b8..8a87e37 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -66,6 +66,11 @@ case class DockerClientTimeoutConfig(run: Duration,
inspect: Duration)
/**
+ * Configuration for docker client
+ */
+case class DockerClientConfig(parallelRuns: Int, timeouts: DockerClientTimeoutConfig)
+
+/**
* Serves as interface to the docker CLI tool.
*
* Be cautious with the ExecutionContext passed to this, as the
@@ -74,8 +79,7 @@ case class DockerClientTimeoutConfig(run: Duration,
* You only need one instance (and you shouldn't get more).
*/
class DockerClient(dockerHost: Option[String] = None,
- timeouts: DockerClientTimeoutConfig =
- loadConfigOrThrow[DockerClientTimeoutConfig](ConfigKeys.dockerTimeouts))(
+ config: DockerClientConfig = loadConfigOrThrow[DockerClientConfig](ConfigKeys.dockerClient))(
executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
extends DockerApi
with ProcessRunner {
@@ -96,8 +100,9 @@ class DockerClient(dockerHost: Option[String] = None,
Seq(dockerBin) ++ host
}
- protected val maxParallelRuns = 10
- protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true)
+ protected val maxParallelRuns = config.parallelRuns
+ protected val runSemaphore =
+ new Semaphore( /* permits= */ if (maxParallelRuns > 0) maxParallelRuns else Int.MaxValue, /* fair= */ true)
// Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
// concurrently, there is a good chance that some of them will fail.
@@ -114,7 +119,7 @@ class DockerClient(dockerHost: Option[String] = None,
}
}.flatMap { _ =>
// Iff the semaphore was acquired successfully
- runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run)
+ runCmd(Seq("run", "-d") ++ args ++ Seq(image), config.timeouts.run)
.andThen {
// Release the semaphore as quick as possible regardless of the runCmd() result
case _ => runSemaphore.release()
@@ -139,26 +144,26 @@ class DockerClient(dockerHost: Option[String] = None,
def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] =
runCmd(
Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString),
- timeouts.inspect).flatMap {
+ config.timeouts.inspect).flatMap {
case "<no value>" => Future.failed(new NoSuchElementException)
case stdout => Future.successful(ContainerAddress(stdout))
}
def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ())
+ runCmd(Seq("pause", id.asString), config.timeouts.pause).map(_ => ())
def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ())
+ runCmd(Seq("unpause", id.asString), config.timeouts.unpause).map(_ => ())
def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
- runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ())
+ runCmd(Seq("rm", "-f", id.asString), config.timeouts.rm).map(_ => ())
def ps(filters: Seq[(String, String)] = Seq.empty, all: Boolean = false)(
implicit transid: TransactionId): Future[Seq[ContainerId]] = {
val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value") }
val allArg = if (all) Seq("--all") else Seq.empty[String]
val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs
- runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
+ runCmd(cmd, config.timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply))
}
/**
@@ -169,11 +174,11 @@ class DockerClient(dockerHost: Option[String] = None,
private val pullsInFlight = TrieMap[String, Future[Unit]]()
def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
pullsInFlight.getOrElseUpdate(image, {
- runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
+ runCmd(Seq("pull", image), config.timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
})
def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] =
- runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean)
+ runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), config.timeouts.inspect).map(_.toBoolean)
private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
val cmd = dockerCmd ++ args