You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/30 00:24:33 UTC

[pulsar] 02/02: [Kinesis]Fix kinesis sink can not retry to send messages (#10420)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit baf04c40795f9617f3489579c519aee2fe70fa73
Author: Zike Yang <ar...@armail.top>
AuthorDate: Thu Apr 29 23:45:00 2021 +0800

    [Kinesis]Fix kinesis sink can not retry to send messages (#10420)
    
    ### Motivation
    
    Currently, when the Kinesis sink connector fails to send a message, it does not retry. If `retainOrdering` is enabled, this prevents subsequent messages from being sent, as the following log line shows:
    > 17:09:40.923 [crm/messaging-service/messaging-service-reply-0] WARN  org.apache.pulsar.io.kinesis.KinesisSink - Skip acking message to retain ordering with previous failed message prod_extapi.reply.message-Optional[26380226003034]
    
    
    ### Modifications
    
    * Add retry logic to the Kinesis sink connector. When sending a message fails, the connector retries the send.
    
    (cherry picked from commit 345cd33c6977c94a5c425e412a9977b4bd144a84)
---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 52 +++++++++++++++-------
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 12 +++++
 2 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 8682332..0b2a41a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -39,11 +39,14 @@ import io.netty.util.Recycler.Handle;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
@@ -96,6 +99,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
+    private ScheduledExecutorService scheduledExecutor;
     // 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -107,11 +111,16 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
     public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_";
     public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_";
     public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_";
-         
-    
+
+    private void sendUserRecord(ProducerSendCallback producerSendCallback) {
+        ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
+                producerSendCallback.partitionedKey, producerSendCallback.data);
+        addCallback(addRecordResult, producerSendCallback, directExecutor());
+    }
+
     @Override
     public void write(Record<byte[]> record) throws Exception {
-        // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering 
+        // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering
         if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) {
             LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName,
                     record.getRecordSequence());
@@ -122,10 +131,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least one, and at most 256
         ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
-        ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
-                partitionedKey, data);
-        addCallback(addRecordResult,
-                ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor());
+        sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), partitionedKey, data));
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
@@ -146,6 +152,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
         kinesisSinkConfig = KinesisSinkConfig.load(config);
         this.sinkContext = sinkContext;
 
@@ -180,16 +187,25 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
         private KinesisSink kinesisSink;
+        private Backoff backoff;
+        private String partitionedKey;
+        private ByteBuffer data;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime) {
+        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer data) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
             sendCallback.kinesisSink = kinesisSink;
             sendCallback.startTime = startTime;
+            sendCallback.partitionedKey = partitionedKey;
+            sendCallback.data = data;
+            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && sendCallback.backoff == null) {
+                sendCallback.backoff = new Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), TimeUnit.MILLISECONDS,
+                        kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), TimeUnit.MILLISECONDS, 0, TimeUnit.SECONDS);
+            }
             return sendCallback;
         }
 
@@ -197,6 +213,10 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
             resultContext = null;
             kinesisSink = null;
             startTime = 0;
+            if (backoff != null)
+                backoff.reset();
+            partitionedKey = null;
+            data = null;
             recyclerHandle.recycle(this);
         }
 
@@ -216,12 +236,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
             }
-            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed == TRUE) {
-                LOG.warn("Skip acking message to retain ordering with previous failed message {}-{} on shard {}",
-                        kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId());
-            } else {
-                this.resultContext.ack();
-            }
+            kinesisSink.previousPublishFailed = FALSE;
             recycle();
         }
 
@@ -244,7 +259,14 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
             }
-            recycle();
+            if (backoff != null) {
+                long nextDelay = backoff.next();
+                LOG.info("[{}] Retry to publish message for replicator of {}-{} after {} ms.", kinesisSink.streamName,
+                        resultContext.getPartitionId(), resultContext.getRecordSequence(), nextDelay);
+                kinesisSink.scheduledExecutor.schedule(() -> kinesisSink.sendUserRecord(this), nextDelay, TimeUnit.MICROSECONDS);
+            } else {
+                recycle();
+            }
         }
     }
 
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ec21dd5..fa00550 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -64,6 +64,18 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
         help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar to Kinesis")
     private boolean retainOrdering = false;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "100",
+            help = "The initial delay(in milliseconds) between retries.")
+    private long retryInitialDelayInMillis = 100;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "60000",
+            help = "The maximum delay(in milliseconds) between retries.")
+    private long retryMaxDelayInMillis = 60000;
+
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);