You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/31 21:11:09 UTC

[GitHub] jerrypeng closed pull request #1869: adding CmdFunctions unit tests and clean up

jerrypeng closed pull request #1869: adding CmdFunctions unit tests and clean up
URL: https://github.com/apache/incubator-pulsar/pull/1869
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 1cc239db63..08269567f8 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -18,29 +18,10 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
 import com.google.gson.Gson;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -50,7 +31,6 @@
 import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
-import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -68,9 +48,34 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 /**
  * Unit test of {@link CmdFunctions}.
  */
+@Slf4j
 @PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class })
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
 public class CmdFunctionsTest {
@@ -124,16 +129,16 @@ public void setup() throws Exception {
         when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class));
     }
 
-    @Test
-    public void testLocalRunnerCmdNoArguments() throws Exception {
-        cmd.run(new String[] { "run" });
-
-        LocalRunner runner = cmd.getLocalRunner();
-        assertNull(runner.getFunctionName());
-        assertNull(runner.getInputs());
-        assertNull(runner.getOutput());
-        assertNull(runner.getFnConfigFile());
-    }
+//    @Test
+//    public void testLocalRunnerCmdNoArguments() throws Exception {
+//        cmd.run(new String[] { "run" });
+//
+//        LocalRunner runner = cmd.getLocalRunner();
+//        assertNull(runner.getFunctionName());
+//        assertNull(runner.getInputs());
+//        assertNull(runner.getOutput());
+//        assertNull(runner.getFnConfigFile());
+//    }
 
     /*
     TODO(sijie):- Can we fix this?
@@ -347,8 +352,6 @@ public void testUpdateFunction() throws Exception {
         String inputTopicName = TEST_NAME + "-input-topic";
         String outputTopicName = TEST_NAME + "-output-topic";
 
-
-
         cmd.run(new String[] {
             "update",
             "--name", fnName,
@@ -425,4 +428,244 @@ public void testStateGetter() throws Exception {
             "test-key",
             new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8));
     }
+
+    private static final String fnName = TEST_NAME + "-function";
+    private static final String inputTopicName = TEST_NAME + "-input-topic";
+    private static final String outputTopicName = TEST_NAME + "-output-topic";
+
+    private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorrectArgs,
+                                              String errMessageCheck) throws Exception {
+
+        String[] cmds = {"create", "update", "localrun"};
+
+        for (String type : cmds) {
+            List<String> correctArgList = new LinkedList<>();
+            List<String> incorrectArgList = new LinkedList<>();
+            correctArgList.add(type);
+            incorrectArgList.add(type);
+
+            correctArgList.addAll(Arrays.asList(correctArgs));
+            incorrectArgList.addAll(Arrays.asList(incorrectArgs));
+            cmd.run(correctArgList.toArray(new String[correctArgList.size()]));
+
+            if (type.equals("create")) {
+                CreateFunction creater = cmd.getCreater();
+                assertEquals(fnName, creater.getFunctionName());
+                assertEquals(inputTopicName, creater.getInputs());
+                assertEquals(outputTopicName, creater.getOutput());
+            } else if (type.equals("update")){
+                UpdateFunction updater = cmd.getUpdater();
+                assertEquals(fnName, updater.getFunctionName());
+                assertEquals(inputTopicName, updater.getInputs());
+                assertEquals(outputTopicName, updater.getOutput());
+            } else {
+                CmdFunctions.LocalRunner localRunner = cmd.getLocalRunner();
+                assertEquals(fnName, localRunner.getFunctionName());
+                assertEquals(inputTopicName, localRunner.getInputs());
+                assertEquals(outputTopicName, localRunner.getOutput());
+            }
+
+            if (type.equals("create")) {
+                verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+            } else if (type.equals("update")) {
+                verify(functions, times(1)).updateFunction(any(FunctionDetails.class), anyString());
+            }
+
+            setup();
+            ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
+            consoleOutputCapturer.start();
+            cmd.run(incorrectArgList.toArray(new String[incorrectArgList.size()]));
+
+            consoleOutputCapturer.stop();
+            String output = consoleOutputCapturer.getStderr();
+            assertEquals(output.replace("\n", ""), errMessageCheck);
+        }
+    }
+
+    @Test
+    public void TestCreateFunctionParallelism() throws Exception{
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+                "--parallelism", "1"
+        };
+
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+                "--parallelism", "-1"
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Field 'parallelism' must be a Positive Number");
+
+    }
+
+    @Test
+    public void TestCreateTopicName() throws Exception {
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        String wrongOutputTopicName = TEST_NAME + "-output-topic/test:";
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", wrongOutputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "The topic name " + wrongOutputTopicName + " is invalid for field 'output'");
+    }
+
+    @Test
+    public void TestCreateClassName() throws Exception {
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        String cannotLoadClass = "com.test.Function";
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", cannotLoadClass,
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Cannot find/load class " + cannotLoadClass);
+    }
+
+    @Test
+    public void TestCreateSameInOutTopic() throws Exception {
+
+        String[] correctArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        String[] incorrectArgs = new String[]{
+                "--name", fnName,
+                "--inputs", inputTopicName,
+                "--output", inputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        };
+
+        testValidateFunctionsConfigs(correctArgs, incorrectArgs,
+                "Output topic " + inputTopicName
+                        + " is also being used as an input topic (topics must be one or the other)");
+
+    }
+
+
+    public static class ConsoleOutputCapturer {
+        private ByteArrayOutputStream stdout;
+        private ByteArrayOutputStream stderr;
+        private PrintStream previous;
+        private boolean capturing;
+
+        public void start() {
+            if (capturing) {
+                return;
+            }
+
+            capturing = true;
+            previous = System.out;
+            stdout = new ByteArrayOutputStream();
+            stderr = new ByteArrayOutputStream();
+
+            OutputStream outputStreamCombinerstdout =
+                    new OutputStreamCombiner(Arrays.asList(previous, stdout));
+            PrintStream stdoutStream = new PrintStream(outputStreamCombinerstdout);
+
+            OutputStream outputStreamCombinerStderr =
+                    new OutputStreamCombiner(Arrays.asList(previous, stderr));
+            PrintStream stderrStream = new PrintStream(outputStreamCombinerStderr);
+
+            System.setOut(stdoutStream);
+            System.setErr(stderrStream);
+        }
+
+        public void stop() {
+            if (!capturing) {
+                return;
+            }
+
+            System.setOut(previous);
+
+            previous = null;
+            capturing = false;
+        }
+
+        public String getStdout() {
+            return stdout.toString();
+        }
+
+        public String getStderr() {
+            return stderr.toString();
+        }
+
+        private static class OutputStreamCombiner extends OutputStream {
+            private List<OutputStream> outputStreams;
+
+            public OutputStreamCombiner(List<OutputStream> outputStreams) {
+                this.outputStreams = outputStreams;
+            }
+
+            public void write(int b) throws IOException {
+                for (OutputStream os : outputStreams) {
+                    os.write(b);
+                }
+            }
+
+            public void flush() throws IOException {
+                for (OutputStream os : outputStreams) {
+                    os.flush();
+                }
+            }
+
+            public void close() throws IOException {
+                for (OutputStream os : outputStreams) {
+                    os.close();
+                }
+            }
+        }
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 94e17ad17b..73e99af0ee 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -34,7 +34,6 @@
 import io.netty.buffer.Unpooled;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -45,7 +44,6 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -65,11 +63,9 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -359,17 +355,14 @@ void processArguments() throws Exception {
                 functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
                 userCodeFile = pyFile;
             } else {
-                throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
+                throw new ParameterException("Either a Java jar or a Python file needs to be specified for the function");
             }
 
             // infer default vaues
             inferMissingArguments(functionConfig);
-
-            // check if function configs are valid
-            validateFunctionConfigs(functionConfig);
         }
 
-        private void validateFunctionConfigs(FunctionConfig functionConfig) {
+        protected void validateFunctionConfigs(FunctionConfig functionConfig) {
 
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                 File file = new File(jarFile);
@@ -377,7 +370,7 @@ private void validateFunctionConfigs(FunctionConfig functionConfig) {
                 try {
                     userJarLoader = Reflections.loadJar(file);
                 } catch (MalformedURLException e) {
-                    throw new RuntimeException("Failed to load user jar " + file, e);
+                    throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage());
                 }
                 // make sure the function class loader is accessible thread-locally
                 Thread.currentThread().setContextClassLoader(userJarLoader);
@@ -415,7 +408,7 @@ private void inferMissingArguments(FunctionConfig functionConfig) {
                 // set auto ack to false since windowing framework is responsible
                 // for acking and not the function framework
                 if (autoAck != null && autoAck == true) {
-                    throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+                    throw new ParameterException("Cannot enable auto ack when using windowing functionality");
                 }
                 functionConfig.setAutoAck(false);
             }
@@ -423,7 +416,7 @@ private void inferMissingArguments(FunctionConfig functionConfig) {
 
         private void inferMissingFunctionName(FunctionConfig functionConfig) {
             if (isNull(functionConfig.getClassName())) {
-                throw new IllegalArgumentException("You must specify a class name for the function");
+                throw new ParameterException("You must specify a class name for the function");
             }
 
             String [] domains = functionConfig.getClassName().split("\\.");
@@ -469,13 +462,7 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
 
             Class<?>[] typeArgs = null;
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-
-                File file = new File(jarFile);
-                try {
-                    Reflections.loadJar(file);
-                } catch (MalformedURLException e) {
-                    throw new RuntimeException("Failed to load user jar " + file, e);
-                }
+                // Assuming any external jars are already loaded
                 typeArgs = Utils.getFunctionTypes(functionConfig);
             }
 
@@ -588,7 +575,8 @@ protected FunctionDetails convert(FunctionConfig functionConfig)
 
         @Override
         void runCmd() throws Exception {
-            checkRequiredFields(functionConfig);
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
             CmdFunctions.startLocalRun(convertProto2(functionConfig),
                     functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin);
         }
@@ -598,7 +586,8 @@ void runCmd() throws Exception {
     class CreateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            checkRequiredFields(functionConfig);
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
             admin.functions().createFunction(convert(functionConfig), userCodeFile);
             print("Created successfully");
         }
@@ -637,7 +626,8 @@ void runCmd() throws Exception {
     class UpdateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            checkRequiredFields(functionConfig);
+            // check if function configs are valid
+            validateFunctionConfigs(functionConfig);
             admin.functions().updateFunction(convert(functionConfig), userCodeFile);
             print("Updated successfully");
         }
@@ -720,7 +710,7 @@ void runCmd() throws Exception {
         @Override
         void runCmd() throws Exception {
             if (triggerFile == null && triggerValue == null) {
-                throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
+                throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified");
             }
             String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
             System.out.println(retval);
@@ -850,36 +840,6 @@ private static FunctionConfig loadConfig(File file) throws IOException {
         return mapper.readValue(file, FunctionConfig.class);
     }
 
-    private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
-        if (inputTopics.contains(outputTopic)) {
-            throw new IllegalArgumentException(
-                    String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
-                            outputTopic));
-        }
-    }
-
-    private static void checkRequiredFields(FunctionConfig config) throws IllegalArgumentException {
-        if (isNull(config.getTenant())) {
-            throw new IllegalArgumentException("You must specify a tenant for the function");
-        }
-
-        if (isNull(config.getNamespace())) {
-            throw new IllegalArgumentException("You must specify a namespace for the function");
-        }
-
-        if (isNull(config.getName())) {
-            throw new IllegalArgumentException("You must specify a name for the function");
-        }
-
-        if (isNull(config.getClassName())) {
-            throw new IllegalArgumentException("You must specify a class name for the function");
-        }
-
-        if (config.getInputs().isEmpty() && config.getCustomSerdeInputs().isEmpty()) {
-            throw new IllegalArgumentException("You must specify one or more input topics for the function");
-        }
-    }
-
     private static FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) {
             if (type.name().equals(runtime.name())) {
@@ -912,7 +872,7 @@ private static ProcessingGuarantees convertProcessingGuarantee(
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
-            throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
+            throw new ParameterException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
         } else {
             functionConfig.setTenant(args[0]);
             functionConfig.setNamespace(args[1]);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services