You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/23 19:57:42 UTC

[incubator-pulsar] branch master updated: fixing problem of functions that have an array as input or output (#1837)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ca79d  fixing problem of functions that have an array as input or output (#1837)
a2ca79d is described below

commit a2ca79dad28ecee4e9123678e5d9ed4652e9e13c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 23 12:57:38 2018 -0700

    fixing problem of functions that have an array as input or output (#1837)
    
    * fixing problem of functions that have an array as input or output
    
    * add additional null checks
---
 pom.xml                                             |  7 +++++++
 pulsar-functions/instance/pom.xml                   |  5 +++++
 .../functions/instance/JavaInstanceRunnable.java    | 21 +++++++++++++--------
 .../apache/pulsar/functions/sink/PulsarSink.java    | 11 +++++++----
 .../pulsar/functions/source/PulsarSource.java       |  9 +++++++--
 pulsar-functions/runtime-shaded/pom.xml             |  3 +++
 6 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 22506ca..28e769b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@ flexible messaging model and an intuitive client API.</description>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <dockerfile-maven.version>1.3.7</dockerfile-maven.version>
     <typetools.version>0.5.0</typetools.version>
+    <jboss-reflect.version>2.2.1.SP1</jboss-reflect.version>
     <protobuf2.version>2.4.1</protobuf2.version>
     <protobuf3.version>3.5.1</protobuf3.version>
     <grpc.version>1.5.0</grpc.version>
@@ -690,6 +691,12 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>org.jboss</groupId>
+        <artifactId>jboss-reflect</artifactId>
+        <version>${jboss-reflect.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-all</artifactId>
         <version>${grpc.version}</version>
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index bcdf780..b6dfb74 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -84,6 +84,11 @@
       <artifactId>typetools</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.jboss</groupId>
+      <artifactId>jboss-reflect</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 5b5e943..a96c703 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -341,16 +341,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     @Override
     public void close() {
-        try {
-            source.close();
-        } catch (Exception e) {
-            log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+        if (source != null) {
+            try {
+                source.close();
+            } catch (Exception e) {
+                log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+
+            }
         }
 
-        try {
-            sink.close();
-        } catch (Exception e) {
-            log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+        if (sink != null) {
+            try {
+                sink.close();
+            } catch (Exception e) {
+                log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+            }
         }
 
         if (null != javaInstance) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4fccb54..8b246cf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.io.core.RecordContext;
 import org.apache.pulsar.io.core.Sink;
+import org.jboss.util.Classes;
 
 import java.util.Base64;
 import java.util.Map;
@@ -232,14 +233,16 @@ public class PulsarSink<T> implements Sink<T> {
 
     @Override
     public void close() throws Exception {
-        this.pulsarSinkProcessor.close();
-
+        if (this.pulsarSinkProcessor != null) {
+            this.pulsarSinkProcessor.close();
+        }
     }
 
     @VisibleForTesting
     void setupSerDe() throws ClassNotFoundException {
-        Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(
-                this.pulsarSinkConfig.getTypeClassName());
+
+        Class<?> typeArg = Classes.loadClass(this.pulsarSinkConfig.getTypeClassName(),
+                Thread.currentThread().getContextClassLoader());
 
         if (!Void.class.equals(typeArg)) { // return type is not `Void.class`
             if (this.pulsarSinkConfig.getSerDeClassName() == null
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 5cae902..25b0874 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Source;
+import org.jboss.util.Classes;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -124,13 +125,17 @@ public class PulsarSource<T> implements Source<T> {
 
     @Override
     public void close() throws Exception {
-        this.inputConsumer.close();
+        if (this.inputConsumer != null) {
+            this.inputConsumer.close();
+        }
     }
 
     @VisibleForTesting
     void setupSerDe() throws ClassNotFoundException {
 
-        Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarSourceConfig.getTypeClassName());
+        Class<?> typeArg = Classes.loadClass(this.pulsarSourceConfig.getTypeClassName(),
+                Thread.currentThread().getContextClassLoader());
+
         if (Void.class.equals(typeArg)) {
             throw new RuntimeException("Input type of Pulsar Function cannot be Void");
         }
diff --git a/pulsar-functions/runtime-shaded/pom.xml b/pulsar-functions/runtime-shaded/pom.xml
index 1536e21..f50e3d9 100644
--- a/pulsar-functions/runtime-shaded/pom.xml
+++ b/pulsar-functions/runtime-shaded/pom.xml
@@ -132,6 +132,9 @@
                   <include>com.google.googlejavaformat:google-java-format</include>
                   <include>com.google.errorprone:javac</include>
                   <include>net.jodah:typetools</include>
+                  <include>org.jboss:jboss-reflect</include>
+                  <include>org.jboss.logging:jboss-logging-spi</include>
+                  <include>org.jboss:jboss-common-core</include>
                   <include>com.beust:jcommander</include>
                   <include>com.fasterxml.jackson.dataformat:jackson-dataformat-yaml</include>
                   <include>org.yaml:snakeyaml</include>

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.