You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/10 13:11:06 UTC

[pulsar] branch master updated: Do not create any producer if the output type of a function is void (#2756)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 18c26e0  Do not create any producer if the output type of a function is void (#2756)
18c26e0 is described below

commit 18c26e07202ed8d234bffd91c4e12163460ea8c9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Oct 10 06:11:00 2018 -0700

    Do not create any producer if the output type of a function is void (#2756)
    
    * Do not create any producer if the output type of a function is void
    
    * Do not write if the record is null
    
    * Revert the check for null
---
 .../java/org/apache/pulsar/functions/sink/PulsarSink.java    |  6 +++++-
 .../org/apache/pulsar/functions/sink/PulsarSinkTest.java     | 12 +++---------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 024638b..8c0c29f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -218,6 +218,10 @@ public class PulsarSink<T> implements Sink<T> {
         log.info("Opening pulsar sink with config: {}", pulsarSinkConfig);
 
         Schema<T> schema = initializeSchema();
+        if (schema == null) {
+            log.info("Since output type is null, not creating any real sink");
+            return;
+        }
 
         FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
         switch (processingGuarantees) {
@@ -283,7 +287,7 @@ public class PulsarSink<T> implements Sink<T> {
 
         if (Void.class.equals(typeArg)) {
             // return type is 'void', so there's no schema to check
-            return (Schema<T>) Schema.BYTES;
+            return null;
         }
 
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 4722c6f..7dfe6a7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -42,14 +42,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
@@ -169,7 +162,8 @@ public class PulsarSinkTest {
         PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
 
         try {
-            pulsarSink.initializeSchema();
+            Schema schema = pulsarSink.initializeSchema();
+            assertEquals(schema, (Schema)null);
         } catch (Exception ex) {
             ex.printStackTrace();
             assertEquals(ex, null);