You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/08/03 17:59:47 UTC

[pulsar] branch master updated: Fix various issues with batch source (#7716)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e5d66c  Fix various issues with batch source (#7716)
6e5d66c is described below

commit 6e5d66c39d1597a99256665b14a1209379bcbc28
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Aug 3 10:59:24 2020 -0700

    Fix various issues with batch source (#7716)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../functions/instance/JavaInstanceRunnable.java   |  15 +-
 .../source}/batch/BatchSourceExecutor.java         |  31 +-
 .../source/batch/BatchSourceExecutorTest.java      | 366 +++++++++++++++++++++
 .../pulsar/functions/utils/FunctionCommon.java     |  18 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  47 ++-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  21 +-
 pulsar-io/batch-data-generator/pom.xml             |  12 -
 pulsar-io/batch/pom.xml                            |  52 ---
 .../pulsar/io/batch/BatchSourceExecutorTest.java   | 366 ---------------------
 pulsar-io/pom.xml                                  |   2 -
 10 files changed, 458 insertions(+), 472 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 06dbf19..29cb71f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -70,6 +70,7 @@ import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
 import org.apache.pulsar.common.util.Reflections;
+import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.io.core.Sink;
@@ -745,9 +746,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             }
             object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
         } else {
-            object = Reflections.createInstance(
-                    sourceSpec.getClassName(),
-                    this.functionClassLoader);
+
+            // check if source is a batch source
+            if (sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName())) {
+                object = Reflections.createInstance(
+                  sourceSpec.getClassName(),
+                  this.instanceClassLoader);
+            } else {
+                object = Reflections.createInstance(
+                  sourceSpec.getClassName(),
+                  this.functionClassLoader);
+            }
         }
 
         Class<?>[] typeArgs;
diff --git a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
similarity index 94%
rename from pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 04d87fa..d1cac03 100644
--- a/pulsar-io/batch/src/main/java/org/apache/pulsar/io/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -16,18 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.batch;
+package org.apache.pulsar.functions.source.batch;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.io.BatchSourceConfig;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.io.core.*;
+import org.apache.pulsar.io.core.BatchSource;
+import org.apache.pulsar.io.core.BatchSourceTriggerer;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -162,20 +169,6 @@ public class BatchSourceExecutor<T> implements Source<T> {
     }
   }
 
-  public org.apache.pulsar.functions.api.Record<T> readInternal() {
-    try {
-      Record<T> record = batchSource.readNext();
-      log.info("Record: {}", record);
-      if (record != null) {
-        return record;
-      }
-    } catch (Exception e) {
-      log.error("Error on read", e);
-      throw new RuntimeException(e);
-    }
-    return null;
-  }
-
   @Override
   public void close() throws Exception {
     this.stop();
@@ -235,6 +228,6 @@ public class BatchSourceExecutor<T> implements Source<T> {
     currentTask = intermediateTopicConsumer.receive();
     return;
   }
-  
+
 }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
new file mode 100644
index 0000000..feeb200
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source.batch;
+
+
+import com.google.gson.Gson;
+import lombok.Getter;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.io.BatchSourceConfig;
+import org.apache.pulsar.functions.api.Record;
+
+import org.apache.pulsar.io.core.BatchPushSource;
+import org.apache.pulsar.io.core.BatchSource;
+import org.apache.pulsar.io.core.BatchSourceTriggerer;
+import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.function.Consumer;
+
+/**
+ * Unit tests for {@link org.apache.pulsar.functions.source.batch.BatchSourceExecutor}
+ */
+public class BatchSourceExecutorTest {
+
+  public static class TestBatchSource implements BatchSource<String> {
+    @Getter
+    private static int prepareCount;
+    @Getter
+    private static int discoverCount;
+    @Getter
+    private static int recordCount;
+    private Record record = Mockito.mock(Record.class);
+    public TestBatchSource() { }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext context) throws Exception {
+      if (!config.containsKey("foo")) {
+        throw new IllegalArgumentException("Bad config passed to TestBatchSource");
+      }
+    }
+
+    @Override
+    public void discover(Consumer<byte[]> taskEater) throws Exception {
+      byte[] retval = new byte[10];
+      discoverCount++;
+      taskEater.accept(retval);
+    }
+
+    @Override
+    public void prepare(byte[] task) throws Exception {
+      prepareCount++;
+    }
+
+    @Override
+    public Record<String> readNext() throws Exception {
+      if (++recordCount % 5 == 0) {
+        return null;
+      } else {
+        return record;
+      }
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+  }
+
+  public static class TestBatchPushSource extends BatchPushSource<String> {
+    @Getter
+    private static int prepareCount;
+    @Getter
+    private static int discoverCount;
+    @Getter
+    private static int recordCount;
+    private Record record = Mockito.mock(Record.class);
+    public TestBatchPushSource() { }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext context) throws Exception {
+      if (!config.containsKey("foo")) {
+        throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
+      }
+    }
+
+    @Override
+    public void discover(Consumer<byte[]> taskEater) throws Exception {
+      byte[] retval = new byte[10];
+      discoverCount++;
+      taskEater.accept(retval);
+    }
+
+    @Override
+    public void prepare(byte[] task) throws Exception {
+      prepareCount++;
+      for (int i = 0; i < 5; ++i) {
+        consume(record);
+        ++recordCount;
+      }
+      consume(null);
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+  }
+
+  public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
+    private Consumer<String> trigger;
+    private Thread thread;
+
+    public TestDiscoveryTriggerer() { }
+
+    @Override
+    public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+      if (!config.containsKey("DELAY_MS")) {
+        throw new IllegalArgumentException("Bad config passed to TestTriggerer");
+      }
+    }
+
+    @Override
+    public void start(Consumer<String> trigger) {
+      this.trigger = trigger;
+      thread = new Thread(() -> {
+        while(true) {
+          try {
+            Thread.sleep(100);
+            trigger.accept("Triggered");
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      });
+      thread.start();
+    }
+
+    @Override
+    public void stop() {
+      if (thread != null) {
+        thread.interrupt();
+        try {
+          thread.join();
+        } catch (Exception e) {
+        }
+      }
+    }
+  }
+
+  private TestBatchSource testBatchSource;
+  private TestBatchPushSource testBatchPushSource;
+  private BatchSourceConfig testBatchConfig;
+  private Map<String, Object> config;
+  private Map<String, Object> pushConfig;
+  private BatchSourceExecutor<String> batchSourceExecutor;
+  private SourceContext context;
+  private ConsumerBuilder consumerBuilder;
+  private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+  private TypedMessageBuilder<byte[]> messageBuilder;
+  private CyclicBarrier discoveryBarrier;
+  private Message<byte[]> discoveredTask;
+
+  private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
+    Map<String, Object> config = new HashMap<>();
+    config.put("foo", "bar");
+    config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
+    config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
+    return config;
+  }
+
+  private static BatchSourceConfig createBatchSourceConfig() {
+    BatchSourceConfig testBatchConfig = new BatchSourceConfig();
+    testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
+    Map<String, Object> triggererConfig = new HashMap<>();
+    triggererConfig.put("DELAY_MS", 500);
+    testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
+    return testBatchConfig;
+  }
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    testBatchSource = new TestBatchSource();
+    testBatchPushSource = new TestBatchPushSource();
+    batchSourceExecutor = new BatchSourceExecutor<>();
+    testBatchConfig = createBatchSourceConfig();
+    config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
+    pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
+    context = Mockito.mock(SourceContext.class);
+    Mockito.doReturn("test-function").when(context).getSourceName();
+    Mockito.doReturn("test-namespace").when(context).getNamespace();
+    Mockito.doReturn("test-tenant").when(context).getTenant();
+    Mockito.doReturn(0).when(context).getInstanceId();
+    consumerBuilder = Mockito.mock(ConsumerBuilder.class);
+    Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any());
+    Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any());
+    Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
+    discoveredTask = Mockito.mock(Message.class);
+    consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
+    Mockito.doReturn(discoveredTask).when(consumer).receive();
+    Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+    Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES);
+    messageBuilder = Mockito.mock(TypedMessageBuilder.class);
+    Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any());
+    Mockito.doReturn(messageBuilder).when(messageBuilder).properties(Mockito.any());
+    Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any());
+
+    // Discovery: the mocked send() waits on the two-party barrier so a test can rendezvous with it via discoveryBarrier.await()
+    discoveryBarrier = new CyclicBarrier(2);
+    Mockito.doAnswer(new Answer<MessageId>() {
+      @Override public MessageId answer(InvocationOnMock invocation) {
+        try {
+          discoveryBarrier.await();
+        } catch (Exception e) {
+          throw new RuntimeException(e); // keep the cause so barrier failures are diagnosable
+        }
+        return null;
+      }
+    }).when(messageBuilder).send();
+  }
+
+  @AfterMethod
+  public void cleanUp() throws Exception {
+    batchSourceExecutor.close();
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
+  public void testWithoutRightConfig() throws Exception {
+    config.clear();
+    batchSourceExecutor.open(config, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
+  public void testPushWithoutRightConfig() throws Exception {
+    pushConfig.clear();
+    batchSourceExecutor.open(pushConfig, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
+  public void testWithoutRightTriggerer() throws Exception {
+    testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
+    config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+    batchSourceExecutor.open(config, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
+  public void testPushWithoutRightTriggerer() throws Exception {
+    testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
+    pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+    batchSourceExecutor.open(pushConfig, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
+  public void testWithoutRightTriggererConfig() throws Exception {
+    Map<String, Object> badConfig = new HashMap<>();
+    badConfig.put("something", "else");
+    testBatchConfig.setDiscoveryTriggererConfig(badConfig);
+    config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+    batchSourceExecutor.open(config, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
+  public void testPushWithoutRightTriggererConfig() throws Exception {
+    Map<String, Object> badConfig = new HashMap<>();
+    badConfig.put("something", "else");
+    testBatchConfig.setDiscoveryTriggererConfig(badConfig);
+    pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
+    batchSourceExecutor.open(pushConfig, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
+  public void testWithoutRightSource() throws Exception {
+    config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
+    batchSourceExecutor.open(config, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
+  public void testPushWithoutRightSource() throws Exception {
+    pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
+    batchSourceExecutor.open(pushConfig, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
+  public void testWithoutRightSourceConfig() throws Exception {
+    config.remove("foo");
+    config.put("something", "else");
+    batchSourceExecutor.open(config, context);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
+  public void testPushWithoutRightSourceConfig() throws Exception {
+    pushConfig.remove("foo");
+    pushConfig.put("something", "else");
+    batchSourceExecutor.open(pushConfig, context);
+  }
+
+  @Test
+  public void testOpenWithRightSource() throws Exception {
+    batchSourceExecutor.open(config, context);
+  }
+
+  @Test
+  public void testPushOpenWithRightSource() throws Exception {
+    batchSourceExecutor.open(pushConfig, context);
+  }
+
+  @Test
+  public void testLifeCycle() throws Exception {
+    batchSourceExecutor.open(config, context);
+    Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
+    discoveryBarrier.await();
+    Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
+    Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
+    for (int i = 0; i < 5; ++i) {
+      batchSourceExecutor.read();
+    }
+    Assert.assertEquals(testBatchSource.getRecordCount(), 6);
+    Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
+    Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
+    discoveryBarrier.await();
+    Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
+    Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
+  }
+
+  @Test
+  public void testPushLifeCycle() throws Exception {
+    batchSourceExecutor.open(pushConfig, context);
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
+    discoveryBarrier.await();
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+    for (int i = 0; i < 5; ++i) {
+      batchSourceExecutor.read();
+    }
+    Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
+    discoveryBarrier.await();
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
+    Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
+  }
+}
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 5a68855..b3d8fdf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.WindowFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
+import org.apache.pulsar.io.core.BatchSource;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 
@@ -186,14 +187,21 @@ public class FunctionCommon {
         throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
     }
 
-
     public static Class<?> getSourceType(String className, ClassLoader classLoader) throws ClassNotFoundException {
+        return getSourceType(classLoader.loadClass(className));
+    }
 
-        Class userClass = classLoader.loadClass(className);
-
-        Class<?> typeArg = TypeResolver.resolveRawArgument(Source.class, userClass);
+    public static Class<?>getSourceType(Class sourceClass) {
 
-        return typeArg;
+        if (Source.class.isAssignableFrom(sourceClass)) {
+            return TypeResolver.resolveRawArgument(Source.class, sourceClass);
+        } else if (BatchSource.class.isAssignableFrom(sourceClass)) {
+            return TypeResolver.resolveRawArgument(BatchSource.class, sourceClass);
+        } else {
+            throw new IllegalArgumentException(
+              String.format("Source class %s does not implement the correct interface",
+                sourceClass.getName()));
+        }
     }
 
     public static Class<?> getSinkType(String className, ClassLoader classLoader) throws ClassNotFoundException {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 0296bcf..c8e0764 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -26,6 +26,7 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.BatchSourceConfig;
@@ -41,6 +42,8 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.io.core.BatchSource;
+import org.apache.pulsar.io.core.Source;
 
 import java.io.File;
 import java.io.IOException;
@@ -112,7 +115,7 @@ public class SourceConfigUtils {
         if (sourceConfig.getBatchSourceConfig() != null) {
             configs.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(sourceConfig.getBatchSourceConfig()));
             configs.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, sourceSpecBuilder.getClassName());
-            sourceSpecBuilder.setClassName("org.apache.pulsar.io.batch.BatchSourceExecutor");
+            sourceSpecBuilder.setClassName("org.apache.pulsar.functions.source.batch.BatchSourceExecutor");
         }
 
         sourceSpecBuilder.setConfigs(new Gson().toJson(configs));
@@ -296,7 +299,7 @@ public class SourceConfigUtils {
                 validateConnectorConfig(sourceConfig, (NarClassLoader)  narClassLoader);
             }
             try {
-                typeArg = getSourceType(sourceClassName, narClassLoader);
+                narClassLoader.loadClass(sourceClassName);
                 classLoader = narClassLoader;
             } catch (ClassNotFoundException | NoClassDefFoundError e) {
                 throw new IllegalArgumentException(
@@ -307,13 +310,14 @@ public class SourceConfigUtils {
             // if source class name is provided, we need to try to load it as a JAR and as a NAR.
             if (jarClassLoader != null) {
                 try {
-                    typeArg = getSourceType(sourceClassName, jarClassLoader);
+                    jarClassLoader.loadClass(sourceClassName);
                     classLoader = jarClassLoader;
                 } catch (ClassNotFoundException | NoClassDefFoundError e) {
                     // class not found in JAR try loading as a NAR and searching for the class
                     if (narClassLoader != null) {
+
                         try {
-                            typeArg = getSourceType(sourceClassName, narClassLoader);
+                            narClassLoader.loadClass(sourceClassName);
                             classLoader = narClassLoader;
                         } catch (ClassNotFoundException | NoClassDefFoundError e1) {
                             throw new IllegalArgumentException(
@@ -332,7 +336,7 @@ public class SourceConfigUtils {
                     validateConnectorConfig(sourceConfig, (NarClassLoader)  narClassLoader);
                 }
                 try {
-                    typeArg = getSourceType(sourceClassName, narClassLoader);
+                    narClassLoader.loadClass(sourceClassName);
                     classLoader = narClassLoader;
                 } catch (ClassNotFoundException | NoClassDefFoundError e1) {
                     throw new IllegalArgumentException(
@@ -354,6 +358,34 @@ public class SourceConfigUtils {
             }
         }
 
+        // check if source implements the correct interfaces
+        Class sourceClass;
+        try {
+            sourceClass = classLoader.loadClass(sourceClassName);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException(
+              String.format("Source class %s not found in class loader", sourceClassName), e);
+        }
+
+        if (!Source.class.isAssignableFrom(sourceClass) && !BatchSource.class.isAssignableFrom(sourceClass)) {
+            throw new IllegalArgumentException(
+              String.format("Source class %s does not implement the correct interface",
+                sourceClass.getName()));
+        }
+
+        if (BatchSource.class.isAssignableFrom(sourceClass)) {
+            if (sourceConfig.getBatchSourceConfig() != null) {
+                validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
+            } else {
+                throw new IllegalArgumentException(
+                  String.format("Source class %s implements %s but batch source source config is not specified",
+                    sourceClass.getName(), BatchSource.class.getName()));
+            }
+        }
+
+        // extract type from source class
+        typeArg = getSourceType(sourceClass);
+
         // Only one of serdeClassName or schemaType should be set
         if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
             throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
@@ -366,8 +398,9 @@ public class SourceConfigUtils {
             ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, classLoader, false);
         }
 
-        if (sourceConfig.getBatchSourceConfig() != null) {
-            validateBatchSourceConfig(sourceConfig.getBatchSourceConfig());
+        if (typeArg.equals(TypeResolver.Unknown.class)) {
+            throw new IllegalArgumentException(
+              String.format("Failed to resolve type for Source class %s", sourceClassName));
         }
 
         return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index fe9a117..cf6945f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -815,6 +815,9 @@ public class SourceApiV3ResourceTest {
             Integer parallelism,
             String expectedError) throws Exception {
 
+        NarClassLoader classLoader = mock(NarClassLoader.class);
+        doReturn(TwitterFireHose.class).when(classLoader).loadClass(eq(TwitterFireHose.class.getName()));
+
         mockStatic(ConnectorUtils.class);
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
@@ -824,9 +827,9 @@ public class SourceApiV3ResourceTest {
         mockStatic(FunctionCommon.class);
         PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
         doReturn(String.class).when(FunctionCommon.class);
-        FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+        FunctionCommon.getSourceType(eq(TwitterFireHose.class));
 
-        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        doReturn(classLoader).when(FunctionCommon.class);
         FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -884,6 +887,9 @@ public class SourceApiV3ResourceTest {
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
 
+        NarClassLoader classLoader = mock(NarClassLoader.class);
+        doReturn(TwitterFireHose.class).when(classLoader).loadClass(eq(TwitterFireHose.class.getName()));
+
         mockStatic(ConnectorUtils.class);
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
@@ -893,9 +899,9 @@ public class SourceApiV3ResourceTest {
         mockStatic(FunctionCommon.class);
         PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
         doReturn(String.class).when(FunctionCommon.class);
-        FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+        FunctionCommon.getSourceType(eq(TwitterFireHose.class));
 
-        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        doReturn(classLoader).when(FunctionCommon.class);
         FunctionCommon.extractNarClassLoader(any(), any(File.class), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -974,6 +980,9 @@ public class SourceApiV3ResourceTest {
         sourceConfig.setClassName(className);
         sourceConfig.setParallelism(parallelism);
 
+        NarClassLoader classLoader = mock(NarClassLoader.class);
+        doReturn(TwitterFireHose.class).when(classLoader).loadClass(eq(TwitterFireHose.class.getName()));
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
         mockStatic(ConnectorUtils.class);
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
@@ -983,10 +992,10 @@ public class SourceApiV3ResourceTest {
 
         mockStatic(FunctionCommon.class);
         doReturn(String.class).when(FunctionCommon.class);
-        FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+        FunctionCommon.getSourceType(eq(TwitterFireHose.class));
         PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
-        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        doReturn(classLoader).when(FunctionCommon.class);
         FunctionCommon.extractNarClassLoader(any(), any(), any());
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
diff --git a/pulsar-io/batch-data-generator/pom.xml b/pulsar-io/batch-data-generator/pom.xml
index a4d8b80a2..91bc6aa 100644
--- a/pulsar-io/batch-data-generator/pom.xml
+++ b/pulsar-io/batch-data-generator/pom.xml
@@ -40,23 +40,11 @@
 
         <dependency>
             <groupId>${project.groupId}</groupId>
-            <artifactId>pulsar-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>${project.groupId}</groupId>
             <artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>pulsar-io-batch</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>io.codearte.jfairy</groupId>
             <artifactId>jfairy</artifactId>
             <version>0.5.9</version>
diff --git a/pulsar-io/batch/pom.xml b/pulsar-io/batch/pom.xml
deleted file mode 100644
index 4558030..0000000
--- a/pulsar-io/batch/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-io</artifactId>
-    <version>2.7.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-io-batch</artifactId>
-  <name>Pulsar IO :: Batch</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-functions-utils</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-  </dependencies>
-
-</project>
diff --git a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java b/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
deleted file mode 100644
index a9d9ef3..0000000
--- a/pulsar-io/batch/src/test/java/org/apache/pulsar/io/batch/BatchSourceExecutorTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.batch;
-
-
-import com.google.gson.Gson;
-import lombok.Getter;
-import org.apache.pulsar.client.api.*;
-import org.apache.pulsar.common.io.BatchSourceConfig;
-import org.apache.pulsar.functions.api.Record;
-
-import org.apache.pulsar.io.core.BatchPushSource;
-import org.apache.pulsar.io.core.BatchSource;
-import org.apache.pulsar.io.core.BatchSourceTriggerer;
-import org.apache.pulsar.io.core.SourceContext;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CyclicBarrier;
-import java.util.function.Consumer;
-
-/**
- * Unit tests for {@link org.apache.pulsar.io.batch.BatchSourceExecutor}
- */
-public class BatchSourceExecutorTest {
-
-    public static class TestBatchSource implements BatchSource<String> {
-        @Getter
-        private static int prepareCount;
-        @Getter
-        private static int discoverCount;
-        @Getter
-        private static int recordCount;
-        private Record record = Mockito.mock(Record.class);
-        public TestBatchSource() { }
-
-        @Override
-        public void open(Map<String, Object> config, SourceContext context) throws Exception {
-            if (!config.containsKey("foo")) {
-                throw new IllegalArgumentException("Bad config passed to TestBatchSource");
-            }
-        }
-
-        @Override
-        public void discover(Consumer<byte[]> taskEater) throws Exception {
-            byte[] retval = new byte[10];
-            discoverCount++;
-            taskEater.accept(retval);
-        }
-
-        @Override
-        public void prepare(byte[] task) throws Exception {
-            prepareCount++;
-        }
-
-        @Override
-        public Record<String> readNext() throws Exception {
-            if (++recordCount % 5 == 0) {
-                return null;
-            } else {
-                return record;
-            }
-        }
-
-        @Override
-        public void close() throws Exception {
-
-        }
-    }
-
-    public static class TestBatchPushSource extends BatchPushSource<String> {
-        @Getter
-        private static int prepareCount;
-        @Getter
-        private static int discoverCount;
-        @Getter
-        private static int recordCount;
-        private Record record = Mockito.mock(Record.class);
-        public TestBatchPushSource() { }
-
-        @Override
-        public void open(Map<String, Object> config, SourceContext context) throws Exception {
-            if (!config.containsKey("foo")) {
-                throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
-            }
-        }
-
-        @Override
-        public void discover(Consumer<byte[]> taskEater) throws Exception {
-            byte[] retval = new byte[10];
-            discoverCount++;
-            taskEater.accept(retval);
-        }
-
-        @Override
-        public void prepare(byte[] task) throws Exception {
-            prepareCount++;
-            for (int i = 0; i < 5; ++i) {
-                consume(record);
-                ++recordCount;
-            }
-            consume(null);
-        }
-
-        @Override
-        public void close() throws Exception {
-
-        }
-    }
-
-    public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
-        private Consumer<String> trigger;
-        private Thread thread;
-
-        public TestDiscoveryTriggerer() { }
-
-        @Override
-        public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-            if (!config.containsKey("DELAY_MS")) {
-                throw new IllegalArgumentException("Bad config passed to TestTriggerer");
-            }
-        }
-
-        @Override
-        public void start(Consumer<String> trigger) {
-            this.trigger = trigger;
-            thread = new Thread(() -> {
-                while(true) {
-                    try {
-                        Thread.sleep(100);
-                        trigger.accept("Triggered");
-                    } catch (InterruptedException e) {
-                        break;
-                    }
-                }
-            });
-            thread.start();
-        }
-
-        @Override
-        public void stop() {
-            if (thread != null) {
-                thread.interrupt();
-                try {
-                    thread.join();
-                } catch (Exception e) {
-                }
-            }
-        }
-    }
-
-    private TestBatchSource testBatchSource;
-    private TestBatchPushSource testBatchPushSource;
-    private BatchSourceConfig testBatchConfig;
-    private Map<String, Object> config;
-    private Map<String, Object> pushConfig;
-    private BatchSourceExecutor<String> batchSourceExecutor;
-    private SourceContext context;
-    private ConsumerBuilder consumerBuilder;
-    private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
-    private TypedMessageBuilder<byte[]> messageBuilder;
-    private CyclicBarrier discoveryBarrier;
-    private Message<byte[]> discoveredTask;
-
-    private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
-        Map<String, Object> config = new HashMap<>();
-        config.put("foo", "bar");
-        config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
-        config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
-        return config;
-    }
-
-    private static BatchSourceConfig createBatchSourceConfig() {
-        BatchSourceConfig testBatchConfig = new BatchSourceConfig();
-        testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
-        Map<String, Object> triggererConfig = new HashMap<>();
-        triggererConfig.put("DELAY_MS", 500);
-        testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
-        return testBatchConfig;
-    }
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        testBatchSource = new TestBatchSource();
-        testBatchPushSource = new TestBatchPushSource();
-        batchSourceExecutor = new BatchSourceExecutor<>();
-        testBatchConfig = createBatchSourceConfig();
-        config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
-        pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
-        context = Mockito.mock(SourceContext.class);
-        Mockito.doReturn("test-function").when(context).getSourceName();
-        Mockito.doReturn("test-namespace").when(context).getNamespace();
-        Mockito.doReturn("test-tenant").when(context).getTenant();
-        Mockito.doReturn(0).when(context).getInstanceId();
-        consumerBuilder = Mockito.mock(ConsumerBuilder.class);
-        Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any());
-        Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any());
-        Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
-        discoveredTask = Mockito.mock(Message.class);
-        consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
-        Mockito.doReturn(discoveredTask).when(consumer).receive();
-        Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
-        Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES);
-        messageBuilder = Mockito.mock(TypedMessageBuilder.class);
-        Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any());
-        Mockito.doReturn(messageBuilder).when(messageBuilder).properties(Mockito.any());
-        Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any());
-
-        // Discovery
-        discoveryBarrier = new CyclicBarrier(2);
-        Mockito.doAnswer(new Answer<MessageId>() {
-            @Override public MessageId answer(InvocationOnMock invocation) {
-                try {
-                    discoveryBarrier.await();
-                } catch (Exception e) {
-                    throw new RuntimeException();
-                }
-                return null;
-            }
-        }).when(messageBuilder).send();
-    }
-
-    @AfterMethod
-    public void cleanUp() throws Exception {
-        batchSourceExecutor.close();
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
-    public void testWithoutRightConfig() throws Exception {
-        config.clear();
-        batchSourceExecutor.open(config, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
-    public void testPushWithoutRightConfig() throws Exception {
-        pushConfig.clear();
-        batchSourceExecutor.open(pushConfig, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
-    public void testWithoutRightTriggerer() throws Exception {
-        testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
-        config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
-        batchSourceExecutor.open(config, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
-    public void testPushWithoutRightTriggerer() throws Exception {
-        testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
-        pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
-        batchSourceExecutor.open(pushConfig, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
-    public void testWithoutRightTriggererConfig() throws Exception {
-        Map<String, Object> badConfig = new HashMap<>();
-        badConfig.put("something", "else");
-        testBatchConfig.setDiscoveryTriggererConfig(badConfig);
-        config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
-        batchSourceExecutor.open(config, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
-    public void testPushWithoutRightTriggererConfig() throws Exception {
-        Map<String, Object> badConfig = new HashMap<>();
-        badConfig.put("something", "else");
-        testBatchConfig.setDiscoveryTriggererConfig(badConfig);
-        pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
-        batchSourceExecutor.open(pushConfig, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
-    public void testWithoutRightSource() throws Exception {
-        config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
-        batchSourceExecutor.open(config, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
-    public void testPushWithoutRightSource() throws Exception {
-        pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
-        batchSourceExecutor.open(pushConfig, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
-    public void testWithoutRightSourceConfig() throws Exception {
-        config.remove("foo");
-        config.put("something", "else");
-        batchSourceExecutor.open(config, context);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
-    public void testPushWithoutRightSourceConfig() throws Exception {
-        pushConfig.remove("foo");
-        pushConfig.put("something", "else");
-        batchSourceExecutor.open(pushConfig, context);
-    }
-
-    @Test
-    public void testOpenWithRightSource() throws Exception {
-        batchSourceExecutor.open(config, context);
-    }
-
-    @Test
-    public void testPushOpenWithRightSource() throws Exception {
-        batchSourceExecutor.open(pushConfig, context);
-    }
-
-    @Test
-    public void testLifeCycle() throws Exception {
-        batchSourceExecutor.open(config, context);
-        Assert.assertTrue(testBatchSource.getDiscoverCount() < 1);
-        discoveryBarrier.await();
-        Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
-        Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
-        for (int i = 0; i < 5; ++i) {
-            batchSourceExecutor.read();
-        }
-        Assert.assertEquals(testBatchSource.getRecordCount(), 6);
-        Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1);
-        Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2);
-        discoveryBarrier.await();
-        Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
-        Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
-    }
-
-    @Test
-    public void testPushLifeCycle() throws Exception {
-        batchSourceExecutor.open(pushConfig, context);
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
-        discoveryBarrier.await();
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
-        for (int i = 0; i < 5; ++i) {
-            batchSourceExecutor.read();
-        }
-        Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
-        discoveryBarrier.await();
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
-        Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
-    }
-}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6cea5d7..fd566de 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -39,7 +39,6 @@
       </activation>
       <modules>
         <module>core</module>
-        <module>batch</module>
         <module>batch-discovery-triggerers</module>
         <module>batch-data-generator</module>
         <module>common</module>
@@ -75,7 +74,6 @@
       <id>core-modules</id>
       <modules>
         <module>core</module>
-        <module>batch</module>
         <module>batch-discovery-triggerers</module>
         <module>common</module>
         <module>twitter</module>