You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/05/31 21:11:12 UTC

[incubator-pulsar] branch master updated: adding CmdFunctions unit tests and clean up (#1869)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 866bbec  adding CmdFunctions unit tests and clean up (#1869)
866bbec is described below

commit 866bbece0cc57894bdc70dd2fb99c23abf0f0bef
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 31 14:11:07 2018 -0700

    adding CmdFunctions unit tests and clean up (#1869)
    
    * adding CmdFunctions unit tests and clean up
    
    * remove unnecessary import
---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 309 ++++++++++++++++++---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  68 +----
 2 files changed, 290 insertions(+), 87 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 1cc239d..0826956 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -18,29 +18,10 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
 import com.google.gson.Gson;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -50,7 +31,6 @@ import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
-import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -68,9 +48,34 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 /**
  * Unit test of {@link CmdFunctions}.
  */
+@Slf4j
 @PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class })
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
 public class CmdFunctionsTest {
@@ -124,16 +129,16 @@ public class CmdFunctionsTest {
         when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class));
     }
 
-    @Test
-    public void testLocalRunnerCmdNoArguments() throws Exception {
-        cmd.run(new String[] { "run" });
-
-        LocalRunner runner = cmd.getLocalRunner();
-        assertNull(runner.getFunctionName());
-        assertNull(runner.getInputs());
-        assertNull(runner.getOutput());
-        assertNull(runner.getFnConfigFile());
-    }
+//    @Test
+//    public void testLocalRunnerCmdNoArguments() throws Exception {
+//        cmd.run(new String[] { "run" });
+//
+//        LocalRunner runner = cmd.getLocalRunner();
+//        assertNull(runner.getFunctionName());
+//        assertNull(runner.getInputs());
+//        assertNull(runner.getOutput());
+//        assertNull(runner.getFnConfigFile());
+//    }
 
     /*
     TODO(sijie):- Can we fix this?
@@ -347,8 +352,6 @@ public class CmdFunctionsTest {
         String inputTopicName = TEST_NAME + "-input-topic";
         String outputTopicName = TEST_NAME + "-output-topic";
 
-
-
         cmd.run(new String[] {
             "update",
             "--name", fnName,
@@ -425,4 +428,244 @@ public class CmdFunctionsTest {
             "test-key",
             new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8));
     }
+
+    private static final String fnName = TEST_NAME + "-function";
+    private static final String inputTopicName = TEST_NAME + "-input-topic";
+    private static final String outputTopicName = TEST_NAME + "-output-topic";
+
+    private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorrectArgs,
+                                              String errMessageCheck) throws Exception {
+
+        String[] cmds = {"create", "update", "localrun"};
+
+        for (String type : cmds) {
+            List<String> correctArgList = new LinkedList<>();
+            List<String> incorrectArgList = new LinkedList<>();
+            correctArgList.add(type);
+            incorrectArgList.add(type);
+
+            correctArgList.addAll(Arrays.asList(correctArgs));
+            incorrectArgList.addAll(Arrays.asList(incorrectArgs));
+            cmd.run(correctArgList.toArray(new String[correctArgList.size()]));
+
+            if (type.equals("create")) {
+                CreateFunction creater = cmd.getCreater();
+                assertEquals(fnName, creater.getFunctionName());
+                assertEquals(inputTopicName, creater.getInputs());
+                assertEquals(outputTopicName, creater.getOutput());
+            } else if (type.equals("update")){
+                UpdateFunction updater = cmd.getUpdater();
+                assertEquals(fnName, updater.getFunctionName());
+                assertEquals(inputTopicName, updater.getInputs());
+                assertEquals(outputTopicName, updater.getOutput());
+            } else {
+                CmdFunctions.LocalRunner localRunner = cmd.getLocalRunner();
+                assertEquals(fnName, localRunner.getFunctionName());
+                assertEquals(inputTopicName, localRunner.getInputs());
+                assertEquals(outputTopicName, localRunner.getOutput());
+            }
+
+            if (type.equals("create")) {
+                verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+            } else if (type.equals("update")) {
+                verify(functions, times(1)).updateFunction(any(FunctionDetails.class), anyString());
+            }
+
+            setup();
+            ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
+            consoleOutputCapturer.start();
+            cmd.run(incorrectArgList.toArray(new String[incorrectArgList.size()]));
+
+            consoleOutputCapturer.stop();
+            String output = consoleOutputCapturer.getStderr();
+            assertEquals(output.replace("\n", ""), errMessageCheck);
+        }
+    }
+
+    @Test
+    public void TestCreateFunctionParallelism() throws Exception{
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+                "--parallelism", "1"
+        };
+
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+                "--parallelism", "-1"
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Field 'parallelism' must be a Positive Number");
+
+    }
+
+    @Test
+    public void TestCreateTopicName() throws Exception {
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        String wrongOutputTopicName = TEST_NAME + "-output-topic/test:";
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", wrongOutputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "The topic name " + wrongOutputTopicName + " is invalid for field 'output'");
+    }
+
+    @Test
+    public void TestCreateClassName() throws Exception {
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        String cannotLoadClass = "com.test.Function";
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", cannotLoadClass,
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Cannot find/load class " + cannotLoadClass);
+    }
+
+    @Test
+    public void TestCreateSameInOutTopic() throws Exception {
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", inputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs,
+                "Output topic " + inputTopicName
+                        + " is also being used as an input topic (topics must be one or the other)");
+
+    }
+
+
+    public static class ConsoleOutputCapturer {
+        private ByteArrayOutputStream stdout;
+        private ByteArrayOutputStream stderr;
+        private PrintStream previous;
+        private boolean capturing;
+
+        public void start() {
+            if (capturing) {
+                return;
+            }
+
+            capturing = true;
+            previous = System.out;
+            stdout = new ByteArrayOutputStream();
+            stderr = new ByteArrayOutputStream();
+
+            OutputStream outputStreamCombinerstdout =
+                    new OutputStreamCombiner(Arrays.asList(previous, stdout));
+            PrintStream stdoutStream = new PrintStream(outputStreamCombinerstdout);
+
+            OutputStream outputStreamCombinerStderr =
+                    new OutputStreamCombiner(Arrays.asList(previous, stderr));
+            PrintStream stderrStream = new PrintStream(outputStreamCombinerStderr);
+
+            System.setOut(stdoutStream);
+            System.setErr(stderrStream);
+        }
+
+        public void stop() {
+            if (!capturing) {
+                return;
+            }
+
+            System.setOut(previous);
+
+            previous = null;
+            capturing = false;
+        }
+
+        public String getStdout() {
+            return stdout.toString();
+        }
+
+        public String getStderr() {
+            return stderr.toString();
+        }
+
+        private static class OutputStreamCombiner extends OutputStream {
+            private List<OutputStream> outputStreams;
+
+            public OutputStreamCombiner(List<OutputStream> outputStreams) {
+                this.outputStreams = outputStreams;
+            }
+
+            public void write(int b) throws IOException {
+                for (OutputStream os : outputStreams) {
+                    os.write(b);
+                }
+            }
+
+            public void flush() throws IOException {
+                for (OutputStream os : outputStreams) {
+                    os.flush();
+                }
+            }
+
+            public void close() throws IOException {
+                for (OutputStream os : outputStreams) {
+                    os.close();
+                }
+            }
+        }
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3181290..9b45f3c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -45,7 +44,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -65,11 +63,9 @@ import org.apache.pulsar.functions.windowing.WindowUtils;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -359,17 +355,14 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
                 userCodeFile = pyFile;
             } else {
-                throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
+                throw new ParameterException("Either a Java jar or a Python file needs to be specified for the function");
             }
 
             // infer default vaues
             inferMissingArguments(functionConfig);
-
-            // check if function configs are valid
-            validateFunctionConfigs(functionConfig);
         }
 
-        private void validateFunctionConfigs(FunctionConfig functionConfig) {
+        protected void validateFunctionConfigs(FunctionConfig functionConfig) {
 
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                 File file = new File(jarFile);
@@ -377,7 +370,7 @@ public class CmdFunctions extends CmdBase {
                 try {
                     userJarLoader = Reflections.loadJar(file);
                 } catch (MalformedURLException e) {
-                    throw new RuntimeException("Failed to load user jar " + file, e);
+                    throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage());
                 }
                 // make sure the function class loader is accessible thread-locally
                 Thread.currentThread().setContextClassLoader(userJarLoader);
@@ -415,7 +408,7 @@ public class CmdFunctions extends CmdBase {
                 // set auto ack to false since windowing framework is responsible
                 // for acking and not the function framework
                 if (autoAck != null && autoAck == true) {
-                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+                    throw new ParameterException("Cannot enable auto ack when using windowing functionality");
                 }
                 functionConfig.setAutoAck(false);
             }
@@ -423,7 +416,7 @@ public class CmdFunctions extends CmdBase {
 
         private void inferMissingFunctionName(FunctionConfig functionConfig) {
             if (isNull(functionConfig.getClassName())) {
-                throw new IllegalArgumentException("You must specify a class name for the function");
+                throw new ParameterException("You must specify a class name for the function");
             }
 
             String [] domains = functionConfig.getClassName().split("\\.");
@@ -469,13 +462,7 @@ public class CmdFunctions extends CmdBase {
 
             Class<?>[] typeArgs = null;
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-
-                File file = new File(jarFile);
-                try {
-                    Reflections.loadJar(file);
-                } catch (MalformedURLException e) {
-                    throw new RuntimeException("Failed to load user jar " + file, e);
-                }
+                // Assuming any external jars are already loaded
                 typeArgs = Utils.getFunctionTypes(functionConfig);
             }
 
@@ -588,7 +575,8 @@ public class CmdFunctions extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            checkRequiredFields(functionConfig);
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
             CmdFunctions.startLocalRun(convertProto2(functionConfig),
                     functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin);
         }
@@ -598,7 +586,8 @@ public class CmdFunctions extends CmdBase {
     class CreateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            checkRequiredFields(functionConfig);
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
             admin.functions().createFunction(convert(functionConfig), userCodeFile);
             print("Created successfully");
         }
@@ -637,7 +626,8 @@ public class CmdFunctions extends CmdBase {
     class UpdateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            checkRequiredFields(functionConfig);
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
             admin.functions().updateFunction(convert(functionConfig), userCodeFile);
             print("Updated successfully");
         }
@@ -720,7 +710,7 @@ public class CmdFunctions extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if (triggerFile == null && triggerValue == null) {
-                throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
+                throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified");
             }
             String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
             System.out.println(retval);
@@ -850,36 +840,6 @@ public class CmdFunctions extends CmdBase {
         return mapper.readValue(file, FunctionConfig.class);
     }
 
-    private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
-        if (inputTopics.contains(outputTopic)) {
-            throw new IllegalArgumentException(
-                    String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
-                            outputTopic));
-        }
-    }
-
-    private static void checkRequiredFields(FunctionConfig config) throws IllegalArgumentException {
-        if (isNull(config.getTenant())) {
-            throw new IllegalArgumentException("You must specify a tenant for the function");
-        }
-
-        if (isNull(config.getNamespace())) {
-            throw new IllegalArgumentException("You must specify a namespace for the function");
-        }
-
-        if (isNull(config.getName())) {
-            throw new IllegalArgumentException("You must specify a name for the function");
-        }
-
-        if (isNull(config.getClassName())) {
-            throw new IllegalArgumentException("You must specify a class name for the function");
-        }
-
-        if (config.getInputs().isEmpty() && config.getCustomSerdeInputs().isEmpty()) {
-            throw new IllegalArgumentException("You must specify one or more input topics for the function");
-        }
-    }
-
     private static FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) {
             if (type.name().equals(runtime.name())) {
@@ -912,7 +872,7 @@ public class CmdFunctions extends CmdBase {
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
-            throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
+            throw new ParameterException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
         } else {
             functionConfig.setTenant(args[0]);
             functionConfig.setNamespace(args[1]);

-- 
To stop receiving notification emails like this one, please contact
jerrypeng@apache.org.