You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/17 22:20:03 UTC

[pulsar] branch master updated: Allow null consume in BatchPushSource (#7573)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e1a677  Allow null consume in BatchPushSource (#7573)
4e1a677 is described below

commit 4e1a677171ab40504bd6306dee832cf900ed4472
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Jul 17 15:19:36 2020 -0700

    Allow null consume in BatchPushSource (#7573)
    
    * Added upgrade notes
    
    * Allow null message to be passed
    
    * More private impl
    
    * Fix unittest
    
    * Address comments
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../pulsar/io/batch/BatchSourceExecutorTest.java   | 145 +++++++++++++++++++--
 .../org/apache/pulsar/io/core/BatchPushSource.java |  25 +++-
 2 files changed, 151 insertions(+), 19 deletions(-)

diff --git a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
index d9d4112..a9d9ef3 100644
--- a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
+++ b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.functions.api.Record;
 
+import org.apache.pulsar.io.core.BatchPushSource;
 import org.apache.pulsar.io.core.BatchSource;
 import org.apache.pulsar.io.core.BatchSourceTriggerer;
 import org.apache.pulsar.io.core.SourceContext;
@@ -90,6 +91,46 @@ public class BatchSourceExecutorTest {
         }
     }
 
+    public static class TestBatchPushSource extends BatchPushSource<String> {
+        @Getter
+        private static int prepareCount;
+        @Getter
+        private static int discoverCount;
+        @Getter
+        private static int recordCount;
+        private Record record = Mockito.mock(Record.class);
+        public TestBatchPushSource() { }
+
+        @Override
+        public void open(Map<String, Object> config, SourceContext context) throws Exception {
+            if (!config.containsKey("foo")) {
+                throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
+            }
+        }
+
+        @Override
+        public void discover(Consumer<byte[]> taskEater) throws Exception {
+            byte[] retval = new byte[10];
+            discoverCount++;
+            taskEater.accept(retval);
+        }
+
+        @Override
+        public void prepare(byte[] task) throws Exception {
+            prepareCount++;
+            for (int i = 0; i < 5; ++i) {
+                consume(record);
+                ++recordCount;
+            }
+            consume(null);
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+
     public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
         private Consumer<String> trigger;
         private Thread thread;
@@ -121,17 +162,21 @@ public class BatchSourceExecutorTest {
 
         @Override
         public void stop() {
-            thread.interrupt();
-            try {
-                thread.join();
-            } catch (Exception e) {
+            if (thread != null) {
+                thread.interrupt();
+                try {
+                    thread.join();
+                } catch (Exception e) {
+                }
             }
         }
     }
 
     private TestBatchSource testBatchSource;
+    private TestBatchPushSource testBatchPushSource;
     private BatchSourceConfig testBatchConfig;
     private Map<String, Object> config;
+    private Map<String, Object> pushConfig;
     private BatchSourceExecutor<String> batchSourceExecutor;
     private SourceContext context;
     private ConsumerBuilder consumerBuilder;
@@ -140,20 +185,32 @@ public class BatchSourceExecutorTest {
     private CyclicBarrier discoveryBarrier;
     private Message<byte[]> discoveredTask;
 
-    @BeforeMethod
-    public void setUp() throws Exception {
-        testBatchSource = new TestBatchSource();
-        batchSourceExecutor = new BatchSourceExecutor<>();
-        context = Mockito.mock(SourceContext.class);
-        config = new HashMap<>();
+    private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
+        Map<String, Object> config = new HashMap<>();
         config.put("foo", "bar");
-        testBatchConfig = new BatchSourceConfig();
+        config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
+        config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
+        return config;
+    }
+
+    private static BatchSourceConfig createBatchSourceConfig() {
+        BatchSourceConfig testBatchConfig = new BatchSourceConfig();
         testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
         Map<String, Object> triggererConfig = new HashMap<>();
         triggererConfig.put("DELAY_MS", 500);
         testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
-        config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
-        config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestBatchSource.class.getName());
+        return testBatchConfig;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        testBatchSource = new TestBatchSource();
+        testBatchPushSource = new TestBatchPushSource();
+        batchSourceExecutor = new BatchSourceExecutor<>();
+        testBatchConfig = createBatchSourceConfig();
+        config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
+        pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
+        context = Mockito.mock(SourceContext.class);
         Mockito.doReturn("test-function").when(context).getSourceName();
         Mockito.doReturn("test-namespace").when(context).getNamespace();
         Mockito.doReturn("test-tenant").when(context).getTenant();
@@ -187,7 +244,9 @@ public class BatchSourceExecutorTest {
     }
 
     @AfterMethod
-    public void cleanUp() { }
+    public void cleanUp() throws Exception {
+        batchSourceExecutor.close();
+    }
 
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
     public void testWithoutRightConfig() throws Exception {
@@ -195,6 +254,12 @@ public class BatchSourceExecutorTest {
         batchSourceExecutor.open(config, context);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
+    public void testPushWithoutRightConfig() throws Exception {
+        pushConfig.clear();
+        batchSourceExecutor.open(pushConfig, context);
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
     public void testWithoutRightTriggerer() throws Exception {
         testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
@@ -202,6 +267,13 @@ public class BatchSourceExecutorTest {
         batchSourceExecutor.open(config, context);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
+    public void testPushWithoutRightTriggerer() throws Exception {
+        testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
+        pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+        batchSourceExecutor.open(pushConfig, context);
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
     public void testWithoutRightTriggererConfig() throws Exception {
         Map<String, Object> badConfig = new HashMap<>();
@@ -211,12 +283,27 @@ public class BatchSourceExecutorTest {
         batchSourceExecutor.open(config, context);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
+    public void testPushWithoutRightTriggererConfig() throws Exception {
+        Map<String, Object> badConfig = new HashMap<>();
+        badConfig.put("something", "else");
+        testBatchConfig.setDiscoveryTriggererConfig(badConfig);
+        pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+        batchSourceExecutor.open(pushConfig, context);
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
     public void testWithoutRightSource() throws Exception {
         config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
         batchSourceExecutor.open(config, context);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
+    public void testPushWithoutRightSource() throws Exception {
+        pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
+        batchSourceExecutor.open(pushConfig, context);
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
     public void testWithoutRightSourceConfig() throws Exception {
         config.remove("foo");
@@ -224,12 +311,24 @@ public class BatchSourceExecutorTest {
         batchSourceExecutor.open(config, context);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
+    public void testPushWithoutRightSourceConfig() throws Exception {
+        pushConfig.remove("foo");
+        pushConfig.put("something", "else");
+        batchSourceExecutor.open(pushConfig, context);
+    }
+
     @Test
     public void testOpenWithRightSource() throws Exception {
         batchSourceExecutor.open(config, context);
     }
 
     @Test
+    public void testPushOpenWithRightSource() throws Exception {
+        batchSourceExecutor.open(pushConfig, context);
+    }
+
+    @Test
     public void testLifeCycle() throws Exception {
         batchSourceExecutor.open(config, context);
         Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
@@ -246,4 +345,22 @@ public class BatchSourceExecutorTest {
         Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
         Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
     }
+
+    @Test
+    public void testPushLifeCycle() throws Exception {
+        batchSourceExecutor.open(pushConfig, context);
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
+        discoveryBarrier.await();
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+        for (int i = 0; i < 5; ++i) {
+            batchSourceExecutor.read();
+        }
+        Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+        discoveryBarrier.await();
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
+        Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
+    }
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
index b056238..9f44fd5 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
@@ -20,9 +20,7 @@ package org.apache.pulsar.io.core;
 
 import org.apache.pulsar.functions.api.Record;
 
-import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
 
 /**
  * Pulsar's Batch Push Source interface. Batch Push Sources have the same lifecycle
@@ -32,8 +30,16 @@ import java.util.function.Consumer;
  */
 public abstract class BatchPushSource<T> implements BatchSource<T> {
 
+    private static class NullRecord implements Record {
+        @Override
+        public Object getValue() {
+            return null;
+        }
+    }
+
     private LinkedBlockingQueue<Record<T>> queue;
     private static final int DEFAULT_QUEUE_LENGTH = 1000;
+    private final NullRecord nullRecord = new NullRecord();
 
     public BatchPushSource() {
         this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
@@ -41,17 +47,26 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
 
     @Override
     public Record<T> readNext() throws Exception {
-        return queue.take();
+        Record<T> record = queue.take();
+        if (record instanceof NullRecord) {
+            return null;
+        } else {
+            return record;
+        }
     }
 
     /**
      * Send this message to be written to Pulsar.
-     *
+     * Pass null if you are done with this task.
      * @param record next message from source which should be sent to a Pulsar topic
      */
     public void consume(Record<T> record) {
         try {
-            queue.put(record);
+            if (record != null) {
+                queue.put(record);
+            } else {
+                queue.put(nullRecord);
+            }
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }