You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/05/14 16:48:30 UTC
[pulsar] branch master updated: Added more unit tests to the JavaInstanceTest class (#10369)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c4098d6 Added more unit tests to the JavaInstanceTest class (#10369)
c4098d6 is described below
commit c4098d6845194ca78f3ddbc6748ec3c3bb313cb9
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Fri May 14 09:47:37 2021 -0700
Added more unit tests to the JavaInstanceTest class (#10369)
---
.../functions/instance/JavaInstanceTest.java | 96 ++++++++++++++++++++++
1 file changed, 96 insertions(+)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 59f36d9..b1e7cc7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.functions.instance;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -54,6 +56,35 @@ public class JavaInstanceTest {
assertEquals(new String(testString + "-lambda"), result.get().getResult());
instance.close();
}
+
+ @Test
+ public void testNullReturningFunction() throws Exception {
+ JavaInstance instance = new JavaInstance(
+ mock(ContextImpl.class),
+ (Function<String, String>) (input, context) -> null,
+ new InstanceConfig());
+ String testString = "ABC123";
+ CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+ assertNull(result.get().getResult());
+ instance.close();
+ }
+
+ @Test
+ public void testUserExceptionThrowingFunction() throws Exception {
+ final UserException userException = new UserException("Boom");
+ Function<String, String> func = (input, context) -> {
+ throw userException;
+ };
+
+ JavaInstance instance = new JavaInstance(
+ mock(ContextImpl.class),
+ func,
+ new InstanceConfig());
+ String testString = "ABC123";
+ CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+ assertSame(userException, result.get().getUserException());
+ instance.close();
+ }
@Test
public void testAsyncFunction() throws Exception {
@@ -86,6 +117,64 @@ public class JavaInstanceTest {
assertEquals(new String(testString + "-lambda"), result.get().getResult());
instance.close();
}
+
+ @Test
+ public void testNullReturningAsyncFunction() throws Exception {
+ InstanceConfig instanceConfig = new InstanceConfig();
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ Function<String, CompletableFuture<String>> function = (input, context) -> {
+ log.info("input string: {}", input);
+ CompletableFuture<String> result = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ Thread.sleep(500);
+ result.complete(null);
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+ });
+
+ return result;
+ };
+
+ JavaInstance instance = new JavaInstance(
+ mock(ContextImpl.class),
+ function,
+ instanceConfig);
+ String testString = "ABC123";
+ CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+ assertNull(result.get().getResult());
+ instance.close();
+ }
+
+ @Test
+ public void testUserExceptionThrowingAsyncFunction() throws Exception {
+ final UserException userException = new UserException("Boom");
+ InstanceConfig instanceConfig = new InstanceConfig();
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ Function<String, CompletableFuture<String>> function = (input, context) -> {
+ log.info("input string: {}", input);
+ CompletableFuture<String> result = new CompletableFuture<>();
+ executor.submit(() -> {
+ result.completeExceptionally(userException);
+ });
+
+ return result;
+ };
+
+ JavaInstance instance = new JavaInstance(
+ mock(ContextImpl.class),
+ function,
+ instanceConfig);
+ String testString = "ABC123";
+ CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+ assertSame(userException, result.get().getUserException().getCause());
+ instance.close();
+ }
@Test
public void testAsyncFunctionMaxPending() throws Exception {
@@ -137,4 +226,11 @@ public class JavaInstanceTest {
log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
instance.close();
}
+
+ @SuppressWarnings("serial")
+ private static class UserException extends Exception {
+ public UserException(String msg) {
+ super(msg);
+ }
+ }
}