You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/23 19:57:42 UTC
[incubator-pulsar] branch master updated: fixing problem of functions that have an array as input or output (#1837)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a2ca79d fixing problem of functions that have an array as input or output (#1837)
a2ca79d is described below
commit a2ca79dad28ecee4e9123678e5d9ed4652e9e13c
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 23 12:57:38 2018 -0700
fixing problem of functions that have an array as input or output (#1837)
* fixing problem of functions that have an array as input or output
* add additional null checks
---
pom.xml | 7 +++++++
pulsar-functions/instance/pom.xml | 5 +++++
.../functions/instance/JavaInstanceRunnable.java | 21 +++++++++++++--------
.../apache/pulsar/functions/sink/PulsarSink.java | 11 +++++++----
.../pulsar/functions/source/PulsarSource.java | 9 +++++++--
pulsar-functions/runtime-shaded/pom.xml | 3 +++
6 files changed, 42 insertions(+), 14 deletions(-)
diff --git a/pom.xml b/pom.xml
index 22506ca..28e769b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.3.7</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
+ <jboss-reflect.version>2.2.1.SP1</jboss-reflect.version>
<protobuf2.version>2.4.1</protobuf2.version>
<protobuf3.version>3.5.1</protobuf3.version>
<grpc.version>1.5.0</grpc.version>
@@ -690,6 +691,12 @@ flexible messaging model and an intuitive client API.</description>
</dependency>
<dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jboss-reflect</artifactId>
+ <version>${jboss-reflect.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index bcdf780..b6dfb74 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -84,6 +84,11 @@
<artifactId>typetools</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jboss-reflect</artifactId>
+ </dependency>
+
</dependencies>
</project>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 5b5e943..a96c703 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -341,16 +341,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
@Override
public void close() {
- try {
- source.close();
- } catch (Exception e) {
- log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ if (source != null) {
+ try {
+ source.close();
+ } catch (Exception e) {
+ log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+
+ }
}
- try {
- sink.close();
- } catch (Exception e) {
- log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ if (sink != null) {
+ try {
+ sink.close();
+ } catch (Exception e) {
+ log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ }
}
if (null != javaInstance) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4fccb54..8b246cf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.io.core.RecordContext;
import org.apache.pulsar.io.core.Sink;
+import org.jboss.util.Classes;
import java.util.Base64;
import java.util.Map;
@@ -232,14 +233,16 @@ public class PulsarSink<T> implements Sink<T> {
@Override
public void close() throws Exception {
- this.pulsarSinkProcessor.close();
-
+ if (this.pulsarSinkProcessor != null) {
+ this.pulsarSinkProcessor.close();
+ }
}
@VisibleForTesting
void setupSerDe() throws ClassNotFoundException {
- Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(
- this.pulsarSinkConfig.getTypeClassName());
+
+ Class<?> typeArg = Classes.loadClass(this.pulsarSinkConfig.getTypeClassName(),
+ Thread.currentThread().getContextClassLoader());
if (!Void.class.equals(typeArg)) { // return type is not `Void.class`
if (this.pulsarSinkConfig.getSerDeClassName() == null
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 5cae902..25b0874 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Source;
+import org.jboss.util.Classes;
import java.util.ArrayList;
import java.util.HashMap;
@@ -124,13 +125,17 @@ public class PulsarSource<T> implements Source<T> {
@Override
public void close() throws Exception {
- this.inputConsumer.close();
+ if (this.inputConsumer != null) {
+ this.inputConsumer.close();
+ }
}
@VisibleForTesting
void setupSerDe() throws ClassNotFoundException {
- Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarSourceConfig.getTypeClassName());
+ Class<?> typeArg = Classes.loadClass(this.pulsarSourceConfig.getTypeClassName(),
+ Thread.currentThread().getContextClassLoader());
+
if (Void.class.equals(typeArg)) {
throw new RuntimeException("Input type of Pulsar Function cannot be Void");
}
diff --git a/pulsar-functions/runtime-shaded/pom.xml b/pulsar-functions/runtime-shaded/pom.xml
index 1536e21..f50e3d9 100644
--- a/pulsar-functions/runtime-shaded/pom.xml
+++ b/pulsar-functions/runtime-shaded/pom.xml
@@ -132,6 +132,9 @@
<include>com.google.googlejavaformat:google-java-format</include>
<include>com.google.errorprone:javac</include>
<include>net.jodah:typetools</include>
+ <include>org.jboss:jboss-reflect</include>
+ <include>org.jboss.logging:jboss-logging-spi</include>
+ <include>org.jboss:jboss-common-core</include>
<include>com.beust:jcommander</include>
<include>com.fasterxml.jackson.dataformat:jackson-dataformat-yaml</include>
<include>org.yaml:snakeyaml</include>
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.