You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2017/08/17 01:28:01 UTC
[incubator-openwhisk] branch master updated: Collapse multiple
concurrent pulls of the same image. (#2626)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new f05058a Collapse multiple concurrent pulls of the same image. (#2626)
f05058a is described below
commit f05058a959848d6524601f6814ac527561c32d46
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Thu Aug 17 03:27:58 2017 +0200
Collapse multiple concurrent pulls of the same image. (#2626)
Multiple concurrent calls of `docker pull` to the same image create unnecessary network traffic. Instead, all of those calls are collapsed into the first calls. After that call finished, a subsequent pull will result in a `docker pull` again to enable updates to the same image, without having to work with tags for convenience.
---
.../core/containerpool/docker/DockerClient.scala | 11 +++++++-
.../docker/test/DockerClientTests.scala | 33 +++++++++++++++++++++-
2 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 166a618..bf94b8e 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -31,6 +31,7 @@ import akka.event.Logging.ErrorLevel
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
+import scala.collection.concurrent.TrieMap
/**
* Serves as interface to the docker CLI tool.
@@ -86,8 +87,16 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio
runCmd(cmd: _*).map(_.lines.toSeq.map(ContainerId.apply))
}
+ /**
+ * Stores pulls that are currently being executed and collapses multiple
+ * pulls into just one. After a pull is finished, the cached future is removed
+ * to enable constant updates of an image without changing its tag.
+ */
+ private val pullsInFlight = TrieMap[String, Future[Unit]]()
def pull(image: String)(implicit transid: TransactionId): Future[Unit] =
- runCmd("pull", image).map(_ => ())
+ pullsInFlight.getOrElseUpdate(image, {
+ runCmd("pull", image).map(_ => pullsInFlight.remove(image))
+ })
private def runCmd(args: String*)(implicit transid: TransactionId): Future[String] = {
val cmd = dockerCmd ++ args
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
index 09843c4..07e31ce 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala
@@ -37,6 +37,8 @@ import whisk.common.TransactionId
import whisk.core.containerpool.docker.ContainerId
import whisk.core.containerpool.docker.ContainerIp
import whisk.core.containerpool.docker.DockerClient
+import scala.concurrent.Promise
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class DockerClientTests extends FlatSpec with Matchers with StreamLogging with BeforeAndAfterEach {
@@ -51,7 +53,7 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
val dockerCommand = "docker"
/** Returns a DockerClient with a mocked result for 'executeProcess' */
- def dockerClient(execResult: Future[String]) = new DockerClient()(global) {
+ def dockerClient(execResult: => Future[String]) = new DockerClient()(global) {
override val dockerCmd = Seq(dockerCommand)
override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult
}
@@ -81,6 +83,35 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B
a[NoSuchElementException] should be thrownBy await(dc.inspectIPAddress(id, "foo network"))
}
+ it should "collapse multiple parallel pull calls into just one" in {
+ // Delay execution of the pull command
+ val pullPromise = Promise[String]()
+ var commandsRun = 0
+ val dc = dockerClient {
+ commandsRun += 1
+ pullPromise.future
+ }
+
+ val image = "testimage"
+
+ // Pull first, command should be run
+ dc.pull(image)
+ commandsRun shouldBe 1
+
+ // Pull again, command should not be run
+ dc.pull(image)
+ commandsRun shouldBe 1
+
+ // Finish the pulls above
+ pullPromise.success("pulled")
+
+ retry {
+ // Pulling again should execute the command again
+ await(dc.pull(image))
+ commandsRun shouldBe 2
+ }
+ }
+
it should "write proper log markers on a successful command" in {
// a dummy string works here as we do not assert any output
// from the methods below
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].