You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2017/11/24 10:30:45 UTC

[GitHub] markusthoemmes closed pull request #2995: At most 10 `docker run` commands are allowed in parallel

markusthoemmes closed pull request #2995: At most 10 `docker run` commands are allowed in parallel
URL: https://github.com/apache/incubator-openwhisk/pull/2995
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index bf943252aa..e6b3dabe95 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -20,18 +20,20 @@ package whisk.core.containerpool.docker
 import java.io.FileNotFoundException
 import java.nio.file.Files
 import java.nio.file.Paths
+import java.util.concurrent.Semaphore
 
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.blocking
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
 import akka.event.Logging.ErrorLevel
+
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
-
-import scala.collection.concurrent.TrieMap
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
 
@@ -75,25 +77,46 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
     Seq(dockerBin) ++ host
   }
 
+  protected val maxParallelRuns = 10
+  protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true)
+
+  // Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run)
+  // concurrently, there is a good chance that some of them will fail.
+  // See https://github.com/moby/moby/issues/29369
+  // Use a semaphore to make sure that at most 10 `docker run` commands are active
+  // at the same time.
   def run(image: String, args: Seq[String] = Seq.empty[String])(
     implicit transid: TransactionId): Future[ContainerId] = {
-    runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
-      .map {
-        ContainerId(_)
-      }
-      .recoverWith {
-        // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
-        // Exit code 125 means an error reported by the Docker daemon.
-        // Examples:
-        // - Unrecognized option specified
-        // - Not enough disk space
-        case pre: ProcessRunningException if pre.exitCode == 125 =>
-          Future.failed(
-            DockerContainerId
-              .parse(pre.stdout)
-              .map(BrokenDockerContainer(_, s"Broken container: ${pre.getMessage}"))
-              .getOrElse(pre))
+    Future {
+      blocking {
+        // Acquires a permit from this semaphore, blocking until one is available, or the thread is interrupted.
+        // Throws InterruptedException if the current thread is interrupted
+        runSemaphore.acquire()
       }
+    }.flatMap { _ =>
+      // Iff the semaphore was acquired successfully
+      runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*)
+        .andThen {
+          // Release the semaphore as quickly as possible regardless of the runCmd() result
+          case _ => runSemaphore.release()
+        }
+        .map {
+          ContainerId(_)
+        }
+        .recoverWith {
+          // https://docs.docker.com/v1.12/engine/reference/run/#/exit-status
+          // Exit code 125 means an error reported by the Docker daemon.
+          // Examples:
+          // - Unrecognized option specified
+          // - Not enough disk space
+          case pre: ProcessRunningException if pre.exitCode == 125 =>
+            Future.failed(
+              DockerContainerId
+                .parse(pre.stdout)
+                .map(BrokenDockerContainer(_, s"Broken container: ${pre.getMessage}"))
+                .getOrElse(pre))
+        }
+    }
   }
 
   def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] =
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 8f447cfaa6..9c2b5b3239 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -17,6 +17,8 @@
 
 package whisk.core.containerpool.docker.test
 
+import java.util.concurrent.Semaphore
+
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -27,26 +29,31 @@ import scala.concurrent.Promise
 import scala.util.Success
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
+import org.scalatest.time.{Seconds, Span}
 import common.StreamLogging
+
 import whisk.common.LogMarker
 import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD
 import whisk.common.TransactionId
-import whisk.core.containerpool.docker.DockerClient
-import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
-import whisk.utils.retry
-import whisk.core.containerpool.docker.ProcessRunningException
-import whisk.core.containerpool.docker.DockerContainerId
+import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.docker.BrokenDockerContainer
+import whisk.core.containerpool.docker.DockerClient
+import whisk.core.containerpool.docker.DockerContainerId
+import whisk.core.containerpool.docker.ProcessRunningException
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
-class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach {
+class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach with Eventually {
 
   override def beforeEach = stream.reset()
 
+  implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(5, Seconds)))
+
   implicit val transid = TransactionId.testing
   val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
 
@@ -169,6 +176,96 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
     }
   }
 
+  it should "limit the number of concurrent docker run invocations" in {
+    // Delay execution of Docker run command
+    val firstRunPromise = Promise[String]()
+
+    val firstContainerId = ContainerId("1" * 64)
+    val secondContainerId = ContainerId("2" * 64)
+
+    var runCmdCount = 0
+    val dc = new DockerClient()(global) {
+      override val dockerCmd = Seq(dockerCommand)
+      override def executeProcess(args: String*)(implicit ec: ExecutionContext) = {
+        runCmdCount += 1
+        runCmdCount match {
+          case 1 => firstRunPromise.future
+          case 2 => Future.successful(secondContainerId.asString)
+          case _ => Future.failed(new Throwable())
+        }
+      }
+      // Need to override the semaphore, otherwise the tested code will still
+      // create the semaphore with the original value of maxParallelRuns.
+      override val maxParallelRuns = 1
+      override val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true)
+    }
+
+    val image = "image"
+    val args = Seq("args")
+
+    val firstRunResult = dc.run(image, args)
+    val secondRunResult = dc.run(image, args)
+
+    // The tested code does not reach the mocked executeProcess(), and thus does not increase
+    // runCmdCount, until the Future that acquires the semaphore has completed. For this
+    // reason, it takes some time until the following matcher is successful.
+    eventually { runCmdCount shouldBe 1 }
+
+    // Complete the first Docker run command so that the second is eligible to run
+    firstRunPromise.success(firstContainerId.asString)
+
+    // Cannot assert that the first Docker run always obtains the first container: the tested
+    // code uses Futures, so the execution order may differ from test run to test run.
+    val firstResultContainerId = await(firstRunResult)
+
+    // Now, second command should be complete
+    eventually { runCmdCount shouldBe 2 }
+
+    val secondResultContainerId = await(secondRunResult)
+    Set(firstResultContainerId, secondResultContainerId) should contain theSameElementsAs Set(
+      firstContainerId,
+      secondContainerId)
+  }
+
+  it should "tolerate docker run errors when limiting the number of concurrent docker run invocations" in {
+    val secondContainerId = ContainerId("2" * 64)
+
+    var runCmdCount = 0
+    val dc = new DockerClient()(global) {
+      override val dockerCmd = Seq(dockerCommand)
+      override def executeProcess(args: String*)(implicit ec: ExecutionContext) = {
+        runCmdCount += 1
+        println(s"runCmdCount=${runCmdCount}, args.last=${args.last}")
+        runCmdCount match {
+          case 1 => Future.failed(ProcessRunningException(1, "", ""))
+          case 2 => Future.successful(secondContainerId.asString)
+          case _ => Future.failed(new Throwable())
+        }
+      }
+      // Need to override the semaphore, otherwise the tested code will still
+      // create the semaphore with the original value of maxParallelRuns.
+      override val maxParallelRuns = 1
+      override val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true)
+    }
+
+    val image = "image"
+    val args = Seq("args")
+
+    // Kick off the first Docker run command - it will fail.
+    val firstRunResult = dc.run(image, args)
+
+    an[Exception] should be thrownBy await(firstRunResult)
+    runCmdCount shouldBe 1
+
+    // Now kick off the second Docker run command - it is expected to succeed.
+    // If this command completes without timeout, the concurrency limit properly
+    // deals with errors.
+    val secondRunResult = dc.run(image, args)
+
+    await(secondRunResult) shouldBe secondContainerId
+    runCmdCount shouldBe 2
+  }
+
   it should "write proper log markers on a successful command" in {
     // a dummy string works here as we do not assert any output
     // from the methods below


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services