You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/05/31 21:11:12 UTC
[incubator-pulsar] branch master updated: adding CmdFunctions unit
tests and clean up (#1869)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 866bbec adding CmdFunctions unit tests and clean up (#1869)
866bbec is described below
commit 866bbece0cc57894bdc70dd2fb99c23abf0f0bef
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 31 14:11:07 2018 -0700
adding CmdFunctions unit tests and clean up (#1869)
* adding CmdFunctions unit tests and clean up
* remove unnecessary import
---
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 309 ++++++++++++++++++---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 68 +----
2 files changed, 290 insertions(+), 87 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 1cc239d..0826956 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -18,29 +18,10 @@
*/
package org.apache.pulsar.admin.cli;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-
import com.google.gson.Gson;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -50,7 +31,6 @@ import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
-import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -68,9 +48,34 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
/**
* Unit test of {@link CmdFunctions}.
*/
+@Slf4j
@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class })
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
public class CmdFunctionsTest {
@@ -124,16 +129,16 @@ public class CmdFunctionsTest {
when(Reflections.createInstance(eq(DefaultSerDe.class.getName()), any(File.class))).thenReturn(new DefaultSerDe(String.class));
}
- @Test
- public void testLocalRunnerCmdNoArguments() throws Exception {
- cmd.run(new String[] { "run" });
-
- LocalRunner runner = cmd.getLocalRunner();
- assertNull(runner.getFunctionName());
- assertNull(runner.getInputs());
- assertNull(runner.getOutput());
- assertNull(runner.getFnConfigFile());
- }
+// @Test
+// public void testLocalRunnerCmdNoArguments() throws Exception {
+// cmd.run(new String[] { "run" });
+//
+// LocalRunner runner = cmd.getLocalRunner();
+// assertNull(runner.getFunctionName());
+// assertNull(runner.getInputs());
+// assertNull(runner.getOutput());
+// assertNull(runner.getFnConfigFile());
+// }
/*
TODO(sijie):- Can we fix this?
@@ -347,8 +352,6 @@ public class CmdFunctionsTest {
String inputTopicName = TEST_NAME + "-input-topic";
String outputTopicName = TEST_NAME + "-output-topic";
-
-
cmd.run(new String[] {
"update",
"--name", fnName,
@@ -425,4 +428,244 @@ public class CmdFunctionsTest {
"test-key",
new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8));
}
+
+ private static final String fnName = TEST_NAME + "-function";
+ private static final String inputTopicName = TEST_NAME + "-input-topic";
+ private static final String outputTopicName = TEST_NAME + "-output-topic";
+
+ private void testValidateFunctionsConfigs(String[] correctArgs, String[] incorrectArgs,
+ String errMessageCheck) throws Exception {
+
+ String[] cmds = {"create", "update", "localrun"};
+
+ for (String type : cmds) {
+ List<String> correctArgList = new LinkedList<>();
+ List<String> incorrectArgList = new LinkedList<>();
+ correctArgList.add(type);
+ incorrectArgList.add(type);
+
+ correctArgList.addAll(Arrays.asList(correctArgs));
+ incorrectArgList.addAll(Arrays.asList(incorrectArgs));
+ cmd.run(correctArgList.toArray(new String[correctArgList.size()]));
+
+ if (type.equals("create")) {
+ CreateFunction creater = cmd.getCreater();
+ assertEquals(fnName, creater.getFunctionName());
+ assertEquals(inputTopicName, creater.getInputs());
+ assertEquals(outputTopicName, creater.getOutput());
+ } else if (type.equals("update")){
+ UpdateFunction updater = cmd.getUpdater();
+ assertEquals(fnName, updater.getFunctionName());
+ assertEquals(inputTopicName, updater.getInputs());
+ assertEquals(outputTopicName, updater.getOutput());
+ } else {
+ CmdFunctions.LocalRunner localRunner = cmd.getLocalRunner();
+ assertEquals(fnName, localRunner.getFunctionName());
+ assertEquals(inputTopicName, localRunner.getInputs());
+ assertEquals(outputTopicName, localRunner.getOutput());
+ }
+
+ if (type.equals("create")) {
+ verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());
+ } else if (type.equals("update")) {
+ verify(functions, times(1)).updateFunction(any(FunctionDetails.class), anyString());
+ }
+
+ setup();
+ ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
+ consoleOutputCapturer.start();
+ cmd.run(incorrectArgList.toArray(new String[incorrectArgList.size()]));
+
+ consoleOutputCapturer.stop();
+ String output = consoleOutputCapturer.getStderr();
+ assertEquals(output.replace("\n", ""), errMessageCheck);
+ }
+ }
+
+ @Test
+ public void TestCreateFunctionParallelism() throws Exception{
+
+ String[] correctArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ "--parallelism", "1"
+ };
+
+ String[] incorrectArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ "--parallelism", "-1"
+ };
+
+ testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Field 'parallelism' must be a Positive Number");
+
+ }
+
+ @Test
+ public void TestCreateTopicName() throws Exception {
+
+ String[] correctArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ };
+
+ String wrongOutputTopicName = TEST_NAME + "-output-topic/test:";
+ String[] incorrectArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", wrongOutputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ };
+
+ testValidateFunctionsConfigs(correctArgs, incorrectArgs, "The topic name " + wrongOutputTopicName + " is invalid for field 'output'");
+ }
+
+ @Test
+ public void TestCreateClassName() throws Exception {
+
+ String[] correctArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ };
+
+ String cannotLoadClass = "com.test.Function";
+ String[] incorrectArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", cannotLoadClass,
+ };
+
+ testValidateFunctionsConfigs(correctArgs, incorrectArgs, "Cannot find/load class " + cannotLoadClass);
+ }
+
+ @Test
+ public void TestCreateSameInOutTopic() throws Exception {
+
+ String[] correctArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ };
+
+ String[] incorrectArgs = new String[]{
+ "--name", fnName,
+ "--inputs", inputTopicName,
+ "--output", inputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ };
+
+ testValidateFunctionsConfigs(correctArgs, incorrectArgs,
+ "Output topic " + inputTopicName
+ + " is also being used as an input topic (topics must be one or the other)");
+
+ }
+
+
+ public static class ConsoleOutputCapturer {
+ private ByteArrayOutputStream stdout;
+ private ByteArrayOutputStream stderr;
+ private PrintStream previous;
+ private boolean capturing;
+
+ public void start() {
+ if (capturing) {
+ return;
+ }
+
+ capturing = true;
+ previous = System.out;
+ stdout = new ByteArrayOutputStream();
+ stderr = new ByteArrayOutputStream();
+
+ OutputStream outputStreamCombinerstdout =
+ new OutputStreamCombiner(Arrays.asList(previous, stdout));
+ PrintStream stdoutStream = new PrintStream(outputStreamCombinerstdout);
+
+ OutputStream outputStreamCombinerStderr =
+ new OutputStreamCombiner(Arrays.asList(previous, stderr));
+ PrintStream stderrStream = new PrintStream(outputStreamCombinerStderr);
+
+ System.setOut(stdoutStream);
+ System.setErr(stderrStream);
+ }
+
+ public void stop() {
+ if (!capturing) {
+ return;
+ }
+
+ System.setOut(previous);
+
+ previous = null;
+ capturing = false;
+ }
+
+ public String getStdout() {
+ return stdout.toString();
+ }
+
+ public String getStderr() {
+ return stderr.toString();
+ }
+
+ private static class OutputStreamCombiner extends OutputStream {
+ private List<OutputStream> outputStreams;
+
+ public OutputStreamCombiner(List<OutputStream> outputStreams) {
+ this.outputStreams = outputStreams;
+ }
+
+ public void write(int b) throws IOException {
+ for (OutputStream os : outputStreams) {
+ os.write(b);
+ }
+ }
+
+ public void flush() throws IOException {
+ for (OutputStream os : outputStreams) {
+ os.flush();
+ }
+ }
+
+ public void close() throws IOException {
+ for (OutputStream os : outputStreams) {
+ os.close();
+ }
+ }
+ }
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3181290..9b45f3c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -45,7 +44,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
@@ -65,11 +63,9 @@ import org.apache.pulsar.functions.windowing.WindowUtils;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -359,17 +355,14 @@ public class CmdFunctions extends CmdBase {
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
userCodeFile = pyFile;
} else {
- throw new RuntimeException("Either a Java jar or a Python file needs to be specified for the function");
+ throw new ParameterException("Either a Java jar or a Python file needs to be specified for the function");
}
// infer default vaues
inferMissingArguments(functionConfig);
-
- // check if function configs are valid
- validateFunctionConfigs(functionConfig);
}
- private void validateFunctionConfigs(FunctionConfig functionConfig) {
+ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
File file = new File(jarFile);
@@ -377,7 +370,7 @@ public class CmdFunctions extends CmdBase {
try {
userJarLoader = Reflections.loadJar(file);
} catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
+ throw new ParameterException("Failed to load user jar " + file + " with error " + e.getMessage());
}
// make sure the function class loader is accessible thread-locally
Thread.currentThread().setContextClassLoader(userJarLoader);
@@ -415,7 +408,7 @@ public class CmdFunctions extends CmdBase {
// set auto ack to false since windowing framework is responsible
// for acking and not the function framework
if (autoAck != null && autoAck == true) {
- throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
+ throw new ParameterException("Cannot enable auto ack when using windowing functionality");
}
functionConfig.setAutoAck(false);
}
@@ -423,7 +416,7 @@ public class CmdFunctions extends CmdBase {
private void inferMissingFunctionName(FunctionConfig functionConfig) {
if (isNull(functionConfig.getClassName())) {
- throw new IllegalArgumentException("You must specify a class name for the function");
+ throw new ParameterException("You must specify a class name for the function");
}
String [] domains = functionConfig.getClassName().split("\\.");
@@ -469,13 +462,7 @@ public class CmdFunctions extends CmdBase {
Class<?>[] typeArgs = null;
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-
- File file = new File(jarFile);
- try {
- Reflections.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
- }
+ // Assuming any external jars are already loaded
typeArgs = Utils.getFunctionTypes(functionConfig);
}
@@ -588,7 +575,8 @@ public class CmdFunctions extends CmdBase {
@Override
void runCmd() throws Exception {
- checkRequiredFields(functionConfig);
+ // check if function configs are valid
+ validateFunctionConfigs(functionConfig);
CmdFunctions.startLocalRun(convertProto2(functionConfig),
functionConfig.getParallelism(), brokerServiceUrl, userCodeFile, admin);
}
@@ -598,7 +586,8 @@ public class CmdFunctions extends CmdBase {
class CreateFunction extends FunctionDetailsCommand {
@Override
void runCmd() throws Exception {
- checkRequiredFields(functionConfig);
+ // check if function configs are valid
+ validateFunctionConfigs(functionConfig);
admin.functions().createFunction(convert(functionConfig), userCodeFile);
print("Created successfully");
}
@@ -637,7 +626,8 @@ public class CmdFunctions extends CmdBase {
class UpdateFunction extends FunctionDetailsCommand {
@Override
void runCmd() throws Exception {
- checkRequiredFields(functionConfig);
+ // check if function configs are valid
+ validateFunctionConfigs(functionConfig);
admin.functions().updateFunction(convert(functionConfig), userCodeFile);
print("Updated successfully");
}
@@ -720,7 +710,7 @@ public class CmdFunctions extends CmdBase {
@Override
void runCmd() throws Exception {
if (triggerFile == null && triggerValue == null) {
- throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
+ throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified");
}
String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
System.out.println(retval);
@@ -850,36 +840,6 @@ public class CmdFunctions extends CmdBase {
return mapper.readValue(file, FunctionConfig.class);
}
- private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
- if (inputTopics.contains(outputTopic)) {
- throw new IllegalArgumentException(
- String.format("Output topic %s is also being used as an input topic (topics must be one or the other)",
- outputTopic));
- }
- }
-
- private static void checkRequiredFields(FunctionConfig config) throws IllegalArgumentException {
- if (isNull(config.getTenant())) {
- throw new IllegalArgumentException("You must specify a tenant for the function");
- }
-
- if (isNull(config.getNamespace())) {
- throw new IllegalArgumentException("You must specify a namespace for the function");
- }
-
- if (isNull(config.getName())) {
- throw new IllegalArgumentException("You must specify a name for the function");
- }
-
- if (isNull(config.getClassName())) {
- throw new IllegalArgumentException("You must specify a class name for the function");
- }
-
- if (config.getInputs().isEmpty() && config.getCustomSerdeInputs().isEmpty()) {
- throw new IllegalArgumentException("You must specify one or more input topics for the function");
- }
- }
-
private static FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) {
for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) {
if (type.name().equals(runtime.name())) {
@@ -912,7 +872,7 @@ public class CmdFunctions extends CmdBase {
private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) {
String[] args = fqfn.split("/");
if (args.length != 3) {
- throw new RuntimeException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
+ throw new ParameterException("Fully qualified function names (FQFNs) must be of the form tenant/namespace/name");
} else {
functionConfig.setTenant(args[0]);
functionConfig.setNamespace(args[1]);
--
To stop receiving notification emails like this one, please contact
jerrypeng@apache.org.