You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/10 13:11:06 UTC
[pulsar] branch master updated: Do not create any producer if the output type of a function is void (#2756)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 18c26e0 Do not create any producer if the output type of a function is void (#2756)
18c26e0 is described below
commit 18c26e07202ed8d234bffd91c4e12163460ea8c9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Oct 10 06:11:00 2018 -0700
Do not create any producer if the output type of a function is void (#2756)
* Do not create any producer if the output type of a function is void
* Do not write if the record is null
* Revert the check for null
---
.../java/org/apache/pulsar/functions/sink/PulsarSink.java | 6 +++++-
.../org/apache/pulsar/functions/sink/PulsarSinkTest.java | 12 +++---------
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 024638b..8c0c29f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -218,6 +218,10 @@ public class PulsarSink<T> implements Sink<T> {
log.info("Opening pulsar sink with config: {}", pulsarSinkConfig);
Schema<T> schema = initializeSchema();
+ if (schema == null) {
+ log.info("Since output type is null, not creating any real sink");
+ return;
+ }
FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
switch (processingGuarantees) {
@@ -283,7 +287,7 @@ public class PulsarSink<T> implements Sink<T> {
if (Void.class.equals(typeArg)) {
// return type is 'void', so there's no schema to check
- return (Schema<T>) Schema.BYTES;
+ return null;
}
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 4722c6f..7dfe6a7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -42,14 +42,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
@@ -169,7 +162,8 @@ public class PulsarSinkTest {
PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
try {
- pulsarSink.initializeSchema();
+ Schema schema = pulsarSink.initializeSchema();
+ assertEquals(schema, (Schema)null);
} catch (Exception ex) {
ex.printStackTrace();
assertEquals(ex, null);