You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/26 04:56:54 UTC

[pulsar] branch master updated: [Tests] Improve integration test logging to improve readability and efficiency (#10320)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eff07c  [Tests] Improve integration test logging to improve readability and efficiency (#10320)
7eff07c is described below

commit 7eff07c53e5cf730b7e383dddb7e4d2c8b53bc0d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Apr 26 07:56:15 2021 +0300

    [Tests] Improve integration test logging to improve readability and efficiency (#10320)
    
    * Use simple docker log tailing
    
    * Refactor DockerUtils
    
    * Improve "tail -f logfile" efficiency in integration tests
---
 .../integration/containers/BrokerContainer.java    |   4 +-
 .../integration/containers/ChaosContainer.java     |  39 +--
 .../containers/PrestoWorkerContainer.java          |   7 +-
 .../tests/integration/utils/DockerUtils.java       | 354 ++++++++++++---------
 4 files changed, 202 insertions(+), 202 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
index 636d60f..07558ce 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
@@ -30,12 +30,12 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {
     public BrokerContainer(String clusterName, String hostName) {
         super(
             clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_HTTP_PORT);
+        tailContainerLog();
     }
 
     @Override
     protected void afterStart() {
-        this.tailContainerLog();
-        DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+        DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
                 "tail", "-f", "/var/log/pulsar/broker.log");
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index cf5a884..6064d5b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -64,43 +64,8 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
         super.stop();
     }
 
-    public void tailContainerLog() {
-        CompletableFuture.runAsync(() -> {
-            while (null == getContainerId()) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(100);
-                } catch (InterruptedException e) {
-                    return;
-                }
-            }
-
-            LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(getContainerId());
-            logContainerCmd.withStdOut(true).withStdErr(true).withFollowStream(true);
-            logContainerCmd.exec(new LogContainerResultCallback() {
-                @Override
-                public void onNext(Frame item) {
-                    log.info(new String(item.getPayload(), UTF_8));
-                }
-            });
-        });
-    }
-
-    public String getContainerLog() {
-        StringBuilder sb = new StringBuilder();
-
-        LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(getContainerId());
-        logContainerCmd.withStdOut(true).withStdErr(true);
-        try {
-            logContainerCmd.exec(new LogContainerResultCallback() {
-                @Override
-                public void onNext(Frame item) {
-                    sb.append(new String(item.getPayload(), UTF_8));
-                }
-            }).awaitCompletion();
-        } catch (InterruptedException e) {
-
-        }
-        return sb.toString();
+    protected void tailContainerLog() {
+        withLogConsumer(item -> log.info(item.getUtf8String()));
     }
 
     public void putFile(String path, byte[] contents) throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
index 7c6c9d6..fe08b81 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -37,15 +37,14 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
                 -1,
                 PRESTO_HTTP_PORT,
                 "/v1/info/state");
-
+        tailContainerLog();
     }
 
     @Override
     protected void afterStart() {
-        this.tailContainerLog();
-        DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+        DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
                 "tail", "-f", "/var/log/pulsar/presto_worker.log");
-        DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+        DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
                 "tail", "-f", "/pulsar/lib/presto/var/log/server.log");
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 8fe8e33..7c90cd3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.utils;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.async.ResultCallback;
 import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -26,29 +27,30 @@ import com.github.dockerjava.api.exception.NotFoundException;
 import com.github.dockerjava.api.model.ContainerNetwork;
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.StreamType;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.GZIPOutputStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DockerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);
@@ -66,47 +68,41 @@ public class DockerUtils {
     }
 
     public static void dumpContainerLogToTarget(DockerClient dockerClient, String containerId) {
-        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
-        // docker api returns names prefixed with "/", it's part of it's legacy design,
-        // this removes it to be consistent with what docker ps shows.
-        final String containerName = inspectContainerResponse.getName().replace("/","");
-        File output = new File(getTargetDirectory(containerName), "docker.log");
-        int i = 0;
-        while (output.exists()) {
-            LOG.info("{} exists, incrementing", output);
-            output = new File(getTargetDirectory(containerName), "docker." + i++ + ".log");
-        }
-        try (FileOutputStream os = new FileOutputStream(output)) {
+        final String containerName = getContainerName(dockerClient, containerId);
+        File output = getUniqueFileInTargetDirectory(containerName, "docker", ".log");
+        try (OutputStream os = new BufferedOutputStream(new FileOutputStream(output))) {
             CompletableFuture<Boolean> future = new CompletableFuture<>();
             dockerClient.logContainerCmd(containerName).withStdOut(true)
-                .withStdErr(true).withTimestamps(true).exec(new ResultCallback<Frame>() {
-                        @Override
-                        public void close() {}
-
-                        @Override
-                        public void onStart(Closeable closeable) {}
-
-                        @Override
-                        public void onNext(Frame object) {
-                            try {
-                                os.write(object.getPayload());
-                            } catch (IOException e) {
-                                onError(e);
-                            }
-                        }
+                    .withStdErr(true).withTimestamps(true).exec(new ResultCallback<Frame>() {
+                @Override
+                public void close() {
+                }
 
-                        @Override
-                        public void onError(Throwable throwable) {
-                            future.completeExceptionally(throwable);
-                        }
+                @Override
+                public void onStart(Closeable closeable) {
+                }
 
-                        @Override
-                        public void onComplete() {
-                            future.complete(true);
-                        }
-                    });
+                @Override
+                public void onNext(Frame object) {
+                    try {
+                        os.write(object.getPayload());
+                    } catch (IOException e) {
+                        onError(e);
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    future.completeExceptionally(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    future.complete(true);
+                }
+            });
             future.get();
-        } catch (RuntimeException|ExecutionException|IOException e) {
+        } catch (RuntimeException | ExecutionException | IOException e) {
             LOG.error("Error dumping log for {}", containerName, e);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -114,29 +110,36 @@ public class DockerUtils {
         }
     }
 
-    public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
-                                                          String path) {
-        final int READ_BLOCK_SIZE = 10000;
+    private static File getUniqueFileInTargetDirectory(String containerName, String prefix, String suffix) {
+        return getUniqueFileInDirectory(getTargetDirectory(containerName), prefix, suffix);
+    }
+
+    private static File getUniqueFileInDirectory(File directory, String prefix, String suffix) {
+        File file = new File(directory, prefix + suffix);
+        int i = 0;
+        while (file.exists()) {
+            LOG.info("{} exists, incrementing", file);
+            file = new File(directory, prefix + "_" + (i++) + suffix);
+        }
+        return file;
+    }
+
+    private static String getContainerName(DockerClient dockerClient, String containerId) {
         final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
         // docker api returns names prefixed with "/", it's part of it's legacy design,
         // this removes it to be consistent with what docker ps shows.
-        final String containerName = inspectContainerResponse.getName().replace("/","");
+        return inspectContainerResponse.getName().replace("/", "");
+    }
+
+    public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
+                                                          String path) {
+        final String containerName = getContainerName(dockerClient, containerId);
         final String baseName = path.replace("/", "-").replaceAll("^-", "");
-        File output = new File(getTargetDirectory(containerName), baseName + ".tar.gz");
-        int i = 0;
-        while (output.exists()) {
-            LOG.info("{} exists, incrementing", output);
-            output = new File(getTargetDirectory(containerName), baseName + "_" + i++ + ".tar.gz");
-        }
+        File output = getUniqueFileInTargetDirectory(containerName, baseName, ".tar.gz");
         try (InputStream dockerStream = dockerClient.copyArchiveFromContainerCmd(containerId, path).exec();
-             OutputStream os = new GZIPOutputStream(new FileOutputStream(output))) {
-            byte[] block = new byte[READ_BLOCK_SIZE];
-            int read = dockerStream.read(block, 0, READ_BLOCK_SIZE);
-            while (read > -1) {
-                os.write(block, 0, read);
-                read = dockerStream.read(block, 0, READ_BLOCK_SIZE);
-            }
-        } catch (RuntimeException|IOException e) {
+             OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(output)))) {
+            IOUtils.copy(dockerStream, os);
+        } catch (RuntimeException | IOException e) {
             if (!(e instanceof NotFoundException)) {
                 LOG.error("Error reading dir from container {}", containerName, e);
             }
@@ -145,33 +148,25 @@ public class DockerUtils {
 
     public static void dumpContainerLogDirToTarget(DockerClient docker, String containerId,
                                                    String path) {
-        final int READ_BLOCK_SIZE = 10000;
-
+        File targetDirectory = getTargetDirectory(containerId);
         try (InputStream dockerStream = docker.copyArchiveFromContainerCmd(containerId, path).exec();
              TarArchiveInputStream stream = new TarArchiveInputStream(dockerStream)) {
             TarArchiveEntry entry = stream.getNextTarEntry();
             while (entry != null) {
                 if (entry.isFile()) {
-                    File output = new File(getTargetDirectory(containerId), entry.getName().replace("/", "-"));
-                    try (FileOutputStream os = new FileOutputStream(output)) {
-                        byte[] block = new byte[READ_BLOCK_SIZE];
-                        int read = stream.read(block, 0, READ_BLOCK_SIZE);
-                        while (read > -1) {
-                            os.write(block, 0, read);
-                            read = stream.read(block, 0, READ_BLOCK_SIZE);
-                        }
-                    }
+                    File output = new File(targetDirectory, entry.getName().replace("/", "-"));
+                    Files.copy(stream, output.toPath(), StandardCopyOption.REPLACE_EXISTING);
                 }
                 entry = stream.getNextTarEntry();
             }
-        } catch (RuntimeException|IOException e) {
+        } catch (RuntimeException | IOException e) {
             LOG.error("Error reading logs from container {}", containerId, e);
         }
     }
 
     public static String getContainerIP(DockerClient docker, String containerId) {
         for (Map.Entry<String, ContainerNetwork> e : docker.inspectContainerCmd(containerId)
-                 .exec().getNetworkSettings().getNetworks().entrySet()) {
+                .exec().getNetworkSettings().getNetworks().entrySet()) {
             return e.getValue().getIpAddress();
         }
         throw new IllegalArgumentException("Container " + containerId + " has no networks");
@@ -196,72 +191,66 @@ public class DockerUtils {
                                                                          String... cmd) {
         CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
         String execId = dockerClient.execCreateCmd(containerId)
-            .withCmd(cmd)
-            .withAttachStderr(true)
-            .withAttachStdout(true)
-            .exec()
-            .getId();
-        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
-        final String containerName = inspectContainerResponse.getName().replace("/","");
+                .withCmd(cmd)
+                .withAttachStderr(true)
+                .withAttachStdout(true)
+                .exec()
+                .getId();
+        final String containerName = getContainerName(dockerClient, containerId);
         String cmdString = String.join(" ", cmd);
         StringBuilder stdout = new StringBuilder();
         StringBuilder stderr = new StringBuilder();
         dockerClient.execStartCmd(execId).withDetach(false)
-            .exec(new ResultCallback<Frame>() {
-                @Override
-                public void close() {}
+                .exec(new ResultCallback<Frame>() {
+                    @Override
+                    public void close() {
+                    }
 
-                @Override
-                public void onStart(Closeable closeable) {
-                    LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
-                }
+                    @Override
+                    public void onStart(Closeable closeable) {
+                        LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
+                    }
 
-                @Override
-                public void onNext(Frame object) {
-                    LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
-                    if (StreamType.STDOUT == object.getStreamType()) {
-                        stdout.append(new String(object.getPayload(), UTF_8));
-                    } else if (StreamType.STDERR == object.getStreamType()) {
-                        stderr.append(new String(object.getPayload(), UTF_8));
+                    @Override
+                    public void onNext(Frame object) {
+                        LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
+                        if (StreamType.STDOUT == object.getStreamType()) {
+                            stdout.append(new String(object.getPayload(), UTF_8));
+                        } else if (StreamType.STDERR == object.getStreamType()) {
+                            stderr.append(new String(object.getPayload(), UTF_8));
+                        }
                     }
-                }
 
-                @Override
-                public void onError(Throwable throwable) {
-                    future.completeExceptionally(throwable);
-                }
+                    @Override
+                    public void onError(Throwable throwable) {
+                        future.completeExceptionally(throwable);
+                    }
 
-                @Override
-                public void onComplete() {
-                    LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
+                    @Override
+                    public void onComplete() {
+                        LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
 
-                    InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
-                    while (resp.isRunning()) {
-                        try {
-                            Thread.sleep(200);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            throw new RuntimeException(ie);
+                        InspectExecResponse resp = waitForExecCmdToFinish(dockerClient, execId);
+                        int retCode = resp.getExitCode();
+                        ContainerExecResult result = ContainerExecResult.of(
+                                retCode,
+                                stdout.toString(),
+                                stderr.toString()
+                        );
+                        LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
+
+                        if (retCode != 0) {
+                            LOG.error(
+                                    "DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr:"
+                                            + " {}",
+                                    containerName, cmdString, result.getExitCode(), result.getStdout(),
+                                    result.getStderr());
+                            future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
+                        } else {
+                            future.complete(result);
                         }
-                        resp = dockerClient.inspectExecCmd(execId).exec();
-                    }
-                    int retCode = resp.getExitCode();
-                    ContainerExecResult result = ContainerExecResult.of(
-                            retCode,
-                            stdout.toString(),
-                            stderr.toString()
-                    );
-                    LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
-
-                    if (retCode != 0) {
-                        LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
-                                containerName, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
-                        future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
-                    } else {
-                        future.complete(result);
                     }
-                }
-            });
+                });
         return future;
     }
 
@@ -275,11 +264,10 @@ public class DockerUtils {
                 .withAttachStdout(true)
                 .exec()
                 .getId();
-        final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
-        final String containerName = inspectContainerResponse.getName().replace("/","");
+        final String containerName = getContainerName(dockerClient, containerId);
         String cmdString = String.join(" ", cmd);
-        ByteBuf stdout = Unpooled.buffer();
-        ByteBuf stderr = Unpooled.buffer();
+        ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+        ByteArrayOutputStream stderr = new ByteArrayOutputStream();
         dockerClient.execStartCmd(execId).withDetach(false)
                 .exec(new ResultCallback<Frame>() {
                     @Override
@@ -293,10 +281,14 @@ public class DockerUtils {
 
                     @Override
                     public void onNext(Frame object) {
-                        if (StreamType.STDOUT == object.getStreamType()) {
-                            stdout.writeBytes(object.getPayload());
-                        } else if (StreamType.STDERR == object.getStreamType()) {
-                            stderr.writeBytes(object.getPayload());
+                        try {
+                            if (StreamType.STDOUT == object.getStreamType()) {
+                                stdout.write(object.getPayload());
+                            } else if (StreamType.STDERR == object.getStreamType()) {
+                                stderr.write(object.getPayload());
+                            }
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
                         }
                     }
 
@@ -313,27 +305,13 @@ public class DockerUtils {
                 });
         future.join();
 
-        InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
-        while (resp.isRunning()) {
-            try {
-                Thread.sleep(200);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(ie);
-            }
-            resp = dockerClient.inspectExecCmd(execId).exec();
-        }
+        InspectExecResponse resp = waitForExecCmdToFinish(dockerClient, execId);
         int retCode = resp.getExitCode();
 
-        byte[] stdoutBytes = new byte[stdout.readableBytes()];
-        stdout.readBytes(stdoutBytes);
-        byte[] stderrBytes = new byte[stderr.readableBytes()];
-        stderr.readBytes(stderrBytes);
-
         ContainerExecResultBytes result = ContainerExecResultBytes.of(
                 retCode,
-                stdoutBytes,
-                stderrBytes);
+                stdout.toByteArray(),
+                stderr.toByteArray());
         LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
 
         if (retCode != 0) {
@@ -342,8 +320,66 @@ public class DockerUtils {
         return result;
     }
 
+    public static CompletableFuture<Integer> runCommandAsyncWithLogging(DockerClient dockerClient,
+                                                                        String containerId, String... cmd) {
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        String execId = dockerClient.execCreateCmd(containerId)
+                .withCmd(cmd)
+                .withAttachStderr(true)
+                .withAttachStdout(true)
+                .exec()
+                .getId();
+        final String containerName = getContainerName(dockerClient, containerId);
+        String cmdString = String.join(" ", cmd);
+        dockerClient.execStartCmd(execId).withDetach(false)
+                .exec(new ResultCallback<Frame>() {
+                    @Override
+                    public void close() {
+                    }
+
+                    @Override
+                    public void onStart(Closeable closeable) {
+                        LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
+                    }
+
+                    @Override
+                    public void onNext(Frame object) {
+                        LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        future.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
+                        InspectExecResponse resp = waitForExecCmdToFinish(dockerClient, execId);
+                        int retCode = resp.getExitCode();
+                        LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
+                        future.complete(retCode);
+                    }
+                });
+        return future;
+    }
+
+    private static InspectExecResponse waitForExecCmdToFinish(DockerClient dockerClient, String execId) {
+        InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
+        while (resp.isRunning()) {
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(ie);
+            }
+            resp = dockerClient.inspectExecCmd(execId).exec();
+        }
+        return resp;
+    }
+
     public static Optional<String> getContainerCluster(DockerClient docker, String containerId) {
         return Optional.ofNullable(docker.inspectContainerCmd(containerId)
-                                   .exec().getConfig().getLabels().get("cluster"));
+                .exec().getConfig().getLabels().get("cluster"));
     }
 }