You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/30 00:24:31 UTC

[pulsar] branch branch-2.7 updated (b68606a -> baf04c4)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from b68606a  [Java Client] Fix concurrency issue in incrementing epoch (#10278) (#10436)
     new 6c01ab4  Fix null error messages of onFailure exception in KinesisSink. (#10416)
     new baf04c4  [Kinesis]Fix kinesis sink can not retry to send messages (#10420)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 68 ++++++++++++++++------
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 12 ++++
 2 files changed, 63 insertions(+), 17 deletions(-)

[pulsar] 02/02: [Kinesis]Fix kinesis sink can not retry to send messages (#10420)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit baf04c40795f9617f3489579c519aee2fe70fa73
Author: Zike Yang <ar...@armail.top>
AuthorDate: Thu Apr 29 23:45:00 2021 +0800

    [Kinesis]Fix kinesis sink can not retry to send messages (#10420)
    
    ### Motivation
    
    Currently, when the Kinesis sink connector fails to send a message, it does not retry. If `retainOrdering` is enabled, this prevents all subsequent messages from being sent, as shown in the following log:
    > 17:09:40.923 [crm/messaging-service/messaging-service-reply-0] WARN  org.apache.pulsar.io.kinesis.KinesisSink - Skip acking message to retain ordering with previous failed message prod_extapi.reply.message-Optional[26380226003034]
    
    
    ### Modifications
    
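    * Add retry logic to the Kinesis sink connector: when `retainOrdering` is enabled and sending a message fails, the connector retries sending it after a delay bounded by the new `retryInitialDelayInMillis` and `retryMaxDelayInMillis` settings.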
    
    (cherry picked from commit 345cd33c6977c94a5c425e412a9977b4bd144a84)
---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 52 +++++++++++++++-------
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 12 +++++
 2 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 8682332..0b2a41a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -39,11 +39,14 @@ import io.netty.util.Recycler.Handle;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
@@ -96,6 +99,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
+    private ScheduledExecutorService scheduledExecutor;
     // 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -107,11 +111,16 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
     public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_";
     public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_";
     public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_";
-         
-    
+
+    private void sendUserRecord(ProducerSendCallback producerSendCallback) {
+        ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
+                producerSendCallback.partitionedKey, producerSendCallback.data);
+        addCallback(addRecordResult, producerSendCallback, directExecutor());
+    }
+
     @Override
     public void write(Record<byte[]> record) throws Exception {
-        // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering 
+        // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering
         if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) {
             LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName,
                     record.getRecordSequence());
@@ -122,10 +131,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least one, and at most 256
         ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
-        ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
-                partitionedKey, data);
-        addCallback(addRecordResult,
-                ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor());
+        sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), partitionedKey, data));
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
@@ -146,6 +152,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
         kinesisSinkConfig = KinesisSinkConfig.load(config);
         this.sinkContext = sinkContext;
 
@@ -180,16 +187,25 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
         private KinesisSink kinesisSink;
+        private Backoff backoff;
+        private String partitionedKey;
+        private ByteBuffer data;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime) {
+        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer data) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
             sendCallback.kinesisSink = kinesisSink;
             sendCallback.startTime = startTime;
+            sendCallback.partitionedKey = partitionedKey;
+            sendCallback.data = data;
+            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && sendCallback.backoff == null) {
+                sendCallback.backoff = new Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), TimeUnit.MILLISECONDS,
+                        kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), TimeUnit.MILLISECONDS, 0, TimeUnit.SECONDS);
+            }
             return sendCallback;
         }
 
@@ -197,6 +213,10 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
             resultContext = null;
             kinesisSink = null;
             startTime = 0;
+            if (backoff != null)
+                backoff.reset();
+            partitionedKey = null;
+            data = null;
             recyclerHandle.recycle(this);
         }
 
@@ -216,12 +236,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
             }
-            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed == TRUE) {
-                LOG.warn("Skip acking message to retain ordering with previous failed message {}-{} on shard {}",
-                        kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId());
-            } else {
-                this.resultContext.ack();
-            }
+            kinesisSink.previousPublishFailed = FALSE;
             recycle();
         }
 
@@ -244,7 +259,14 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
             }
-            recycle();
+            if (backoff != null) {
+                long nextDelay = backoff.next();
+                LOG.info("[{}] Retry to publish message for replicator of {}-{} after {} ms.", kinesisSink.streamName,
+                        resultContext.getPartitionId(), resultContext.getRecordSequence(), nextDelay);
+                kinesisSink.scheduledExecutor.schedule(() -> kinesisSink.sendUserRecord(this), nextDelay, TimeUnit.MICROSECONDS);
+            } else {
+                recycle();
+            }
         }
     }
 
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ec21dd5..fa00550 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -64,6 +64,18 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable
         help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar to Kinesis")
     private boolean retainOrdering = false;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "100",
+            help = "The initial delay(in milliseconds) between retries.")
+    private long retryInitialDelayInMillis = 100;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "60000",
+            help = "The maximum delay(in milliseconds) between retries.")
+    private long retryMaxDelayInMillis = 60000;
+
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);

[pulsar] 01/02: Fix null error messages of onFailure exception in KinesisSink. (#10416)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6c01ab4cc90ffccc70b9c8e177c0d28ea75a69d1
Author: Zike Yang <ar...@armail.top>
AuthorDate: Wed Apr 28 18:06:00 2021 +0800

    Fix null error messages of onFailure exception in KinesisSink. (#10416)
    
    
    (cherry picked from commit 89a808cf88d10c8380e69ef129ad9f593d0b5eae)
---
 .../java/org/apache/pulsar/io/kinesis/KinesisSink.java   | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index bf58cfa..8682332 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -28,6 +28,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -226,8 +227,19 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
 
         @Override
         public void onFailure(Throwable exception) {
-            LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", kinesisSink.streamName,
-                    resultContext.getPartitionId(), resultContext.getRecordSequence(), exception.getMessage());
+            if (exception instanceof UserRecordFailedException) {
+                // If the exception is UserRecordFailedException, we need to extract it to see real error messages.
+                UserRecordFailedException failedException = (UserRecordFailedException) exception;
+                StringBuffer stringBuffer = new StringBuffer();
+                failedException.getResult().getAttempts().forEach(attempt ->
+                        stringBuffer.append(String.format("errorMessage:%s, errorCode:%s, delay:%d, duration:%d;",
+                                attempt.getErrorMessage(), attempt.getErrorCode(), attempt.getDelay(), attempt.getDuration())));
+                LOG.error("[{}] Failed to published message for replicator of {}-{}: Attempts:{}", kinesisSink.streamName,
+                        resultContext.getPartitionId(), resultContext.getRecordSequence(), stringBuffer.toString());
+            } else {
+                LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", kinesisSink.streamName,
+                        resultContext.getPartitionId(), resultContext.getRecordSequence(), exception.getMessage());
+            }
             kinesisSink.previousPublishFailed = TRUE;
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);