You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/05/30 18:54:44 UTC

[pulsar] branch master updated: fix localrun and add tests (#4401)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cbf3c3  fix localrun and add tests (#4401)
3cbf3c3 is described below

commit 3cbf3c32f24326d09006c8c2898dd8797f17ab65
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 30 11:54:40 2019 -0700

    fix localrun and add tests (#4401)
    
    * fix localrun and add tests
    
    * minor fix
---
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |  30 ++--
 .../integration/containers/ChaosContainer.java     |   6 +
 .../integration/functions/PulsarFunctionsTest.java | 155 +++++++++++++++++++--
 .../functions/utils/CommandGenerator.java          |  40 +++++-
 .../tests/integration/suites/PulsarTestSuite.java  |  12 ++
 .../tests/integration/utils/DockerUtils.java       |  89 +++++++-----
 6 files changed, 267 insertions(+), 65 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index c3a7fd9..7f75944 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -123,18 +123,7 @@ public class PulsarAdminTool {
             adminBuilder.authentication(authPluginClassName, authParams);
             PulsarAdmin admin = adminFactory.apply(adminBuilder);
             for (Map.Entry<String, Class<?>> c : commandMap.entrySet()) {
-                if (admin != null) {
-                    // To remain backwards compatibility for "source" and "sink" commands
-                    // TODO eventually remove this
-                    if (c.getKey().equals("sources") || c.getKey().equals("source")) {
-                        jcommander.addCommand("sources", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "source");
-                    } else if (c.getKey().equals("sinks") || c.getKey().equals("sink")) {
-                        jcommander.addCommand("sinks", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "sink");
-                    } else {
-                        // Other mode, all components are initialized.
-                        jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
-                    }
-                }
+                addCommand(c, admin);
             }
         } catch (Exception e) {
             Throwable cause;
@@ -148,6 +137,23 @@ public class PulsarAdminTool {
         }
     }
 
+    private void addCommand(Map.Entry<String, Class<?>> c, PulsarAdmin admin) throws Exception {
+        // To remain backwards compatibility for "source" and "sink" commands
+        // TODO eventually remove this
+        if (c.getKey().equals("sources") || c.getKey().equals("source")) {
+            jcommander.addCommand("sources", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "source");
+        } else if (c.getKey().equals("sinks") || c.getKey().equals("sink")) {
+            jcommander.addCommand("sinks", c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "sink");
+        } else if (c.getKey().equals("functions")) {
+            jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
+        } else {
+            if (admin != null) {
+                // Other mode, all components are initialized.
+                jcommander.addCommand(c.getKey(), c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
+            }
+        }
+    }
+
     boolean run(String[] args) {
         return run(args, adminBuilder -> {
             try {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 18ba9fb..7305849 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -116,6 +116,12 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
         return DockerUtils.runCommand(client, dockerId, commands);
     }
 
+    public CompletableFuture<ContainerExecResult> execCmdAsync(String... commands) throws Exception {
+        DockerClient client = this.getDockerClient();
+        String dockerId = this.getContainerId();
+        return DockerUtils.runCommandAsync(client, dockerId, commands);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ChaosContainer)) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5393011..d7e4404 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -18,23 +18,13 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -57,13 +47,33 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.DebeziumMySqlSourceTester;
+import org.apache.pulsar.tests.integration.io.ElasticSearchSinkTester;
+import org.apache.pulsar.tests.integration.io.HdfsSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
 import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
+import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
+import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.io.SinkTester;
+import org.apache.pulsar.tests.integration.io.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 /**
  * A test base for testing sink.
  */
@@ -723,6 +733,125 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    @Test
+    public void testPythonFunctionLocalRun() throws Exception {
+        testFunctionLocalRun(Runtime.PYTHON);
+    }
+
+    @Test
+    public void testJavaFunctionLocalRun() throws Exception {
+        testFunctionLocalRun(Runtime.JAVA);
+    }
+
+    public void testFunctionLocalRun(Runtime runtime) throws  Exception {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD) {
+            return;
+        }
+
+        String inputTopicName = "persistent://public/default/test-function-local-run-" + runtime + "-input-" + randomName(8);
+        String outputTopicName = "test-function-local-run-" + runtime + "-output-" + randomName(8);
+
+        final int numMessages = 10;
+        String cmd;
+        CommandGenerator commandGenerator = new CommandGenerator();
+        commandGenerator.setAdminUrl("pulsar://pulsar-broker-0:6650");
+        commandGenerator.setSourceTopic(inputTopicName);
+        commandGenerator.setSinkTopic(outputTopicName);
+        commandGenerator.setFunctionName("localRunTest");
+        commandGenerator.setRuntime(runtime);
+        if (runtime == Runtime.JAVA) {
+            commandGenerator.setFunctionClassName(EXCLAMATION_JAVA_CLASS);
+            cmd = commandGenerator.generateLocalRunCommand(null);
+        } else {
+            commandGenerator.setFunctionClassName(EXCLAMATION_PYTHON_CLASS);
+            cmd = commandGenerator.generateLocalRunCommand(EXCLAMATION_PYTHON_FILE);
+        }
+
+        log.info("cmd: {}", cmd);
+        pulsarCluster.getAnyWorker().execCmdAsync(cmd.split(" "));
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+
+            retryStrategically((test) -> {
+                try {
+                    return admin.topics().getStats(inputTopicName).subscriptions.size() == 1;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 30, 200);
+
+            assertEquals(admin.topics().getStats(inputTopicName).subscriptions.size(), 1);
+
+            // publish and consume result
+            if (Runtime.JAVA == runtime) {
+                // java supports schema
+                @Cleanup PulsarClient client = PulsarClient.builder()
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        .build();
+                @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                        .topic(outputTopicName)
+                        .subscriptionType(SubscriptionType.Exclusive)
+                        .subscriptionName("test-sub")
+                        .subscribe();
+                @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                        .topic(inputTopicName)
+                        .create();
+
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send("message-" + i);
+                }
+
+                Set<String> expectedMessages = new HashSet<>();
+                for (int i = 0; i < numMessages; i++) {
+                    expectedMessages.add("message-" + i + "!");
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<String> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
+                    log.info("Received: {}", msg.getValue());
+                    assertTrue(expectedMessages.contains(msg.getValue()));
+                    expectedMessages.remove(msg.getValue());
+                }
+                assertEquals(expectedMessages.size(), 0);
+
+            } else {
+                // python doesn't support schema
+
+                @Cleanup PulsarClient client = PulsarClient.builder()
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        .build();
+                @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+                        .topic(outputTopicName)
+                        .subscriptionType(SubscriptionType.Exclusive)
+                        .subscriptionName("test-sub")
+                        .subscribe();
+
+                @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                        .topic(inputTopicName)
+                        .create();
+
+                for (int i = 0; i < numMessages; i++) {
+                    producer.newMessage().value(("message-" + i).getBytes(UTF_8)).send();
+                }
+
+                Set<String> expectedMessages = new HashSet<>();
+                for (int i = 0; i < numMessages; i++) {
+                    expectedMessages.add("message-" + i + "!");
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<byte[]> msg = consumer.receive(60 * 2, TimeUnit.SECONDS);
+                    String msgValue = new String(msg.getValue(), UTF_8);
+                    log.info("Received: {}", msgValue);
+                    assertTrue(expectedMessages.contains(msgValue));
+                    expectedMessages.remove(msgValue);
+                }
+                assertEquals(expectedMessages.size(), 0);
+            }
+        }
+
+    }
+
     //
     // Test CRUD functions on different runtimes.
     //
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 2b16b29..f18e0c8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -54,8 +54,8 @@ public class CommandGenerator {
     private Long slidingIntervalDurationMs;
 
     private Map<String, String> userConfig = new HashMap<>();
-    private static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar";
-    private static final String PYTHONBASE = "/pulsar/examples/python-examples/";
+    public static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar";
+    public static final String PYTHONBASE = "/pulsar/examples/python-examples/";
 
     public static CommandGenerator createDefaultGenerator(String sourceTopic, String functionClassName) {
         CommandGenerator generator = new CommandGenerator();
@@ -73,6 +73,42 @@ public class CommandGenerator {
         return generator;
     }
 
+    public String generateLocalRunCommand(String codeFile) {
+        StringBuilder commandBuilder = new StringBuilder(PulsarCluster.ADMIN_SCRIPT);
+        commandBuilder.append(" functions localrun ");
+        if (adminUrl != null) {
+            commandBuilder.append(" --broker-service-url " + adminUrl);
+        }
+        if (tenant != null) {
+            commandBuilder.append(" --tenant " + tenant);
+        }
+        if (namespace != null) {
+            commandBuilder.append(" --namespace " + namespace);
+        }
+        if (functionName != null) {
+            commandBuilder.append(" --name " + functionName);
+        }
+        commandBuilder.append(" --className " + functionClassName);
+        if (sourceTopic != null) {
+            commandBuilder.append(" --inputs " + sourceTopic);
+        }
+        if (sinkTopic != null) {
+            commandBuilder.append(" --output " + sinkTopic);
+        }
+
+        if (runtime == Runtime.JAVA) {
+            commandBuilder.append(" --jar " + JAVAJAR);
+        } else {
+            if (codeFile != null) {
+                commandBuilder.append(" --py " + PYTHONBASE + codeFile);
+            } else {
+                commandBuilder.append(" --py " + PYTHONBASE);
+            }
+        }
+
+        return commandBuilder.toString();
+    }
+
     public String generateCreateFunctionCommand() {
         return generateCreateFunctionCommand(null);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 20b9da0..7fe4e46 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -23,6 +23,8 @@ import org.testng.ITest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+import java.util.function.Predicate;
+
 public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
 
     @BeforeSuite
@@ -41,4 +43,14 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
     public String getTestName() {
         return "pulsar-test-suite";
     }
+
+    public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+            throws Exception {
+        for (int i = 0; i < retryCount; i++) {
+            if (predicate.test(null) || i == (retryCount - 1)) {
+                break;
+            }
+            Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index c7e2984..fd68fbe 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.tests.integration.utils;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.async.ResultCallback;
 import com.github.dockerjava.api.command.InspectContainerResponse;
@@ -28,9 +26,15 @@ import com.github.dockerjava.api.exception.NotFoundException;
 import com.github.dockerjava.api.model.ContainerNetwork;
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.StreamType;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.File;
@@ -46,13 +50,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 public class DockerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class);
@@ -183,9 +181,22 @@ public class DockerUtils {
 
     public static ContainerExecResult runCommand(DockerClient docker,
                                                  String containerId,
-                                                 String... cmd)
-            throws ContainerExecException {
-        CompletableFuture<Boolean> future = new CompletableFuture<>();
+                                                 String... cmd) throws ContainerExecException, ExecutionException, InterruptedException {
+
+        try {
+            return runCommandAsync(docker, containerId, cmd).get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof ContainerExecException) {
+                throw (ContainerExecException) e.getCause();
+            }
+            throw e;
+        }
+    }
+
+    public static CompletableFuture<ContainerExecResult> runCommandAsync(DockerClient docker,
+                                                 String containerId,
+                                                 String... cmd) {
+        CompletableFuture<ContainerExecResult> future = new CompletableFuture<>();
         String execid = docker.execCreateCmd(containerId)
             .withCmd(cmd)
             .withAttachStderr(true)
@@ -223,33 +234,35 @@ public class DockerUtils {
                 @Override
                 public void onComplete() {
                     LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
-                    future.complete(true);
+
+                    InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+                    while (resp.isRunning()) {
+                        try {
+                            Thread.sleep(200);
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(ie);
+                        }
+                        resp = docker.inspectExecCmd(execid).exec();
+                    }
+                    int retCode = resp.getExitCode();
+                    ContainerExecResult result = ContainerExecResult.of(
+                            retCode,
+                            stdout.toString(),
+                            stderr.toString()
+                    );
+                    LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+
+                    if (retCode != 0) {
+                        LOG.error("DOCKER.exec({}:{}): completed with non zero return code: {}\nstdout: {}\nstderr: {}",
+                                containerId, cmdString, result.getExitCode(), result.getStdout(), result.getStderr());
+                        future.completeExceptionally(new ContainerExecException(cmdString, containerId, result));
+                    } else {
+                        future.complete(result);
+                    }
                 }
             });
-        future.join();
-
-        InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
-        while (resp.isRunning()) {
-            try {
-                Thread.sleep(200);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(ie);
-            }
-            resp = docker.inspectExecCmd(execid).exec();
-        }
-        int retCode = resp.getExitCode();
-        ContainerExecResult result = ContainerExecResult.of(
-            retCode,
-            stdout.toString(),
-            stderr.toString()
-        );
-        LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
-
-        if (retCode != 0) {
-            throw new ContainerExecException(cmdString, containerId, result);
-        }
-        return result;
+        return future;
     }
 
     public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient docker,