You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/17 22:20:03 UTC
[pulsar] branch master updated: Allow null consume in BatchPushSource (#7573)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4e1a677 Allow null consume in BatchPushSource (#7573)
4e1a677 is described below
commit 4e1a677171ab40504bd6306dee832cf900ed4472
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Jul 17 15:19:36 2020 -0700
Allow null consume in BatchPushSource (#7573)
* Added upgrade notes
* Allow null message to be passed
* More private impl
* Fix unittest
* Address comments
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../pulsar/io/batch/BatchSourceExecutorTest.java | 145 +++++++++++++++++++--
.../org/apache/pulsar/io/core/BatchPushSource.java | 25 +++-
2 files changed, 151 insertions(+), 19 deletions(-)
diff --git a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
index d9d4112..a9d9ef3 100644
--- a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
+++ b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.SourceContext;
@@ -90,6 +91,46 @@ public class BatchSourceExecutorTest {
}
}
+ public static class TestBatchPushSource extends BatchPushSource<String> {
+ @Getter
+ private static int prepareCount;
+ @Getter
+ private static int discoverCount;
+ @Getter
+ private static int recordCount;
+ private Record record = Mockito.mock(Record.class);
+ public TestBatchPushSource() { }
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext context) throws Exception {
+ if (!config.containsKey("foo")) {
+ throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
+ }
+ }
+
+ @Override
+ public void discover(Consumer<byte[]> taskEater) throws Exception {
+ byte[] retval = new byte[10];
+ discoverCount++;
+ taskEater.accept(retval);
+ }
+
+ @Override
+ public void prepare(byte[] task) throws Exception {
+ prepareCount++;
+ for (int i = 0; i < 5; ++i) {
+ consume(record);
+ ++recordCount;
+ }
+ consume(null);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ }
+
public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
private Consumer<String> trigger;
private Thread thread;
@@ -121,17 +162,21 @@ public class BatchSourceExecutorTest {
@Override
public void stop() {
- thread.interrupt();
- try {
- thread.join();
- } catch (Exception e) {
+ if (thread != null) {
+ thread.interrupt();
+ try {
+ thread.join();
+ } catch (Exception e) {
+ }
}
}
}
private TestBatchSource testBatchSource;
+ private TestBatchPushSource testBatchPushSource;
private BatchSourceConfig testBatchConfig;
private Map<String, Object> config;
+ private Map<String, Object> pushConfig;
private BatchSourceExecutor<String> batchSourceExecutor;
private SourceContext context;
private ConsumerBuilder consumerBuilder;
@@ -140,20 +185,32 @@ public class BatchSourceExecutorTest {
private CyclicBarrier discoveryBarrier;
private Message<byte[]> discoveredTask;
- @BeforeMethod
- public void setUp() throws Exception {
- testBatchSource = new TestBatchSource();
- batchSourceExecutor = new BatchSourceExecutor<>();
- context = Mockito.mock(SourceContext.class);
- config = new HashMap<>();
+ private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
+ Map<String, Object> config = new HashMap<>();
config.put("foo", "bar");
- testBatchConfig = new BatchSourceConfig();
+ config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
+ config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
+ return config;
+ }
+
+ private static BatchSourceConfig createBatchSourceConfig() {
+ BatchSourceConfig testBatchConfig = new BatchSourceConfig();
testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
Map<String, Object> triggererConfig = new HashMap<>();
triggererConfig.put("DELAY_MS", 500);
testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
- config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
- config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestBatchSource.class.getName());
+ return testBatchConfig;
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ testBatchSource = new TestBatchSource();
+ testBatchPushSource = new TestBatchPushSource();
+ batchSourceExecutor = new BatchSourceExecutor<>();
+ testBatchConfig = createBatchSourceConfig();
+ config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
+ pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
+ context = Mockito.mock(SourceContext.class);
Mockito.doReturn("test-function").when(context).getSourceName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
@@ -187,7 +244,9 @@ public class BatchSourceExecutorTest {
}
@AfterMethod
- public void cleanUp() { }
+ public void cleanUp() throws Exception {
+ batchSourceExecutor.close();
+ }
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
public void testWithoutRightConfig() throws Exception {
@@ -195,6 +254,12 @@ public class BatchSourceExecutorTest {
batchSourceExecutor.open(config, context);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
+ public void testPushWithoutRightConfig() throws Exception {
+ pushConfig.clear();
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
public void testWithoutRightTriggerer() throws Exception {
testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
@@ -202,6 +267,13 @@ public class BatchSourceExecutorTest {
batchSourceExecutor.open(config, context);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
+ public void testPushWithoutRightTriggerer() throws Exception {
+ testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
+ pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
public void testWithoutRightTriggererConfig() throws Exception {
Map<String, Object> badConfig = new HashMap<>();
@@ -211,12 +283,27 @@ public class BatchSourceExecutorTest {
batchSourceExecutor.open(config, context);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
+ public void testPushWithoutRightTriggererConfig() throws Exception {
+ Map<String, Object> badConfig = new HashMap<>();
+ badConfig.put("something", "else");
+ testBatchConfig.setDiscoveryTriggererConfig(badConfig);
+ pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
public void testWithoutRightSource() throws Exception {
config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
batchSourceExecutor.open(config, context);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
+ public void testPushWithoutRightSource() throws Exception {
+ pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
public void testWithoutRightSourceConfig() throws Exception {
config.remove("foo");
@@ -224,12 +311,24 @@ public class BatchSourceExecutorTest {
batchSourceExecutor.open(config, context);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
+ public void testPushWithoutRightSourceConfig() throws Exception {
+ pushConfig.remove("foo");
+ pushConfig.put("something", "else");
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
@Test
public void testOpenWithRightSource() throws Exception {
batchSourceExecutor.open(config, context);
}
@Test
+ public void testPushOpenWithRightSource() throws Exception {
+ batchSourceExecutor.open(pushConfig, context);
+ }
+
+ @Test
public void testLifeCycle() throws Exception {
batchSourceExecutor.open(config, context);
Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
@@ -246,4 +345,22 @@ public class BatchSourceExecutorTest {
Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
}
+
+ @Test
+ public void testPushLifeCycle() throws Exception {
+ batchSourceExecutor.open(pushConfig, context);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
+ discoveryBarrier.await();
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+ for (int i = 0; i < 5; ++i) {
+ batchSourceExecutor.read();
+ }
+ Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+ discoveryBarrier.await();
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
+ Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
+ }
}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
index b056238..9f44fd5 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
@@ -20,9 +20,7 @@ package org.apache.pulsar.io.core;
import org.apache.pulsar.functions.api.Record;
-import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
/**
* Pulsar's Batch Push Source interface. Batch Push Sources have the same lifecycle
@@ -32,8 +30,16 @@ import java.util.function.Consumer;
*/
public abstract class BatchPushSource<T> implements BatchSource<T> {
+ private static class NullRecord implements Record {
+ @Override
+ public Object getValue() {
+ return null;
+ }
+ }
+
private LinkedBlockingQueue<Record<T>> queue;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
+ private final NullRecord nullRecord = new NullRecord();
public BatchPushSource() {
this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
@@ -41,17 +47,26 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
@Override
public Record<T> readNext() throws Exception {
- return queue.take();
+ Record<T> record = queue.take();
+ if (record instanceof NullRecord) {
+ return null;
+ } else {
+ return record;
+ }
}
/**
* Send this message to be written to Pulsar.
- *
+ * Pass null if you are done with this task.
* @param record next message from source which should be sent to a Pulsar topic
*/
public void consume(Record<T> record) {
try {
- queue.put(record);
+ if (record != null) {
+ queue.put(record);
+ } else {
+ queue.put(nullRecord);
+ }
} catch (InterruptedException e) {
throw new RuntimeException(e);
}