You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/05/30 18:54:44 UTC
[pulsar] branch master updated: fix localrun and add tests (#4401)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3cbf3c3 fix localrun and add tests (#4401)
3cbf3c3 is described below
commit 3cbf3c32f24326d09006c8c2898dd8797f17ab65
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 30 11:54:40 2019 -0700
fix localrun and add tests (#4401)
* fix localrun and add tests
* minor fix
---
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 30 ++--
.../integration/containers/ChaosContainer.java | 6 +
.../integration/functions/PulsarFunctionsTest.java | 155 +++++++++++++++++++--
.../functions/utils/CommandGenerator.java | 40 +++++-
.../tests/integration/suites/PulsarTestSuite.java | 12 ++
.../tests/integration/utils/DockerUtils.java | 89 +++++++-----
6 files changed, 267 insertions(+), 65 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index c3a7fd9..7f75944 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -123,18 +123,7 @@ public class PulsarAdminTool {
adminBuilder.authentication(authPluginClassName, authParams);
PulsarAdmin admin = adminFactory.apply(adminBuilder);
for (Map.Entry<String, Class<?>> c : commandMap.entrySet()) {
- if (admin != null) {
- // To remain backwards compatibility for "source" and "sink" commands
- // TODO eventually remove this
- if (c.getKey().equals("sources") || c.getKey().equals("source")) {
- jcommander.addCommand("sources", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "source");
- } else if (c.getKey().equals("sinks") || c.getKey().equals("sink")) {
- jcommander.addCommand("sinks", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "sink");
- } else {
- // Other mode, all components are initialized.
- jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
- }
- }
+ addCommand(c, admin);
}
} catch (Exception e) {
Throwable cause;
@@ -148,6 +137,23 @@ public class PulsarAdminTool {
}
}
+    /**
+     * Registers a single CLI command with JCommander.
+     *
+     * <p>The command object is created reflectively through the command class's
+     * {@code (PulsarAdmin)} constructor. "sources", "sinks" and "functions" are registered
+     * even when {@code admin} is {@code null}; every other command is registered only when
+     * an admin client is available.
+     *
+     * @param c     command name mapped to the class implementing it
+     * @param admin admin client handed to the command constructor, may be {@code null}
+     * @throws Exception if the command class cannot be instantiated reflectively
+     */
+    private void addCommand(Map.Entry<String, Class<?>> c, PulsarAdmin admin) throws Exception {
+        final String name = c.getKey();
+        final boolean isSource = name.equals("sources") || name.equals("source");
+        final boolean isSink = name.equals("sinks") || name.equals("sink");
+
+        // Only sources, sinks and functions are registered without an admin client.
+        if (!isSource && !isSink && !name.equals("functions") && admin == null) {
+            return;
+        }
+
+        final Object command = c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin);
+        // Keep the singular "source" / "sink" spellings as aliases for backwards compatibility.
+        // TODO eventually remove this
+        if (isSource) {
+            jcommander.addCommand("sources", command, "source");
+        } else if (isSink) {
+            jcommander.addCommand("sinks", command, "sink");
+        } else {
+            jcommander.addCommand(name, command);
+        }
+    }
+
boolean run(String[] args) {
return run(args, adminBuilder -> {
try {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 18ba9fb..7305849 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -116,6 +116,12 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
return DockerUtils.runCommand(client, dockerId, commands);
}
+    /**
+     * Starts a command inside this container without waiting for it to finish.
+     *
+     * @param commands the command and its arguments, one token per element
+     * @return the future produced by {@link DockerUtils#runCommandAsync} for this container
+     * @throws Exception declared for callers; the delegate call itself declares no checked exception
+     */
+    public CompletableFuture<ContainerExecResult> execCmdAsync(String... commands) throws Exception {
+        return DockerUtils.runCommandAsync(this.getDockerClient(), this.getContainerId(), commands);
+    }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ChaosContainer)) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5393011..d7e4404 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,23 +18,13 @@
*/
package org.apache.pulsar.tests.integration.functions;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -57,13 +47,33 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.DebeziumMySqlSourceTester;
+import org.apache.pulsar.tests.integration.io.ElasticSearchSinkTester;
+import org.apache.pulsar.tests.integration.io.HdfsSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
+import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.io.SinkTester;
+import org.apache.pulsar.tests.integration.io.SourceTester;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.Test;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
/**
* A test base for testing sink.
*/
@@ -723,6 +733,125 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
}
+ /**
+ * Runs the shared local-run check ({@code testFunctionLocalRun}) with the Python runtime.
+ */
+ @Test
+ public void testPythonFunctionLocalRun() throws Exception {
+ testFunctionLocalRun(Runtime.PYTHON);
+ }
+
+ /**
+ * Runs the shared local-run check ({@code testFunctionLocalRun}) with the Java runtime.
+ */
+ @Test
+ public void testJavaFunctionLocalRun() throws Exception {
+ testFunctionLocalRun(Runtime.JAVA);
+ }
+
+    /**
+     * Starts the exclamation function through {@code functions localrun} on a worker container
+     * and verifies that every message published to the input topic arrives on the output topic
+     * with a trailing {@code "!"}.
+     *
+     * <p>Returns immediately, without checking anything, when the cluster uses the
+     * {@code THREAD} function runtime.
+     *
+     * @param runtime language runtime of the function under test
+     * @throws Exception if the command cannot be started or an admin/client call fails
+     */
+    public void testFunctionLocalRun(Runtime runtime) throws Exception {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD) {
+            return;
+        }
+
+        String inputTopicName = "persistent://public/default/test-function-local-run-" + runtime + "-input-" + randomName(8);
+        String outputTopicName = "test-function-local-run-" + runtime + "-output-" + randomName(8);
+
+        final int numMessages = 10;
+        String cmd;
+        CommandGenerator commandGenerator = new CommandGenerator();
+        // The generator emits this value as --broker-service-url for localrun.
+        commandGenerator.setAdminUrl("pulsar://pulsar-broker-0:6650");
+        commandGenerator.setSourceTopic(inputTopicName);
+        commandGenerator.setSinkTopic(outputTopicName);
+        commandGenerator.setFunctionName("localRunTest");
+        commandGenerator.setRuntime(runtime);
+        if (runtime == Runtime.JAVA) {
+            commandGenerator.setFunctionClassName(EXCLAMATION_JAVA_CLASS);
+            cmd = commandGenerator.generateLocalRunCommand(null);
+        } else {
+            commandGenerator.setFunctionClassName(EXCLAMATION_PYTHON_CLASS);
+            cmd = commandGenerator.generateLocalRunCommand(EXCLAMATION_PYTHON_FILE);
+        }
+
+        log.info("cmd: {}", cmd);
+        // Tokenize on runs of whitespace so doubled spaces never become empty argv entries.
+        // The command is started asynchronously; its future is not awaited here.
+        pulsarCluster.getAnyWorker().execCmdAsync(cmd.trim().split("\\s+"));
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+
+            // Wait until exactly one subscription shows up on the input topic.
+            retryStrategically((test) -> {
+                try {
+                    return admin.topics().getStats(inputTopicName).subscriptions.size() == 1;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 30, 200);
+
+            assertEquals(admin.topics().getStats(inputTopicName).subscriptions.size(), 1);
+
+            Set<String> expectedMessages = new HashSet<>();
+            for (int i = 0; i < numMessages; i++) {
+                expectedMessages.add("message-" + i + "!");
+            }
+
+            // publish and consume result
+            if (Runtime.JAVA == runtime) {
+                // java supports schema
+                @Cleanup PulsarClient client = PulsarClient.builder()
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        .build();
+                @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                        .topic(outputTopicName)
+                        .subscriptionType(SubscriptionType.Exclusive)
+                        .subscriptionName("test-sub")
+                        .subscribe();
+                @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                        .topic(inputTopicName)
+                        .create();
+
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send("message-" + i);
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<String> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
+                    // receive(timeout, unit) yields null when nothing arrives in time;
+                    // fail with a readable message instead of a NullPointerException.
+                    assertTrue(msg != null, "Timed out waiting for message " + i + " on " + outputTopicName);
+                    log.info("Received: {}", msg.getValue());
+                    assertTrue(expectedMessages.remove(msg.getValue()), "Unexpected message: " + msg.getValue());
+                }
+                assertEquals(expectedMessages.size(), 0);
+
+            } else {
+                // python doesn't support schema
+
+                @Cleanup PulsarClient client = PulsarClient.builder()
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        .build();
+                @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+                        .topic(outputTopicName)
+                        .subscriptionType(SubscriptionType.Exclusive)
+                        .subscriptionName("test-sub")
+                        .subscribe();
+
+                @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                        .topic(inputTopicName)
+                        .create();
+
+                for (int i = 0; i < numMessages; i++) {
+                    producer.newMessage().value(("message-" + i).getBytes(UTF_8)).send();
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<byte[]> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
+                    // Same null-on-timeout contract as the Java branch.
+                    assertTrue(msg != null, "Timed out waiting for message " + i + " on " + outputTopicName);
+                    String msgValue = new String(msg.getValue(), UTF_8);
+                    log.info("Received: {}", msgValue);
+                    assertTrue(expectedMessages.remove(msgValue), "Unexpected message: " + msgValue);
+                }
+                assertEquals(expectedMessages.size(), 0);
+            }
+        }
+
+    }
+
//
// Test CRUD functions on different runtimes.
//
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 2b16b29..f18e0c8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -54,8 +54,8 @@ public class CommandGenerator {
private Long slidingIntervalDurationMs;
private Map<String, String> userConfig = new HashMap<>();
- private static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar";
- private static final String PYTHONBASE = "/pulsar/examples/python-examples/";
+ public static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar";
+ public static final String PYTHONBASE = "/pulsar/examples/python-examples/";
public static CommandGenerator createDefaultGenerator(String sourceTopic, String functionClassName) {
CommandGenerator generator = new CommandGenerator();
@@ -73,6 +73,42 @@ public class CommandGenerator {
return generator;
}
+    /**
+     * Builds the {@code functions localrun} command line for the admin script from the values
+     * currently set on this generator.
+     *
+     * <p>Every fragment is appended with exactly one leading space and no trailing space, so the
+     * builder itself never emits two consecutive spaces and callers can tokenize the result
+     * with {@code split(" ")} without producing empty arguments.
+     *
+     * @param codeFile python file name appended to {@link #PYTHONBASE}; when {@code null} the bare
+     *                 {@link #PYTHONBASE} is passed to {@code --py}; not used for the JAVA runtime
+     * @return the assembled command line
+     */
+    public String generateLocalRunCommand(String codeFile) {
+        StringBuilder commandBuilder = new StringBuilder(PulsarCluster.ADMIN_SCRIPT);
+        commandBuilder.append(" functions localrun");
+        if (adminUrl != null) {
+            // localrun takes the broker service URL; the generator keeps it in adminUrl
+            commandBuilder.append(" --broker-service-url ").append(adminUrl);
+        }
+        if (tenant != null) {
+            commandBuilder.append(" --tenant ").append(tenant);
+        }
+        if (namespace != null) {
+            commandBuilder.append(" --namespace ").append(namespace);
+        }
+        if (functionName != null) {
+            commandBuilder.append(" --name ").append(functionName);
+        }
+        commandBuilder.append(" --className ").append(functionClassName);
+        if (sourceTopic != null) {
+            commandBuilder.append(" --inputs ").append(sourceTopic);
+        }
+        if (sinkTopic != null) {
+            commandBuilder.append(" --output ").append(sinkTopic);
+        }
+
+        if (runtime == Runtime.JAVA) {
+            commandBuilder.append(" --jar ").append(JAVAJAR);
+        } else {
+            commandBuilder.append(" --py ").append(PYTHONBASE);
+            if (codeFile != null) {
+                commandBuilder.append(codeFile);
+            }
+        }
+
+        return commandBuilder.toString();
+    }
+
public String generateCreateFunctionCommand() {
return generateCreateFunctionCommand(null);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 20b9da0..7fe4e46 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -23,6 +23,8 @@ import org.testng.ITest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
+import java.util.function.Predicate;
+
public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
@BeforeSuite
@@ -41,4 +43,14 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
public String getTestName() {
return "pulsar-test-suite";
}
+
+    /**
+     * Polls a condition with a linearly growing pause between attempts.
+     *
+     * <p>The predicate is evaluated (with a {@code null} argument) at most {@code retryCount}
+     * times. Polling stops as soon as it returns {@code true}; no pause follows the final
+     * attempt. The pause after attempt {@code k} (1-based) is {@code k * intSleepTimeInMillis}.
+     * The outcome is not reported, so callers must re-check the condition themselves.
+     *
+     * @param predicate            condition to poll; always invoked with {@code null}
+     * @param retryCount           maximum number of evaluations
+     * @param intSleepTimeInMillis base pause in milliseconds
+     * @throws Exception if the thread is interrupted while pausing
+     */
+    public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+            throws Exception {
+        int attempt = 0;
+        while (attempt < retryCount && !predicate.test(null) && attempt != retryCount - 1) {
+            Thread.sleep(intSleepTimeInMillis * (attempt + 1));
+            attempt++;
+        }
+    }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index c7e2984..fd68fbe 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.tests.integration.utils;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -28,9 +26,15 @@ import com.github.dockerjava.api.exception.NotFoundException;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
@@ -46,13 +50,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.nio.charset.StandardCharsets.UTF_8;
public class DockerUtils {
private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);
@@ -183,9 +181,22 @@ public class DockerUtils {
public static ContainerExecResult runCommand(DockerClient docker,
String containerId,
- String... cmd)
- throws ContainerExecException {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
+ String... cmd) throws ContainerExecException, ExecutionException, InterruptedException {
+
+ try {
+ return runCommandAsync(docker, containerId, cmd).get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ContainerExecException) {
+ throw (ContainerExecException) e.getCause();
+ }
+ throw e;
+ }
+ }
+
+ public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient docker,
+ String containerId,
+ String... cmd) {
+ CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
String execid = docker.execCreateCmd(containerId)
.withCmd(cmd)
.withAttachStderr(true)
@@ -223,33 +234,35 @@ public class DockerUtils {
@Override
public void onComplete() {
LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
- future.complete(true);
+
+    // This runs on the callback's thread, not the caller's. Every failure must be
+    // routed into the future; an exception escaping from here would leave the
+    // future incomplete and block runCommand(...).get() forever.
+    try {
+        // Poll until the exec has actually terminated so the exit code is available.
+        InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+        while (resp.isRunning()) {
+            Thread.sleep(200);
+            resp = docker.inspectExecCmd(execid).exec();
+        }
+        int retCode = resp.getExitCode();
+        ContainerExecResult result = ContainerExecResult.of(
+                retCode,
+                stdout.toString(),
+                stderr.toString()
+        );
+        LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+
+        if (retCode != 0) {
+            LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
+                    containerId, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
+            future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
+        } else {
+            future.complete(result);
+        }
+    } catch (InterruptedException ie) {
+        // Restore the interrupt flag and surface the interruption to whoever awaits the future.
+        Thread.currentThread().interrupt();
+        future.completeExceptionally(ie);
+    } catch (RuntimeException e) {
+        // e.g. the inspect call failing, or a missing exit code.
+        future.completeExceptionally(e);
+    }
}
});
- future.join();
-
- InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
- while (resp.isRunning()) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- resp = docker.inspectExecCmd(execid).exec();
- }
- int retCode = resp.getExitCode();
- ContainerExecResult result = ContainerExecResult.of(
- retCode,
- stdout.toString(),
- stderr.toString()
- );
- LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
-
- if (retCode != 0) {
- throw new ContainerExecException(cmdString, containerId, result);
- }
- return result;
+ return future;
}
public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient docker,