You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/25 04:42:48 UTC

[pulsar] branch master updated: fix: batch source able to be submitted (#7659)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee39e40  fix: batch source able to be submitted (#7659)
ee39e40 is described below

commit ee39e40aa83c7d6779ac35509f0ae38ee167d23e
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jul 24 21:42:25 2020 -0700

    fix: batch source able to be submitted (#7659)
    
    * fix: batch source able to be submitted
    
    * fix logic
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index fcddf5c..bdcab9d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.io.core.BatchSource;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -65,9 +66,9 @@ public class ConnectorUtils {
         try {
             // Try to load source class and check it implements Source interface
             Class sourceClass = ncl.loadClass(conf.getSourceClass());
-            if (!(Source.class.isAssignableFrom(sourceClass))) {
-                throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
-                        + Source.class.getName());
+            if (!(Source.class.isAssignableFrom(sourceClass) || BatchSource.class.isAssignableFrom(sourceClass))) {
+                throw new IOException(String.format("Class %s does not implement interface %s or %s",
+                  conf.getSourceClass(), Source.class.getName(), BatchSource.class.getName()));
             }
         } catch (Throwable t) {
             Exceptions.rethrowIOException(t);