You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/07/22 00:19:14 UTC

[beam] branch master updated: Fix Python Flink runner load tests & Stop publish Python SDK image in beam_portability (#27595)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 97cb32ea5dd Fix Python Flink runner load tests & Stop publish Python SDK image in beam_portability (#27595)
97cb32ea5dd is described below

commit 97cb32ea5dd087068ff476a49f845ea8fe0fe3ef
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Jul 21 20:19:07 2023 -0400

    Fix Python Flink runner load tests & Stop publish Python SDK image in beam_portability (#27595)
---
 .test-infra/jenkins/LoadTestsBuilder.groovy        |  6 ++--
 .../jenkins/job_LoadTests_Combine_Flink_Go.groovy  |  4 +--
 .../job_LoadTests_Combine_Flink_Python.groovy      |  3 +-
 .../jenkins/job_LoadTests_GBK_Flink_Go.groovy      |  4 +--
 .../jenkins/job_LoadTests_GBK_Flink_Python.groovy  |  3 +-
 .../jenkins/job_LoadTests_ParDo_Flink_Go.groovy    |  4 +--
 .../job_LoadTests_ParDo_Flink_Python.groovy        |  3 +-
 .../job_LoadTests_SideInput_Flink_Go.groovy        |  4 +--
 .../jenkins/job_LoadTests_coGBK_Flink_Go.groovy    |  4 +--
 .../job_LoadTests_coGBK_Flink_Python.groovy        |  3 +-
 ...Commit_Python_Chicago_Taxi_Example_Flink.groovy |  3 +-
 .../jenkins/job_Publish_Docker_Snapshots.groovy    |  4 +--
 .../fnexecution/environment/DockerCommand.java     | 41 +++++++++++++++-------
 13 files changed, 53 insertions(+), 33 deletions(-)

diff --git a/.test-infra/jenkins/LoadTestsBuilder.groovy b/.test-infra/jenkins/LoadTestsBuilder.groovy
index b5b0b664acf..060a2ea6542 100644
--- a/.test-infra/jenkins/LoadTestsBuilder.groovy
+++ b/.test-infra/jenkins/LoadTestsBuilder.groovy
@@ -24,10 +24,10 @@ import InfluxDBCredentialsHelper
 import static PythonTestProperties.LOAD_TEST_PYTHON_VERSION
 
 class LoadTestsBuilder {
-  final static String DOCKER_CONTAINER_REGISTRY = 'gcr.io/apache-beam-testing/beam_portability'
-  final static String DOCKER_CONTAINER_REGISTRY_SNAPSHOTS = 'gcr.io/apache-beam-testing/beam-sdk'
-  final static String GO_SDK_CONTAINER = "${DOCKER_CONTAINER_REGISTRY_SNAPSHOTS}/beam_go_sdk:latest"
+  final static String DOCKER_CONTAINER_REGISTRY = 'gcr.io/apache-beam-testing/beam-sdk'
+  final static String GO_SDK_CONTAINER = "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest"
   final static String DOCKER_BEAM_SDK_IMAGE = "beam_python${LOAD_TEST_PYTHON_VERSION}_sdk:latest"
+  final static String DOCKER_BEAM_JOBSERVER = 'gcr.io/apache-beam-testing/beam_portability'
 
   static void loadTests(scope, CommonTestProperties.SDK sdk, List testConfigurations, String test, String mode,
       List<String> jobSpecificSwitches = null) {
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
index e161ba0c0de..9b8adc732f9 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
 import Flink
 import InfluxDBCredentialsHelper
 
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.GO_SDK_CONTAINER
 
 
@@ -107,7 +107,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         GO_SDK_CONTAINER
       ],
       initialParallelism,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   // Execute all scenarios connected with initial parallelism.
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, initialScenarios, 'combine', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
index ff7cc5a9e79..54b92fdade2 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
@@ -24,6 +24,7 @@ import Flink
 import InfluxDBCredentialsHelper
 
 import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
 
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -135,7 +136,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
       ],
       initialParallelism,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   // Execute all scenarios connected with initial parallelism.
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, initialScenarios, 'Combine', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
index 59fbeb75bc2..d5a6910b2a0 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
 import Flink
 import InfluxDBCredentialsHelper
 
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.GO_SDK_CONTAINER
 
 String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
@@ -199,7 +199,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         GO_SDK_CONTAINER
       ],
       initialParallelism,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   // Execute all scenarios connected with initial parallelism.
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, initialScenarios, 'group_by_key', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
index f4714a2d0e3..25e2647ebf3 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
@@ -24,6 +24,7 @@ import Flink
 import InfluxDBCredentialsHelper
 
 import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
 
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -146,7 +147,7 @@ def loadTest = { scope, triggeringContext ->
         "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   def configurations = testScenarios.findAll { it.pipelineOptions?.parallelism?.value == numberOfWorkers }
   loadTestsBuilder.loadTests(scope, sdk, configurations, "GBK", "batch")
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
index c5af6cc42d1..df20312f27b 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
 import Flink
 import InfluxDBCredentialsHelper
 
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.GO_SDK_CONTAINER
 
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -127,7 +127,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         GO_SDK_CONTAINER
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'ParDo', mode)
 }
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
index 2979a16300c..4af2efd1be6 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
@@ -24,6 +24,7 @@ import Flink
 import InfluxDBCredentialsHelper
 
 import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
 
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -329,7 +330,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'ParDo', mode)
 }
diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
index f2c008b4f25..bd0eaa4f23e 100644
--- a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
@@ -22,7 +22,7 @@ import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 import InfluxDBCredentialsHelper
 
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.GO_SDK_CONTAINER
 
 def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -79,7 +79,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         GO_SDK_CONTAINER
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
       batchScenarios(), 'SideInput', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
index 95468a9d2f2..8c7a60e724f 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
@@ -23,7 +23,7 @@ import PhraseTriggeringPostCommitBuilder
 import Flink
 import InfluxDBCredentialsHelper
 
-import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.GO_SDK_CONTAINER
 
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -159,7 +159,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
         GO_SDK_CONTAINER
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'CoGBK', mode)
 }
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
index a0a06ea4f14..9a0798f8107 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
@@ -25,6 +25,7 @@ import InfluxDBCredentialsHelper
 
 import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
 import static LoadTestsBuilder.DOCKER_BEAM_SDK_IMAGE
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
 
@@ -137,7 +138,7 @@ def loadTest = { scope, triggeringContext ->
         "${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON, testScenarios, 'CoGBK', 'batch')
 }
diff --git a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
index c1b4a4496f2..6cf852a1689 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
@@ -22,6 +22,7 @@ import Flink
 import LoadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
 
+import static LoadTestsBuilder.DOCKER_BEAM_JOBSERVER
 import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
 import static PythonTestProperties.CHICAGO_TAXI_EXAMPLE_FLINK_PYTHON_VERSION
 
@@ -38,7 +39,7 @@ def chicagoTaxiJob = { scope ->
         "${DOCKER_CONTAINER_REGISTRY}/${beamSdkDockerImage}"
       ],
       numberOfWorkers,
-      "${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
+      "${DOCKER_BEAM_JOBSERVER}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
 
   def pipelineOptions = [
     parallelism             : numberOfWorkers,
diff --git a/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy b/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy
index e313b939255..510acd8f37d 100644
--- a/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy
+++ b/.test-infra/jenkins/job_Publish_Docker_Snapshots.groovy
@@ -41,9 +41,7 @@ job('beam_Publish_Docker_Snapshots') {
     gradle {
       rootBuildScriptDir(commonJobProperties.checkoutDir)
       commonJobProperties.setGradleSwitches(delegate)
-      SUPPORTED_CONTAINER_TASKS.each { taskVer ->
-        tasks(":sdks:python:container:${taskVer}:dockerPush")
-      }
+      tasks(":runners:spark:${CommonTestProperties.getSparkVersion()}:job-server:container:dockerPush")
       tasks(":runners:flink:${CommonTestProperties.getFlinkVersion()}:job-server-container:dockerPush")
       switches("-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability")
       switches("-Pdocker-tag=latest")
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
index bdd995b4b7c..e186a2924a1 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java
@@ -53,20 +53,28 @@ class DockerCommand {
   // but we _should_ always capture full ids.
   private static final Pattern CONTAINER_ID_PATTERN = Pattern.compile("\\p{XDigit}{64}");
 
+  /**
+   * Return a DockerCommand instance with default timeout settings: pull timeout 10 min and other
+   * command timeout 2 min.
+   */
   public static DockerCommand getDefault() {
-    return forExecutable(DEFAULT_DOCKER_COMMAND, Duration.ofMinutes(2));
+    return forExecutable(DEFAULT_DOCKER_COMMAND, Duration.ofMinutes(2), Duration.ofMinutes(10));
   }
 
-  static DockerCommand forExecutable(String dockerExecutable, Duration commandTimeout) {
-    return new DockerCommand(dockerExecutable, commandTimeout);
+  static DockerCommand forExecutable(
+      String dockerExecutable, Duration commandTimeout, Duration pullTimeout) {
+    return new DockerCommand(dockerExecutable, commandTimeout, pullTimeout);
   }
 
   private final String dockerExecutable;
   private final Duration commandTimeout;
+  // pull remote image can take longer time
+  private final Duration pullTimeout;
 
-  private DockerCommand(String dockerExecutable, Duration commandTimeout) {
+  private DockerCommand(String dockerExecutable, Duration commandTimeout, Duration pullTimeout) {
     this.dockerExecutable = dockerExecutable;
     this.commandTimeout = commandTimeout;
+    this.pullTimeout = pullTimeout;
   }
 
   /**
@@ -83,7 +91,8 @@ class DockerCommand {
     // Pull the image from docker repo. This will be no-op if the image already exists.
     try {
       runShortCommand(
-          ImmutableList.<String>builder().add(dockerExecutable).add("pull").add(imageTag).build());
+          ImmutableList.<String>builder().add(dockerExecutable).add("pull").add(imageTag).build(),
+          pullTimeout);
     } catch (IOException | TimeoutException | InterruptedException e) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Unable to pull docker image {}", imageTag, e);
@@ -134,7 +143,8 @@ class DockerCommand {
     checkArgument(
         CONTAINER_ID_PATTERN.matcher(containerId).matches(),
         "Container ID must be a 64-character hexadecimal string");
-    return runShortCommand(Arrays.asList(dockerExecutable, "logs", containerId), true, "\n");
+    return runShortCommand(
+        Arrays.asList(dockerExecutable, "logs", containerId), true, "\n", commandTimeout);
   }
 
   /**
@@ -168,7 +178,12 @@ class DockerCommand {
 
   private String runShortCommand(List<String> invocation)
       throws IOException, TimeoutException, InterruptedException {
-    return runShortCommand(invocation, false, "");
+    return runShortCommand(invocation, false, "", commandTimeout);
+  }
+
+  private String runShortCommand(List<String> invocation, Duration timeout)
+      throws IOException, TimeoutException, InterruptedException {
+    return runShortCommand(invocation, false, "", timeout);
   }
 
   /**
@@ -182,7 +197,10 @@ class DockerCommand {
    * @throws TimeoutException if command has not finished by {@link DockerCommand#commandTimeout}
    */
   private String runShortCommand(
-      List<String> invocation, boolean redirectErrorStream, CharSequence delimiter)
+      List<String> invocation,
+      boolean redirectErrorStream,
+      CharSequence delimiter,
+      Duration timeout)
       throws IOException, TimeoutException, InterruptedException {
     ProcessBuilder pb = new ProcessBuilder(invocation);
     pb.redirectErrorStream(redirectErrorStream);
@@ -216,7 +234,7 @@ class DockerCommand {
               });
     }
     // TODO: Retry on interrupt?
-    boolean processDone = process.waitFor(commandTimeout.toMillis(), TimeUnit.MILLISECONDS);
+    boolean processDone = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
     if (!processDone) {
       process.destroy();
       throw new TimeoutException(
@@ -228,7 +246,7 @@ class DockerCommand {
     if (exitCode != 0) {
       String errorString;
       try {
-        errorString = errorFuture.get(commandTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        errorString = errorFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
       } catch (Exception stderrEx) {
         errorString =
             String.format("Error capturing %s: %s", errorStringName, stderrEx.getMessage());
@@ -242,8 +260,7 @@ class DockerCommand {
               errorString));
     }
     try {
-      // TODO: Consider a stricter timeout.
-      return resultString.get(commandTimeout.toMillis(), TimeUnit.MILLISECONDS);
+      return resultString.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
       // Recast any exceptions in reading output as IOExceptions.