You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/02 06:22:16 UTC
[pulsar] branch master updated: Fail the source record if the write fails (#3706)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bc8b380 Fail the source record if the write fails (#3706)
bc8b380 is described below
commit bc8b380e584a4d91a80c7afb72d5525bf845176f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Mar 1 22:22:11 2019 -0800
Fail the source record if the write fails (#3706)
---
.../apache/pulsar/functions/sink/PulsarSink.java | 26 ++++++++++++----------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8c32608..3f0e1a9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -67,9 +67,9 @@ public class PulsarSink<T> implements Sink<T> {
private interface PulsarSinkProcessor<T> {
- TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;
+ TypedMessageBuilder<T> newMessage(Record<T> record);
- void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception;
+ void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record);
void close() throws Exception;
}
@@ -136,11 +136,14 @@ public class PulsarSink<T> implements Sink<T> {
}
}
- public Function<Throwable, Void> getPublishErrorHandler(Record<T> record) {
+ public Function<Throwable, Void> getPublishErrorHandler(Record<T> record, boolean failSource) {
return throwable -> {
SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
Record<T> srcRecord = sinkRecord.getSourceRecord();
+ if (failSource) {
+ srcRecord.fail();
+ }
String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
@@ -177,10 +180,10 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
msg.sendAsync().thenAccept(messageId -> {
//no op
- }).exceptionally(getPublishErrorHandler(record));
+ }).exceptionally(getPublishErrorHandler(record, false));
}
}
@@ -191,10 +194,10 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
msg.sendAsync()
.thenAccept(messageId -> record.ack())
- .exceptionally(getPublishErrorHandler(record));
+ .exceptionally(getPublishErrorHandler(record, true));
}
}
@@ -206,7 +209,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception {
+ public TypedMessageBuilder<T> newMessage(Record<T> record) {
if (!record.getPartitionId().isPresent()) {
throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
}
@@ -219,8 +222,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record)
- throws Exception {
+ public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
if (!record.getRecordSequence().isPresent()) {
throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
@@ -230,7 +232,7 @@ public class PulsarSink<T> implements Sink<T> {
msg.sequenceId(record.getRecordSequence().get());
CompletableFuture<MessageId> future = msg.sendAsync();
- future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record));
+ future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record, true));
future.join();
}
}
@@ -268,7 +270,7 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void write(Record<T> record) throws Exception {
+ public void write(Record<T> record) {
TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
if (record.getKey().isPresent()) {
msg.key(record.getKey().get());