You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/04/28 23:24:45 UTC

[pinot] branch master updated: Add support for retry in kinesis producer (#8609)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c373cb86e5 Add support for retry in kinesis producer (#8609)
c373cb86e5 is described below

commit c373cb86e590e5f4a3e845c8b23d2d1c4f2b738c
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Apr 29 04:54:38 2022 +0530

    Add support for retry in kinesis producer (#8609)
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../stream/kinesis/server/KinesisDataProducer.java | 105 ++++++++++++++++++---
 1 file changed, 90 insertions(+), 15 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
index 150ac78151..4b67797e80 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -20,9 +20,17 @@ package org.apache.pinot.plugin.stream.kinesis.server;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.FixedDelayRetryPolicy;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -34,16 +42,27 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
 import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+
 
 public class KinesisDataProducer implements StreamDataProducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataProducer.class);
+
   public static final String ENDPOINT = "endpoint";
   public static final String REGION = "region";
   public static final String ACCESS = "access";
   public static final String SECRET = "secret";
-  public static final String DEFAULT_PORT = "4566";
+  public static final String NUM_RETRIES = "num_retries";
+  public static final String RETRY_DELAY_MILLIS = "retry_delay_millis";
+
   public static final String DEFAULT_ENDPOINT = "http://localhost:4566";
+  public static final String DEFAULT_RETRY_DELAY_MILLIS = "10000L";
+  public static final String DEFAULT_NUM_RETRIES = "0";
 
   private KinesisClient _kinesisClient;
+  private RetryPolicy _retryPolicy;
 
   @Override
   public void init(Properties props) {
@@ -54,10 +73,9 @@ public class KinesisDataProducer implements StreamDataProducer {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
@@ -65,12 +83,16 @@ public class KinesisDataProducer implements StreamDataProducer {
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
@@ -78,18 +100,35 @@ public class KinesisDataProducer implements StreamDataProducer {
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (AttemptsExceededException ae) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    } catch (RetriableOperationException roe) {
+      LOGGER.error("Error occurred while pushing records in stream {}", topic, roe);
+    }
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key))
-            .build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, key, payload));
+    } catch (AttemptsExceededException ae) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    } catch (RetriableOperationException roe) {
+      LOGGER.error("Error occurred while pushing records in stream {}", topic, roe);
+    }
+  }
+
+  @Override
+  public void produceBatch(String topic, List<byte[]> rows) {
+    try {
+      _retryPolicy.attempt(() -> putRecordBatch(topic, rows));
+    } catch (AttemptsExceededException ae) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    } catch (RetriableOperationException roe) {
+      LOGGER.error("Error occurred while pushing records in stream {}", topic, roe);
+    }
   }
 
   @Override
@@ -101,4 +140,40 @@ public class KinesisDataProducer implements StreamDataProducer {
     return StaticCredentialsProvider.create(
         AwsBasicCredentials.create(props.getProperty(ACCESS), props.getProperty(SECRET)));
   }
+
+  private boolean putRecordBatch(String topic, List<byte[]> rows) {
+    try {
+      List<PutRecordsRequestEntry> putRecordsRequestEntries = new ArrayList<>();
+      for (byte[] row : rows) {
+        putRecordsRequestEntries.add(PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(row))
+            .partitionKey(UUID.randomUUID().toString()).build());
+      }
+      PutRecordsRequest putRecordsRequest =
+          PutRecordsRequest.builder().streamName(topic).records(putRecordsRequestEntries).build();
+
+      PutRecordsResponse putRecordsResponse = _kinesisClient.putRecords(putRecordsRequest);
+      return putRecordsResponse.sdkHttpResponse().isSuccessful();
+    } catch (Exception e) {
+      LOGGER.warn("Exception occurred while pushing record to Kinesis {}", e.getMessage());
+      return false;
+    }
+  }
+
+  private boolean putRecord(String topic, byte[] key, byte[] payload) {
+    try {
+      PutRecordRequest.Builder putRecordRequestBuilder =
+          PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload));
+
+      if (key != null) {
+        putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(new String(key));
+      } else {
+        putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(UUID.randomUUID().toString());
+      }
+      PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequestBuilder.build());
+      return putRecordResponse.sdkHttpResponse().isSuccessful();
+    } catch (Exception e) {
+      LOGGER.warn("Exception occurred while pushing record to Kinesis {}", e.getMessage());
+      return false;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org