You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/02 06:22:16 UTC

[pulsar] branch master updated: Fail the source record if the write fails (#3706)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc8b380  Fail the source record if the write fails (#3706)
bc8b380 is described below

commit bc8b380e584a4d91a80c7afb72d5525bf845176f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Mar 1 22:22:11 2019 -0800

    Fail the source record if the write fails (#3706)
---
 .../apache/pulsar/functions/sink/PulsarSink.java   | 26 ++++++++++++----------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8c32608..3f0e1a9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -67,9 +67,9 @@ public class PulsarSink<T> implements Sink<T> {
 
     private interface PulsarSinkProcessor<T> {
 
-        TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;
+        TypedMessageBuilder<T> newMessage(Record<T> record);
 
-        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception;
+        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record);
 
         void close() throws Exception;
     }
@@ -136,11 +136,14 @@ public class PulsarSink<T> implements Sink<T> {
             }
         }
 
-        public Function<Throwable, Void> getPublishErrorHandler(Record<T> record) {
+        public Function<Throwable, Void> getPublishErrorHandler(Record<T> record, boolean failSource) {
 
             return throwable -> {
                 SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
                 Record<T> srcRecord = sinkRecord.getSourceRecord();
+                if (failSource) {
+                    srcRecord.fail();
+                }
 
                 String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
 
@@ -177,10 +180,10 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
             msg.sendAsync().thenAccept(messageId -> {
                 //no op
-            }).exceptionally(getPublishErrorHandler(record));
+            }).exceptionally(getPublishErrorHandler(record, false));
         }
     }
 
@@ -191,10 +194,10 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
             msg.sendAsync()
                     .thenAccept(messageId -> record.ack())
-                    .exceptionally(getPublishErrorHandler(record));
+                    .exceptionally(getPublishErrorHandler(record, true));
         }
     }
 
@@ -206,7 +209,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception {
+        public TypedMessageBuilder<T> newMessage(Record<T> record) {
             if (!record.getPartitionId().isPresent()) {
                 throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
             }
@@ -219,8 +222,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record)
-                throws Exception {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
 
             if (!record.getRecordSequence().isPresent()) {
                 throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
@@ -230,7 +232,7 @@ public class PulsarSink<T> implements Sink<T> {
             msg.sequenceId(record.getRecordSequence().get());
             CompletableFuture<MessageId> future = msg.sendAsync();
 
-            future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record));
+            future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record, true));
             future.join();
         }
     }
@@ -268,7 +270,7 @@ public class PulsarSink<T> implements Sink<T> {
     }
 
     @Override
-    public void write(Record<T> record) throws Exception {
+    public void write(Record<T> record) {
         TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
         if (record.getKey().isPresent()) {
             msg.key(record.getKey().get());