You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/26 04:56:54 UTC
[pulsar] branch master updated: [Tests] Improve integration test logging to improve readability and efficiency (#10320)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7eff07c [Tests] Improve integration test logging to improve readability and efficiency (#10320)
7eff07c is described below
commit 7eff07c53e5cf730b7e383dddb7e4d2c8b53bc0d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Apr 26 07:56:15 2021 +0300
[Tests] Improve integration test logging to improve readability and efficiency (#10320)
* Use simple docker log tailing
* Refactor DockerUtils
* Improve "tail -f logfile" efficiency in integration tests
---
.../integration/containers/BrokerContainer.java | 4 +-
.../integration/containers/ChaosContainer.java | 39 +--
.../containers/PrestoWorkerContainer.java | 7 +-
.../tests/integration/utils/DockerUtils.java | 354 ++++++++++++---------
4 files changed, 202 insertions(+), 202 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
index 636d60f..07558ce 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BrokerContainer.java
@@ -30,12 +30,12 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {
public BrokerContainer(String clusterName, String hostName) {
super(
clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_HTTP_PORT);
+ tailContainerLog();
}
@Override
protected void afterStart() {
- this.tailContainerLog();
- DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+ DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
"tail", "-f", "/var/log/pulsar/broker.log");
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index cf5a884..6064d5b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -64,43 +64,8 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
super.stop();
}
- public void tailContainerLog() {
- CompletableFuture.runAsync(() -> {
- while (null == getContainerId()) {
- try {
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException e) {
- return;
- }
- }
-
- LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(getContainerId());
- logContainerCmd.withStdOut(true).withStdErr(true).withFollowStream(true);
- logContainerCmd.exec(new LogContainerResultCallback() {
- @Override
- public void onNext(Frame item) {
- log.info(new String(item.getPayload(), UTF_8));
- }
- });
- });
- }
-
- public String getContainerLog() {
- StringBuilder sb = new StringBuilder();
-
- LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(getContainerId());
- logContainerCmd.withStdOut(true).withStdErr(true);
- try {
- logContainerCmd.exec(new LogContainerResultCallback() {
- @Override
- public void onNext(Frame item) {
- sb.append(new String(item.getPayload(), UTF_8));
- }
- }).awaitCompletion();
- } catch (InterruptedException e) {
-
- }
- return sb.toString();
+ protected void tailContainerLog() {
+ withLogConsumer(item -> log.info(item.getUtf8String()));
}
public void putFile(String path, byte[] contents) throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
index 7c6c9d6..fe08b81 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -37,15 +37,14 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
-1,
PRESTO_HTTP_PORT,
"/v1/info/state");
-
+ tailContainerLog();
}
@Override
protected void afterStart() {
- this.tailContainerLog();
- DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+ DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
"tail", "-f", "/var/log/pulsar/presto_worker.log");
- DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+ DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
"tail", "-f", "/pulsar/lib/presto/var/log/server.log");
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 8fe8e33..7c90cd3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.utils;
+import static java.nio.charset.StandardCharsets.UTF_8;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -26,29 +27,30 @@ import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPOutputStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);
@@ -66,47 +68,41 @@ public class DockerUtils {
}
public static void dumpContainerLogToTarget(DockerClient dockerClient, String containerId) {
- final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
- // docker api returns names prefixed with "/", it's part of it's legacy design,
- // this removes it to be consistent with what docker ps shows.
- final String containerName = inspectContainerResponse.getName().replace("/","");
- File output = new File(getTargetDirectory(containerName), "docker.log");
- int i = 0;
- while (output.exists()) {
- LOG.info("{} exists, incrementing", output);
- output = new File(getTargetDirectory(containerName), "docker." + i++ + ".log");
- }
- try (FileOutputStream os = new FileOutputStream(output)) {
+ final String containerName = getContainerName(dockerClient, containerId);
+ File output = getUniqueFileInTargetDirectory(containerName, "docker", ".log");
+ try (OutputStream os = new BufferedOutputStream(new FileOutputStream(output))) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
dockerClient.logContainerCmd(containerName).withStdOut(true)
- .withStdErr(true).withTimestamps(true).exec(new ResultCallback<Frame>() {
- @Override
- public void close() {}
-
- @Override
- public void onStart(Closeable closeable) {}
-
- @Override
- public void onNext(Frame object) {
- try {
- os.write(object.getPayload());
- } catch (IOException e) {
- onError(e);
- }
- }
+ .withStdErr(true).withTimestamps(true).exec(new ResultCallback<Frame>() {
+ @Override
+ public void close() {
+ }
- @Override
- public void onError(Throwable throwable) {
- future.completeExceptionally(throwable);
- }
+ @Override
+ public void onStart(Closeable closeable) {
+ }
- @Override
- public void onComplete() {
- future.complete(true);
- }
- });
+ @Override
+ public void onNext(Frame object) {
+ try {
+ os.write(object.getPayload());
+ } catch (IOException e) {
+ onError(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ future.complete(true);
+ }
+ });
future.get();
- } catch (RuntimeException|ExecutionException|IOException e) {
+ } catch (RuntimeException | ExecutionException | IOException e) {
LOG.error("Error dumping log for {}", containerName, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -114,29 +110,36 @@ public class DockerUtils {
}
}
- public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
- String path) {
- final int READ_BLOCK_SIZE = 10000;
+ private static File getUniqueFileInTargetDirectory(String containerName, String prefix, String suffix) {
+ return getUniqueFileInDirectory(getTargetDirectory(containerName), prefix, suffix);
+ }
+
+ private static File getUniqueFileInDirectory(File directory, String prefix, String suffix) {
+ File file = new File(directory, prefix + suffix);
+ int i = 0;
+ while (file.exists()) {
+ LOG.info("{} exists, incrementing", file);
+ file = new File(directory, prefix + "_" + (i++) + suffix);
+ }
+ return file;
+ }
+
+ private static String getContainerName(DockerClient dockerClient, String containerId) {
final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
// docker api returns names prefixed with "/", it's part of it's legacy design,
// this removes it to be consistent with what docker ps shows.
- final String containerName = inspectContainerResponse.getName().replace("/","");
+ return inspectContainerResponse.getName().replace("/", "");
+ }
+
+ public static void dumpContainerDirToTargetCompressed(DockerClient dockerClient, String containerId,
+ String path) {
+ final String containerName = getContainerName(dockerClient, containerId);
final String baseName = path.replace("/", "-").replaceAll("^-", "");
- File output = new File(getTargetDirectory(containerName), baseName + ".tar.gz");
- int i = 0;
- while (output.exists()) {
- LOG.info("{} exists, incrementing", output);
- output = new File(getTargetDirectory(containerName), baseName + "_" + i++ + ".tar.gz");
- }
+ File output = getUniqueFileInTargetDirectory(containerName, baseName, ".tar.gz");
try (InputStream dockerStream = dockerClient.copyArchiveFromContainerCmd(containerId, path).exec();
- OutputStream os = new GZIPOutputStream(new FileOutputStream(output))) {
- byte[] block = new byte[READ_BLOCK_SIZE];
- int read = dockerStream.read(block, 0, READ_BLOCK_SIZE);
- while (read > -1) {
- os.write(block, 0, read);
- read = dockerStream.read(block, 0, READ_BLOCK_SIZE);
- }
- } catch (RuntimeException|IOException e) {
+ OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(output)))) {
+ IOUtils.copy(dockerStream, os);
+ } catch (RuntimeException | IOException e) {
if (!(e instanceof NotFoundException)) {
LOG.error("Error reading dir from container {}", containerName, e);
}
@@ -145,33 +148,25 @@ public class DockerUtils {
public static void dumpContainerLogDirToTarget(DockerClient docker, String containerId,
String path) {
- final int READ_BLOCK_SIZE = 10000;
-
+ File targetDirectory = getTargetDirectory(containerId);
try (InputStream dockerStream = docker.copyArchiveFromContainerCmd(containerId, path).exec();
TarArchiveInputStream stream = new TarArchiveInputStream(dockerStream)) {
TarArchiveEntry entry = stream.getNextTarEntry();
while (entry != null) {
if (entry.isFile()) {
- File output = new File(getTargetDirectory(containerId), entry.getName().replace("/", "-"));
- try (FileOutputStream os = new FileOutputStream(output)) {
- byte[] block = new byte[READ_BLOCK_SIZE];
- int read = stream.read(block, 0, READ_BLOCK_SIZE);
- while (read > -1) {
- os.write(block, 0, read);
- read = stream.read(block, 0, READ_BLOCK_SIZE);
- }
- }
+ File output = new File(targetDirectory, entry.getName().replace("/", "-"));
+ Files.copy(stream, output.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
entry = stream.getNextTarEntry();
}
- } catch (RuntimeException|IOException e) {
+ } catch (RuntimeException | IOException e) {
LOG.error("Error reading logs from container {}", containerId, e);
}
}
public static String getContainerIP(DockerClient docker, String containerId) {
for (Map.Entry<String, ContainerNetwork> e : docker.inspectContainerCmd(containerId)
- .exec().getNetworkSettings().getNetworks().entrySet()) {
+ .exec().getNetworkSettings().getNetworks().entrySet()) {
return e.getValue().getIpAddress();
}
throw new IllegalArgumentException("Container " + containerId + " has no networks");
@@ -196,72 +191,66 @@ public class DockerUtils {
String... cmd) {
CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
String execId = dockerClient.execCreateCmd(containerId)
- .withCmd(cmd)
- .withAttachStderr(true)
- .withAttachStdout(true)
- .exec()
- .getId();
- final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
- final String containerName = inspectContainerResponse.getName().replace("/","");
+ .withCmd(cmd)
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ .exec()
+ .getId();
+ final String containerName = getContainerName(dockerClient, containerId);
String cmdString = String.join(" ", cmd);
StringBuilder stdout = new StringBuilder();
StringBuilder stderr = new StringBuilder();
dockerClient.execStartCmd(execId).withDetach(false)
- .exec(new ResultCallback<Frame>() {
- @Override
- public void close() {}
+ .exec(new ResultCallback<Frame>() {
+ @Override
+ public void close() {
+ }
- @Override
- public void onStart(Closeable closeable) {
- LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
- }
+ @Override
+ public void onStart(Closeable closeable) {
+ LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
+ }
- @Override
- public void onNext(Frame object) {
- LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
- if (StreamType.STDOUT == object.getStreamType()) {
- stdout.append(new String(object.getPayload(), UTF_8));
- } else if (StreamType.STDERR == object.getStreamType()) {
- stderr.append(new String(object.getPayload(), UTF_8));
+ @Override
+ public void onNext(Frame object) {
+ LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
+ if (StreamType.STDOUT == object.getStreamType()) {
+ stdout.append(new String(object.getPayload(), UTF_8));
+ } else if (StreamType.STDERR == object.getStreamType()) {
+ stderr.append(new String(object.getPayload(), UTF_8));
+ }
}
- }
- @Override
- public void onError(Throwable throwable) {
- future.completeExceptionally(throwable);
- }
+ @Override
+ public void onError(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
- @Override
- public void onComplete() {
- LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
+ @Override
+ public void onComplete() {
+ LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
- InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
- while (resp.isRunning()) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
+ InspectExecResponse resp = waitForExecCmdToFinish(dockerClient, execId);
+ int retCode = resp.getExitCode();
+ ContainerExecResult result = ContainerExecResult.of(
+ retCode,
+ stdout.toString(),
+ stderr.toString()
+ );
+ LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
+
+ if (retCode != 0) {
+ LOG.error(
+ "DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr:"
+ + " {}",
+ containerName, cmdString, result.getExitCode(), result.getStdout(),
+ result.getStderr());
+ future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
+ } else {
+ future.complete(result);
}
- resp = dockerClient.inspectExecCmd(execId).exec();
- }
- int retCode = resp.getExitCode();
- ContainerExecResult result = ContainerExecResult.of(
- retCode,
- stdout.toString(),
- stderr.toString()
- );
- LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
-
- if (retCode != 0) {
- LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
- containerName, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
- future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
- } else {
- future.complete(result);
}
- }
- });
+ });
return future;
}
@@ -275,11 +264,10 @@ public class DockerUtils {
.withAttachStdout(true)
.exec()
.getId();
- final InspectContainerResponse inspectContainerResponse = dockerClient.inspectContainerCmd(containerId).exec();
- final String containerName = inspectContainerResponse.getName().replace("/","");
+ final String containerName = getContainerName(dockerClient, containerId);
String cmdString = String.join(" ", cmd);
- ByteBuf stdout = Unpooled.buffer();
- ByteBuf stderr = Unpooled.buffer();
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
dockerClient.execStartCmd(execId).withDetach(false)
.exec(new ResultCallback<Frame>() {
@Override
@@ -293,10 +281,14 @@ public class DockerUtils {
@Override
public void onNext(Frame object) {
- if (StreamType.STDOUT == object.getStreamType()) {
- stdout.writeBytes(object.getPayload());
- } else if (StreamType.STDERR == object.getStreamType()) {
- stderr.writeBytes(object.getPayload());
+ try {
+ if (StreamType.STDOUT == object.getStreamType()) {
+ stdout.write(object.getPayload());
+ } else if (StreamType.STDERR == object.getStreamType()) {
+ stderr.write(object.getPayload());
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
@@ -313,27 +305,13 @@ public class DockerUtils {
});
future.join();
- InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
- while (resp.isRunning()) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- resp = dockerClient.inspectExecCmd(execId).exec();
- }
+ InspectExecResponse resp = waitForExecCmdToFinish(dockerClient, execId);
int retCode = resp.getExitCode();
- byte[] stdoutBytes = new byte[stdout.readableBytes()];
- stdout.readBytes(stdoutBytes);
- byte[] stderrBytes = new byte[stderr.readableBytes()];
- stderr.readBytes(stderrBytes);
-
ContainerExecResultBytes result = ContainerExecResultBytes.of(
retCode,
- stdoutBytes,
- stderrBytes);
+ stdout.toByteArray(),
+ stderr.toByteArray());
LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
if (retCode != 0) {
@@ -342,8 +320,66 @@ public class DockerUtils {
return result;
}
+ public static CompletableFuture<Integer> runCommandAsyncWithLogging(DockerClient dockerClient,
+ String containerId, String... cmd) {
+ CompletableFuture<Integer> future = new CompletableFuture<>();
+ String execId = dockerClient.execCreateCmd(containerId)
+ .withCmd(cmd)
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ .exec()
+ .getId();
+ final String containerName = getContainerName(dockerClient, containerId);
+ String cmdString = String.join(" ", cmd);
+ dockerClient.execStartCmd(execId).withDetach(false)
+ .exec(new ResultCallback<Frame>() {
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void onStart(Closeable closeable) {
+ LOG.info("DOCKER.exec({}:{}): Executing...", containerName, cmdString);
+ }
+
+ @Override
+ public void onNext(Frame object) {
+ LOG.info("DOCKER.exec({}:{}): {}", containerName, cmdString, object);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ LOG.info("DOCKER.exec({}:{}): Done", containerName, cmdString);
+ InspectExecResponse resp = waitForExecCmdToFinish(dockerClient, execId);
+ int retCode = resp.getExitCode();
+ LOG.info("DOCKER.exec({}:{}): completed with {}", containerName, cmdString, retCode);
+ future.complete(retCode);
+ }
+ });
+ return future;
+ }
+
+ private static InspectExecResponse waitForExecCmdToFinish(DockerClient dockerClient, String execId) {
+ InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
+ while (resp.isRunning()) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+ resp = dockerClient.inspectExecCmd(execId).exec();
+ }
+ return resp;
+ }
+
public static Optional<String> getContainerCluster(DockerClient docker, String containerId) {
return Optional.ofNullable(docker.inspectContainerCmd(containerId)
- .exec().getConfig().getLabels().get("cluster"));
+ .exec().getConfig().getLabels().get("cluster"));
}
}