You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/08/17 01:28:01 UTC

[incubator-openwhisk] branch master updated: Collapse multiple concurrent pulls of the same image. (#2626)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f05058a  Collapse multiple concurrent pulls of the same image. (#2626)
f05058a is described below

commit f05058a959848d6524601f6814ac527561c32d46
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Aug 17 03:27:58 2017 +0200

    Collapse multiple concurrent pulls of the same image. (#2626)
    
    Multiple concurrent calls of `docker pull` for the same image create unnecessary network traffic. Instead, all of those calls are collapsed into the first call. After that call has finished, a subsequent pull results in a new `docker pull`, which allows an image to be updated without having to change its tag.
---
 .../core/containerpool/docker/DockerClient.scala   | 11 +++++++-
 .../docker/test/DockerClientTests.scala            | 33 +++++++++++++++++++++-
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 166a618..bf94b8e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -31,6 +31,7 @@ import akka.event.Logging.ErrorLevel
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
+import scala.collection.concurrent.TrieMap
 
 /**
  * Serves as interface to the docker CLI tool.
@@ -86,8 +87,16 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
         runCmd(cmd: _*).map(_.lines.toSeq.map(ContainerId.apply))
     }
 
+    /**
+     * Stores pulls that are currently being executed so that concurrent pulls of the
+     * same image collapse into one `docker pull`. The cached future is removed once it
+     * completes, on success or failure, so a later pull can retry or update the image.
+     */
+    private val pullsInFlight = TrieMap[String, Future[Unit]]()
     def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
-        runCmd("pull", image).map(_ => ())
+        pullsInFlight.getOrElseUpdate(image, {
+            runCmd("pull", image).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) }
+        })
 
     private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = {
         val cmd = dockerCmd ++ args
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 09843c4..07e31ce 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -37,6 +37,8 @@ import whisk.common.TransactionId
 import whisk.core.containerpool.docker.ContainerId
 import whisk.core.containerpool.docker.ContainerIp
 import whisk.core.containerpool.docker.DockerClient
+import scala.concurrent.Promise
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach {
@@ -51,7 +53,7 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
     val dockerCommand = "docker"
 
     /** Returns a DockerClient with a mocked result for 'executeProcess' */
-    def dockerClient(execResult: Future[String]) = new DockerClient()(global) {
+    def dockerClient(execResult: => Future[String]) = new DockerClient()(global) {
         override val dockerCmd = Seq(dockerCommand)
         override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
     }
@@ -81,6 +83,35 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
         a[NoSuchElementException] should be thrownBy await(dc.inspectIPAddress(id, "foo network"))
     }
 
+    it should "collapse multiple parallel pull calls into just one" in {
+        // Keep the mocked command pending until the promise is completed, so the pulls below overlap
+        val pullPromise = Promise[String]()
+        var commandsRun = 0
+        val dc = dockerClient {
+            commandsRun += 1 // this by-name block is evaluated on every executeProcess call
+            pullPromise.future
+        }
+
+        val image = "testimage"
+
+        // First pull: nothing is in flight yet, so the command should be run
+        dc.pull(image)
+        commandsRun shouldBe 1
+
+        // Second pull while the first is still pending: the command should not be run again
+        dc.pull(image)
+        commandsRun shouldBe 1
+
+        // Complete the pending command, which finishes both pulls above
+        pullPromise.success("pulled")
+
+        retry {
+            // Pulling again should execute the command again (retried presumably because the finished pull is dropped in a future callback)
+            await(dc.pull(image))
+            commandsRun shouldBe 2
+        }
+    }
+
     it should "write proper log markers on a successful command" in {
         // a dummy string works here as we do not assert any output
         // from the methods below

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].