You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/30 00:24:33 UTC
[pulsar] 02/02: [Kinesis] Fix Kinesis sink being unable to retry sending
messages (#10420)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit baf04c40795f9617f3489579c519aee2fe70fa73
Author: Zike Yang <ar...@armail.top>
AuthorDate: Thu Apr 29 23:45:00 2021 +0800
[Kinesis] Fix Kinesis sink being unable to retry sending messages (#10420)
### Motivation
Currently, when the Kinesis sink connector fails to send a message, it does not retry. If `retainOrdering` is enabled, this means all subsequent messages can no longer be sent, as the following log shows:
> 17:09:40.923 [crm/messaging-service/messaging-service-reply-0] WARN org.apache.pulsar.io.kinesis.KinesisSink - Skip acking message to retain ordering with previous failed message prod_extapi.reply.message-Optional[26380226003034]
### Modifications
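* Add retry logic to the Kinesis sink connector: when sending a message fails, the sink retries the send. Retries are performed only when `retainOrdering` is enabled, and the delay between attempts is controlled by the new `retryInitialDelayInMillis` and `retryMaxDelayInMillis` options.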
(cherry picked from commit 345cd33c6977c94a5c425e412a9977b4bd144a84)
---
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 52 +++++++++++++++-------
.../pulsar/io/kinesis/KinesisSinkConfig.java | 12 +++++
2 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 8682332..0b2a41a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -39,11 +39,14 @@ import io.netty.util.Recycler.Handle;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
@@ -96,6 +99,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
private static final String defaultPartitionedKey = "default";
private static final int maxPartitionedKeyLength = 256;
private SinkContext sinkContext;
+ private ScheduledExecutorService scheduledExecutor;
//
private static final int FALSE = 0;
private static final int TRUE = 1;
@@ -107,11 +111,16 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_";
public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_";
public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_";
-
-
+
+ private void sendUserRecord(ProducerSendCallback producerSendCallback) {
+ ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
+ producerSendCallback.partitionedKey, producerSendCallback.data);
+ addCallback(addRecordResult, producerSendCallback, directExecutor());
+ }
+
@Override
public void write(Record<byte[]> record) throws Exception {
- // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering
+ // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering
if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) {
LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName,
record.getRecordSequence());
@@ -122,10 +131,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least one, and at most 256
ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
- ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
- partitionedKey, data);
- addCallback(addRecordResult,
- ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor());
+ sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), partitionedKey, data));
if (sinkContext != null) {
sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
@@ -146,6 +152,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
kinesisSinkConfig = KinesisSinkConfig.load(config);
this.sinkContext = sinkContext;
@@ -180,16 +187,25 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
private KinesisSink kinesisSink;
+ private Backoff backoff;
+ private String partitionedKey;
+ private ByteBuffer data;
private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime) {
+ static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer data) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.resultContext = resultContext;
sendCallback.kinesisSink = kinesisSink;
sendCallback.startTime = startTime;
+ sendCallback.partitionedKey = partitionedKey;
+ sendCallback.data = data;
+ if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && sendCallback.backoff == null) {
+ sendCallback.backoff = new Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), TimeUnit.MILLISECONDS,
+ kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), TimeUnit.MILLISECONDS, 0, TimeUnit.SECONDS);
+ }
return sendCallback;
}
@@ -197,6 +213,10 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
resultContext = null;
kinesisSink = null;
startTime = 0;
+ if (backoff != null)
+ backoff.reset();
+ partitionedKey = null;
+ data = null;
recyclerHandle.recycle(this);
}
@@ -216,12 +236,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
if (kinesisSink.sinkContext != null) {
kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
}
- if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed == TRUE) {
- LOG.warn("Skip acking message to retain ordering with previous failed message {}-{} on shard {}",
- kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId());
- } else {
- this.resultContext.ack();
- }
+ kinesisSink.previousPublishFailed = FALSE;
recycle();
}
@@ -244,7 +259,14 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
if (kinesisSink.sinkContext != null) {
kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
}
- recycle();
+ if (backoff != null) {
+ long nextDelay = backoff.next();
+ LOG.info("[{}] Retry to publish message for replicator of {}-{} after {} ms.", kinesisSink.streamName,
+ resultContext.getPartitionId(), resultContext.getRecordSequence(), nextDelay);
+ kinesisSink.scheduledExecutor.schedule(() -> kinesisSink.sendUserRecord(this), nextDelay, TimeUnit.MICROSECONDS);
+ } else {
+ recycle();
+ }
}
}
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ec21dd5..fa00550 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -64,6 +64,18 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar to Kinesis")
private boolean retainOrdering = false;
+ @FieldDoc(
+ required = false,
+ defaultValue = "100",
+ help = "The initial delay(in milliseconds) between retries.")
+ private long retryInitialDelayInMillis = 100;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "60000",
+ help = "The maximum delay(in milliseconds) between retries.")
+ private long retryMaxDelayInMillis = 60000;
+
public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);