You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/08/03 17:59:47 UTC
[pulsar] branch master updated: Fix various issues with batch source (#7716)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6e5d66c Fix various issues with batch source (#7716)
6e5d66c is described below
commit 6e5d66c39d1597a99256665b14a1209379bcbc28
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Aug 3 10:59:24 2020 -0700
Fix various issues with batch source (#7716)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../functions/instance/JavaInstanceRunnable.java | 15 +-
.../source}/batch/BatchSourceExecutor.java | 31 +-
.../source/batch/BatchSourceExecutorTest.java | 366 +++++++++++++++++++++
.../pulsar/functions/utils/FunctionCommon.java | 18 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 47 ++-
.../rest/api/v3/SourceApiV3ResourceTest.java | 21 +-
pulsar-io/batch-data-generator/pom.xml | 12 -
pulsar-io/batch/pom.xml | 52 ---
.../pulsar/io/batch/BatchSourceExecutorTest.java | 366 ---------------------
pulsar-io/pom.xml | 2 -
10 files changed, 458 insertions(+), 472 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 06dbf19..29cb71f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
@@ -745,9 +746,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
} else {
- object = Reflections.createInstance(
- sourceSpec.getClassName(),
- this.functionClassLoader);
+
+ // check if source is a batch source
+ if (sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName())) {
+ object = Reflections.createInstance(
+ sourceSpec.getClassName(),
+ this.instanceClassLoader);
+ } else {
+ object = Reflections.createInstance(
+ sourceSpec.getClassName(),
+ this.functionClassLoader);
+ }
}
Class<?>[] typeArgs;
diff --git a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
similarity index 94%
rename from pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 04d87fa..d1cac03 100644
--- a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -16,18 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.batch;
+package org.apache.pulsar.functions.source.batch;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.BatchSourceConfig;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.io.core.*;
+import org.apache.pulsar.io.core.BatchSource;
+import org.apache.pulsar.io.core.BatchSourceTriggerer;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
import java.util.HashMap;
import java.util.Map;
@@ -162,20 +169,6 @@ public class BatchSourceExecutor<T> implements Source<T> {
}
}
- public org.apache.pulsar.functions.api.Record<T> readInternal() {
- try {
- Record<T> record = batchSource.readNext();
- log.info("Record: {}", record);
- if (record != null) {
- return record;
- }
- } catch (Exception e) {
- log.error("Error on read", e);
- throw new RuntimeException(e);
- }
- return null;
- }
-
@Override
public void close() throws Exception {
this.stop();
@@ -235,6 +228,6 @@ public class BatchSourceExecutor<T> implements Source<T> {
currentTask = intermediateTopicConsumer.receive();
return;
}
-
+
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
new file mode 100644
index 0000000..feeb200
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source.batch;
+
+
+import com.google.gson.Gson;
+import lombok.Getter;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.io.BatchSourceConfig;
+import org.apache.pulsar.functions.api.Record;
+
+import org.apache.pulsar.io.core.BatchPushSource;
+import org.apache.pulsar.io.core.BatchSource;
+import org.apache.pulsar.io.core.BatchSourceTriggerer;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.function.Consumer;
+
+/**
+ * Unit tests for {@link org.apache.pulsar.functions.source.batch.BatchSourceExecutor}
+ */
+public class BatchSourceExecutorTest {
+
+ public static class TestBatchSource implements BatchSource<String> {
+ @Getter
+ private static int prepareCount;
+ @Getter
+ private static int discoverCount;
+ @Getter
+ private static int recordCount;
+ private Record record = Mockito.mock(Record.class);
+ public TestBatchSource() { }
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext context) throws Exception {
+ if (!config.containsKey("foo")) {
+ throw new IllegalArgumentException("Bad config passed to TestBatchSource");
+ }
+ }
+
+ @Override
+ public void discover(Consumer<byte[]> taskEater) throws Exception {
+ byte[] retval = new byte[10];
+ discoverCount++;
+ taskEater.accept(retval);
+ }
+
+ @Override
+ public void prepare(byte[] task) throws Exception {
+ prepareCount++;
+ }
+
+ @Override
+ public Record<String> readNext() throws Exception {
+ if (++recordCount % 5 == 0) {
+ return null;
+ } else {
+ return record;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
+
+ public static class TestBatchPushSource extends BatchPushSource<String> {
+ @Getter
+ private static int prepareCount;
+ @Getter
+ private static int discoverCount;
+ @Getter
+ private static int recordCount;
+ private Record record = Mockito.mock(Record.class);
+ public TestBatchPushSource() { }
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext context) throws Exception {
+ if (!config.containsKey("foo")) {
+ throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
+ }
+ }
+
+ @Override
+ public void discover(Consumer<byte[]> taskEater) throws Exception {
+ byte[] retval = new byte[10];
+ discoverCount++;
+ taskEater.accept(retval);
+ }
+
+ @Override
+ public void prepare(byte[] task) throws Exception {
+ prepareCount++;
+ for (int i = 0; i < 5; ++i) {
+ consume(record);
+ ++recordCount;
+ }
+ consume(null);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
+
+ public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
+ private Consumer<String> trigger;
+ private Thread thread;
+
+ public TestDiscoveryTriggerer() { }
+
+ @Override
+ public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+ if (!config.containsKey("DELAY_MS")) {
+ throw new IllegalArgumentException("Bad config passed to TestTriggerer");
+ }
+ }
+
+ @Override
+ public void start(Consumer<String> trigger) {
+ this.trigger = trigger;
+ thread = new Thread(() -> {
+ while(true) {
+ try {
+ Thread.sleep(100);
+ trigger.accept("Triggered");
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ thread.start();
+ }
+
+ @Override
+ public void stop() {
+ if (thread == null) { return; } // start() never assigned a thread; nothing to shut down
+ thread.interrupt(); // the trigger loop exits when it sees InterruptedException
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the caller's interrupt status instead of swallowing it
+ }
+ }
+ }
+
+ private TestBatchSource testBatchSource;
+ private TestBatchPushSource testBatchPushSource;
+ private BatchSourceConfig testBatchConfig;
+ private Map<String, Object> config;
+ private Map<String, Object> pushConfig;
+ private BatchSourceExecutor<String> batchSourceExecutor;
+ private SourceContext context;
+ private ConsumerBuilder consumerBuilder;
+ private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+ private TypedMessageBuilder<byte[]> messageBuilder;
+ private CyclicBarrier discoveryBarrier;
+ private Message<byte[]> discoveredTask;
+
+ private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
+ Map<String, Object> config = new HashMap<>();
+ config.put("foo", "bar");
+ config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
+ config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
+ return config;
+ }
+
+ private static BatchSourceConfig createBatchSourceConfig() {
+ BatchSourceConfig testBatchConfig = new BatchSourceConfig();
+ testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
+ Map<String, Object> triggererConfig = new HashMap<>();
+ triggererConfig.put("DELAY_MS", 500);
+ testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
+ return testBatchConfig;
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ testBatchSource = new TestBatchSource();
+ testBatchPushSource = new TestBatchPushSource();
+ batchSourceExecutor = new BatchSourceExecutor<>();
+ testBatchConfig = createBatchSourceConfig();
+ config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
+ pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
+ context = Mockito.mock(SourceContext.class);
+ Mockito.doReturn("test-function").when(context).getSourceName();
+ Mockito.doReturn("test-namespace").when(context).getNamespace();
+ Mockito.doReturn("test-tenant").when(context).getTenant();
+ Mockito.doReturn(0).when(context).getInstanceId();
+ consumerBuilder = Mockito.mock(ConsumerBuilder.class);
+ Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any());
+ Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any());
+ Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
+ discoveredTask = Mockito.mock(Message.class);
+ consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
+ Mockito.doReturn(discoveredTask).when(consumer).receive();
+ Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+ Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES);
+ messageBuilder = Mockito.mock(TypedMessageBuilder.class);
+ Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any());
+ Mockito.doReturn(messageBuilder).when(messageBuilder).properties(Mockito.any());
+ Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any());
+
+ // Discovery
+ discoveryBarrier = new CyclicBarrier(2);
+ Mockito.doAnswer(new Answer<MessageId>() {
+ @Override public MessageId answer(InvocationOnMock invocation) {
+ try {
+ discoveryBarrier.await(); // rendezvous with the test thread before send() returns
+ } catch (Exception e) {
+ throw new RuntimeException(e); // chain the cause so a broken or interrupted barrier is diagnosable
+ }
+ return null;
+ }
+ }).when(messageBuilder).send();
+ }
+
+ @AfterMethod
+ public void cleanUp() throws Exception {
+ batchSourceExecutor.close();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
+ public void testWithoutRightConfig() throws Exception {
+ config.clear();
+ batchSourceExecutor.open(config, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
+ public void testPushWithoutRightConfig() throws Exception {
+ pushConfig.clear();
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
+ public void testWithoutRightTriggerer() throws Exception {
+ testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
+ config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+ batchSourceExecutor.open(config, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
+ public void testPushWithoutRightTriggerer() throws Exception {
+ testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
+ pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
+ public void testWithoutRightTriggererConfig() throws Exception {
+ Map<String, Object> badConfig = new HashMap<>();
+ badConfig.put("something", "else");
+ testBatchConfig.setDiscoveryTriggererConfig(badConfig);
+ config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+ batchSourceExecutor.open(config, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
+ public void testPushWithoutRightTriggererConfig() throws Exception {
+ Map<String, Object> badConfig = new HashMap<>();
+ badConfig.put("something", "else");
+ testBatchConfig.setDiscoveryTriggererConfig(badConfig);
+ pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
+ public void testWithoutRightSource() throws Exception {
+ config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
+ batchSourceExecutor.open(config, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
+ public void testPushWithoutRightSource() throws Exception {
+ pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
+ public void testWithoutRightSourceConfig() throws Exception {
+ config.remove("foo");
+ config.put("something", "else");
+ batchSourceExecutor.open(config, context);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
+ public void testPushWithoutRightSourceConfig() throws Exception {
+ pushConfig.remove("foo");
+ pushConfig.put("something", "else");
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test
+ public void testOpenWithRightSource() throws Exception {
+ batchSourceExecutor.open(config, context);
+ }
+
+ @Test
+ public void testPushOpenWithRightSource() throws Exception {
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test
+ public void testLifeCycle() throws Exception {
+ batchSourceExecutor.open(config, context);
+ Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
+ discoveryBarrier.await();
+ Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
+ Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
+ for (int i = 0; i < 5; ++i) {
+ batchSourceExecutor.read();
+ }
+ Assert.assertEquals(testBatchSource.getRecordCount(), 6);
+ Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
+ Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
+ discoveryBarrier.await();
+ Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
+ Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
+ }
+
+ @Test
+ public void testPushLifeCycle() throws Exception {
+ batchSourceExecutor.open(pushConfig, context);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
+ discoveryBarrier.await();
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+ for (int i = 0; i < 5; ++i) {
+ batchSourceExecutor.read();
+ }
+ Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+ discoveryBarrier.await();
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 5a68855..b3d8fdf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
+import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -186,14 +187,21 @@ public class FunctionCommon {
throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
}
-
public static Class<?> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
+ return getSourceType(classLoader.loadClass(className));
+ }
- Class userClass = classLoader.loadClass(className);
-
- Class<?> typeArg = TypeResolver.resolveRawArgument(Source.class, userClass);
+ public static Class<?>getSourceType(Class sourceClass) {
- return typeArg;
+ if (Source.class.isAssignableFrom(sourceClass)) {
+ return TypeResolver.resolveRawArgument(Source.class, sourceClass);
+ } else if (BatchSource.class.isAssignableFrom(sourceClass)) {
+ return TypeResolver.resolveRawArgument(BatchSource.class, sourceClass);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Source class %s does not implement the correct interface",
+ sourceClass.getName()));
+ }
}
public static Class<?> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 0296bcf..c8e0764 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -26,6 +26,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.BatchSourceConfig;
@@ -41,6 +42,8 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.io.core.BatchSource;
+import org.apache.pulsar.io.core.Source;
import java.io.File;
import java.io.IOException;
@@ -112,7 +115,7 @@ public class SourceConfigUtils {
if (sourceConfig.getBatchSourceConfig() != null) {
configs.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(sourceConfig.getBatchSourceConfig()));
configs.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, sourceSpecBuilder.getClassName());
- sourceSpecBuilder.setClassName("org.apache.pulsar.io.batch.BatchSourceExecutor");
+ sourceSpecBuilder.setClassName("org.apache.pulsar.functions.source.batch.BatchSourceExecutor");
}
sourceSpecBuilder.setConfigs(new Gson().toJson(configs));
@@ -296,7 +299,7 @@ public class SourceConfigUtils {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
try {
- typeArg = getSourceType(sourceClassName, narClassLoader);
+ narClassLoader.loadClass(sourceClassName);
classLoader = narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
@@ -307,13 +310,14 @@ public class SourceConfigUtils {
// if source class name is provided, we need to try to load it as a JAR and as a NAR.
if (jarClassLoader != null) {
try {
- typeArg = getSourceType(sourceClassName, jarClassLoader);
+ jarClassLoader.loadClass(sourceClassName);
classLoader = jarClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// class not found in JAR try loading as a NAR and searching for the class
if (narClassLoader != null) {
+
try {
- typeArg = getSourceType(sourceClassName, narClassLoader);
+ narClassLoader.loadClass(sourceClassName);
classLoader = narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
@@ -332,7 +336,7 @@ public class SourceConfigUtils {
validateConnectorConfig(sourceConfig, (NarClassLoader) narClassLoader);
}
try {
- typeArg = getSourceType(sourceClassName, narClassLoader);
+ narClassLoader.loadClass(sourceClassName);
classLoader = narClassLoader;
} catch (ClassNotFoundException | NoClassDefFoundError e1) {
throw new IllegalArgumentException(
@@ -354,6 +358,34 @@ public class SourceConfigUtils {
}
}
+ // check if source implements the correct interfaces
+ Class sourceClass;
+ try {
+ sourceClass = classLoader.loadClass(sourceClassName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ String.format("Source class %s not found in class loader", sourceClassName), e); // e is the cause, not a format argument
+ }
+
+ if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
+ throw new IllegalArgumentException(
+ String.format("Source class %s does not implement the correct interface",
+ sourceClass.getName()));
+ }
+
+ if (BatchSource.class.isAssignableFrom(sourceClass)) { // a BatchSource implementation requires a batch source config
+ if (sourceConfig.getBatchSourceConfig() != null) {
+ validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Source class %s implements %s but batch source config is not specified",
+ sourceClass.getName(), BatchSource.class.getName()));
+ }
+ }
+
+ // extract type from source class
+ typeArg = getSourceType(sourceClass);
+
// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
@@ -366,8 +398,9 @@ public class SourceConfigUtils {
ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, classLoader, false);
}
- if (sourceConfig.getBatchSourceConfig() != null) {
- validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
+ if (typeArg.equals(TypeResolver.Unknown.class)) {
+ throw new IllegalArgumentException(
+ String.format("Failed to resolve type for Source class %s", sourceClassName));
}
return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index fe9a117..cf6945f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -815,6 +815,9 @@ public class SourceApiV3ResourceTest {
Integer parallelism,
String expectedError) throws Exception {
+ NarClassLoader classLoader = mock(NarClassLoader.class);
+ doReturn(TwitterFireHose.class).when(classLoader).loadClass(eq(TwitterFireHose.class.getName()));
+
mockStatic(ConnectorUtils.class);
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
@@ -824,9 +827,9 @@ public class SourceApiV3ResourceTest {
mockStatic(FunctionCommon.class);
PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
doReturn(String.class).when(FunctionCommon.class);
- FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+ FunctionCommon.getSourceType(eq(TwitterFireHose.class));
- doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ doReturn(classLoader).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(), any(), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -884,6 +887,9 @@ public class SourceApiV3ResourceTest {
sourceConfig.setTopicName(outputTopic);
sourceConfig.setSerdeClassName(outputSerdeClassName);
+ NarClassLoader classLoader = mock(NarClassLoader.class);
+ doReturn(TwitterFireHose.class).when(classLoader).loadClass(eq(TwitterFireHose.class.getName()));
+
mockStatic(ConnectorUtils.class);
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
@@ -893,9 +899,9 @@ public class SourceApiV3ResourceTest {
mockStatic(FunctionCommon.class);
PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
doReturn(String.class).when(FunctionCommon.class);
- FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+ FunctionCommon.getSourceType(eq(TwitterFireHose.class));
- doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ doReturn(classLoader).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(), any(File.class), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -974,6 +980,9 @@ public class SourceApiV3ResourceTest {
sourceConfig.setClassName(className);
sourceConfig.setParallelism(parallelism);
+ NarClassLoader classLoader = mock(NarClassLoader.class);
+ doReturn(TwitterFireHose.class).when(classLoader).loadClass(eq(TwitterFireHose.class.getName()));
+
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
mockStatic(ConnectorUtils.class);
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
@@ -983,10 +992,10 @@ public class SourceApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
- FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+ FunctionCommon.getSourceType(eq(TwitterFireHose.class));
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
- doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+ doReturn(classLoader).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(), any(), any());
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
diff --git a/pulsar-io/batch-data-generator/pom.xml b/pulsar-io/batch-data-generator/pom.xml
index a4d8b80a2..91bc6aa 100644
--- a/pulsar-io/batch-data-generator/pom.xml
+++ b/pulsar-io/batch-data-generator/pom.xml
@@ -40,23 +40,11 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-common</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-batch</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
<groupId>io.codearte.jfairy</groupId>
<artifactId>jfairy</artifactId>
<version>0.5.9</version>
diff --git a/pulsar-io/batch/pom.xml b/pulsar-io/batch/pom.xml
deleted file mode 100644
index 4558030..0000000
--- a/pulsar-io/batch/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io</artifactId>
- <version>2.7.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-io-batch</artifactId>
- <name>Pulsar IO :: Batch</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-functions-utils</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- </dependencies>
-
-</project>
diff --git a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
deleted file mode 100644
index a9d9ef3..0000000
--- a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.batch;
-
-
-import com.google.gson.Gson;
-import lombok.Getter;
-import org.apache.pulsar.client.api.*;
-import org.apache.pulsar.common.io.BatchSourceConfig;
-import org.apache.pulsar.functions.api.Record;
-
-import org.apache.pulsar.io.core.BatchPushSource;
-import org.apache.pulsar.io.core.BatchSource;
-import org.apache.pulsar.io.core.BatchSourceTriggerer;
-import org.apache.pulsar.io.core.SourceContext;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CyclicBarrier;
-import java.util.function.Consumer;
-
-/**
- * Unit tests for {@link org.apache.pulsar.io.batch.BatchSourceExecutor}
- */
-public class BatchSourceExecutorTest {
-
- public static class TestBatchSource implements BatchSource<String> {
- @Getter
- private static int prepareCount;
- @Getter
- private static int discoverCount;
- @Getter
- private static int recordCount;
- private Record record = Mockito.mock(Record.class);
- public TestBatchSource() { }
-
- @Override
- public void open(Map<String, Object> config, SourceContext context) throws Exception {
- if (!config.containsKey("foo")) {
- throw new IllegalArgumentException("Bad config passed to TestBatchSource");
- }
- }
-
- @Override
- public void discover(Consumer<byte[]> taskEater) throws Exception {
- byte[] retval = new byte[10];
- discoverCount++;
- taskEater.accept(retval);
- }
-
- @Override
- public void prepare(byte[] task) throws Exception {
- prepareCount++;
- }
-
- @Override
- public Record<String> readNext() throws Exception {
- if (++recordCount % 5 == 0) {
- return null;
- } else {
- return record;
- }
- }
-
- @Override
- public void close() throws Exception {
-
- }
- }
-
- public static class TestBatchPushSource extends BatchPushSource<String> {
- @Getter
- private static int prepareCount;
- @Getter
- private static int discoverCount;
- @Getter
- private static int recordCount;
- private Record record = Mockito.mock(Record.class);
- public TestBatchPushSource() { }
-
- @Override
- public void open(Map<String, Object> config, SourceContext context) throws Exception {
- if (!config.containsKey("foo")) {
- throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
- }
- }
-
- @Override
- public void discover(Consumer<byte[]> taskEater) throws Exception {
- byte[] retval = new byte[10];
- discoverCount++;
- taskEater.accept(retval);
- }
-
- @Override
- public void prepare(byte[] task) throws Exception {
- prepareCount++;
- for (int i = 0; i < 5; ++i) {
- consume(record);
- ++recordCount;
- }
- consume(null);
- }
-
- @Override
- public void close() throws Exception {
-
- }
- }
-
- public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
- private Consumer<String> trigger;
- private Thread thread;
-
- public TestDiscoveryTriggerer() { }
-
- @Override
- public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
- if (!config.containsKey("DELAY_MS")) {
- throw new IllegalArgumentException("Bad config passed to TestTriggerer");
- }
- }
-
- @Override
- public void start(Consumer<String> trigger) {
- this.trigger = trigger;
- thread = new Thread(() -> {
- while(true) {
- try {
- Thread.sleep(100);
- trigger.accept("Triggered");
- } catch (InterruptedException e) {
- break;
- }
- }
- });
- thread.start();
- }
-
- @Override
- public void stop() {
- if (thread != null) {
- thread.interrupt();
- try {
- thread.join();
- } catch (Exception e) {
- }
- }
- }
- }
-
- private TestBatchSource testBatchSource;
- private TestBatchPushSource testBatchPushSource;
- private BatchSourceConfig testBatchConfig;
- private Map<String, Object> config;
- private Map<String, Object> pushConfig;
- private BatchSourceExecutor<String> batchSourceExecutor;
- private SourceContext context;
- private ConsumerBuilder consumerBuilder;
- private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
- private TypedMessageBuilder<byte[]> messageBuilder;
- private CyclicBarrier discoveryBarrier;
- private Message<byte[]> discoveredTask;
-
- private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
- Map<String, Object> config = new HashMap<>();
- config.put("foo", "bar");
- config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
- config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
- return config;
- }
-
- private static BatchSourceConfig createBatchSourceConfig() {
- BatchSourceConfig testBatchConfig = new BatchSourceConfig();
- testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
- Map<String, Object> triggererConfig = new HashMap<>();
- triggererConfig.put("DELAY_MS", 500);
- testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
- return testBatchConfig;
- }
-
- @BeforeMethod
- public void setUp() throws Exception {
- testBatchSource = new TestBatchSource();
- testBatchPushSource = new TestBatchPushSource();
- batchSourceExecutor = new BatchSourceExecutor<>();
- testBatchConfig = createBatchSourceConfig();
- config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
- pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
- context = Mockito.mock(SourceContext.class);
- Mockito.doReturn("test-function").when(context).getSourceName();
- Mockito.doReturn("test-namespace").when(context).getNamespace();
- Mockito.doReturn("test-tenant").when(context).getTenant();
- Mockito.doReturn(0).when(context).getInstanceId();
- consumerBuilder = Mockito.mock(ConsumerBuilder.class);
- Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any());
- Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any());
- Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
- discoveredTask = Mockito.mock(Message.class);
- consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
- Mockito.doReturn(discoveredTask).when(consumer).receive();
- Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
- Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES);
- messageBuilder = Mockito.mock(TypedMessageBuilder.class);
- Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any());
- Mockito.doReturn(messageBuilder).when(messageBuilder).properties(Mockito.any());
- Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any());
-
- // Discovery
- discoveryBarrier = new CyclicBarrier(2);
- Mockito.doAnswer(new Answer<MessageId>() {
- @Override public MessageId answer(InvocationOnMock invocation) {
- try {
- discoveryBarrier.await();
- } catch (Exception e) {
- throw new RuntimeException();
- }
- return null;
- }
- }).when(messageBuilder).send();
- }
-
- @AfterMethod
- public void cleanUp() throws Exception {
- batchSourceExecutor.close();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
- public void testWithoutRightConfig() throws Exception {
- config.clear();
- batchSourceExecutor.open(config, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
- public void testPushWithoutRightConfig() throws Exception {
- pushConfig.clear();
- batchSourceExecutor.open(pushConfig, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
- public void testWithoutRightTriggerer() throws Exception {
- testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
- config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
- batchSourceExecutor.open(config, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
- public void testPushWithoutRightTriggerer() throws Exception {
- testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
- pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
- batchSourceExecutor.open(pushConfig, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
- public void testWithoutRightTriggererConfig() throws Exception {
- Map<String, Object> badConfig = new HashMap<>();
- badConfig.put("something", "else");
- testBatchConfig.setDiscoveryTriggererConfig(badConfig);
- config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
- batchSourceExecutor.open(config, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
- public void testPushWithoutRightTriggererConfig() throws Exception {
- Map<String, Object> badConfig = new HashMap<>();
- badConfig.put("something", "else");
- testBatchConfig.setDiscoveryTriggererConfig(badConfig);
- pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
- batchSourceExecutor.open(pushConfig, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
- public void testWithoutRightSource() throws Exception {
- config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
- batchSourceExecutor.open(config, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
- public void testPushWithoutRightSource() throws Exception {
- pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
- batchSourceExecutor.open(pushConfig, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
- public void testWithoutRightSourceConfig() throws Exception {
- config.remove("foo");
- config.put("something", "else");
- batchSourceExecutor.open(config, context);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
- public void testPushWithoutRightSourceConfig() throws Exception {
- pushConfig.remove("foo");
- pushConfig.put("something", "else");
- batchSourceExecutor.open(pushConfig, context);
- }
-
- @Test
- public void testOpenWithRightSource() throws Exception {
- batchSourceExecutor.open(config, context);
- }
-
- @Test
- public void testPushOpenWithRightSource() throws Exception {
- batchSourceExecutor.open(pushConfig, context);
- }
-
- @Test
- public void testLifeCycle() throws Exception {
- batchSourceExecutor.open(config, context);
- Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
- for (int i = 0; i < 5; ++i) {
- batchSourceExecutor.read();
- }
- Assert.assertEquals(testBatchSource.getRecordCount(), 6);
- Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
- Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
- }
-
- @Test
- public void testPushLifeCycle() throws Exception {
- batchSourceExecutor.open(pushConfig, context);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
- for (int i = 0; i < 5; ++i) {
- batchSourceExecutor.read();
- }
- Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
- discoveryBarrier.await();
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
- Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
- }
-}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6cea5d7..fd566de 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -39,7 +39,6 @@
</activation>
<modules>
<module>core</module>
- <module>batch</module>
<module>batch-discovery-triggerers</module>
<module>batch-data-generator</module>
<module>common</module>
@@ -75,7 +74,6 @@
<id>core-modules</id>
<modules>
<module>core</module>
- <module>batch</module>
<module>batch-discovery-triggerers</module>
<module>common</module>
<module>twitter</module>