You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/04/28 23:24:45 UTC
[pinot] branch master updated: Add support for retry in kinesis producer (#8609)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c373cb86e5 Add support for retry in kinesis producer (#8609)
c373cb86e5 is described below
commit c373cb86e590e5f4a3e845c8b23d2d1c4f2b738c
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Apr 29 04:54:38 2022 +0530
Add support for retry in kinesis producer (#8609)
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../stream/kinesis/server/KinesisDataProducer.java | 105 ++++++++++++++++++---
1 file changed, 90 insertions(+), 15 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
index 150ac78151..4b67797e80 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -20,9 +20,17 @@ package org.apache.pinot.plugin.stream.kinesis.server;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.FixedDelayRetryPolicy;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
@@ -34,16 +42,27 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+
public class KinesisDataProducer implements StreamDataProducer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataProducer.class);
+
public static final String ENDPOINT = "endpoint";
public static final String REGION = "region";
public static final String ACCESS = "access";
public static final String SECRET = "secret";
- public static final String DEFAULT_PORT = "4566";
+ public static final String NUM_RETRIES = "num_retries";
+ public static final String RETRY_DELAY_MILLIS = "retry_delay_millis";
+
public static final String DEFAULT_ENDPOINT = "http://localhost:4566";
+ public static final String DEFAULT_RETRY_DELAY_MILLIS = "10000L";
+ public static final String DEFAULT_NUM_RETRIES = "0";
private KinesisClient _kinesisClient;
+ private RetryPolicy _retryPolicy;
@Override
public void init(Properties props) {
@@ -54,10 +73,9 @@ public class KinesisDataProducer implements StreamDataProducer {
.credentialsProvider(getLocalAWSCredentials(props))
.httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
} else {
- kinesisClientBuilder =
- KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
- .credentialsProvider(DefaultCredentialsProvider.create())
- .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+ kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
}
if (props.containsKey(ENDPOINT)) {
@@ -65,12 +83,16 @@ public class KinesisDataProducer implements StreamDataProducer {
try {
kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
} catch (URISyntaxException e) {
- throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
- + kinesisEndpoint, e);
+ throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+ e);
}
}
_kinesisClient = kinesisClientBuilder.build();
+
+ int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+ long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+ _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
} catch (Exception e) {
_kinesisClient = null;
}
@@ -78,18 +100,35 @@ public class KinesisDataProducer implements StreamDataProducer {
@Override
public void produce(String topic, byte[] payload) {
- PutRecordRequest putRecordRequest =
- PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
- .partitionKey(UUID.randomUUID().toString()).build();
- PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+ try {
+ _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+ } catch (AttemptsExceededException ae) {
+ LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+ } catch (RetriableOperationException roe) {
+ LOGGER.error("Error occurred while pushing records in stream {}", topic, roe);
+ }
}
@Override
public void produce(String topic, byte[] key, byte[] payload) {
- PutRecordRequest putRecordRequest =
- PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key))
- .build();
- PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+ try {
+ _retryPolicy.attempt(() -> putRecord(topic, key, payload));
+ } catch (AttemptsExceededException ae) {
+ LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+ } catch (RetriableOperationException roe) {
+ LOGGER.error("Error occurred while pushing records in stream {}", topic, roe);
+ }
+ }
+
+ @Override
+ public void produceBatch(String topic, List<byte[]> rows) {
+ try {
+ _retryPolicy.attempt(() -> putRecordBatch(topic, rows));
+ } catch (AttemptsExceededException ae) {
+ LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+ } catch (RetriableOperationException roe) {
+ LOGGER.error("Error occurred while pushing records in stream {}", topic, roe);
+ }
}
@Override
@@ -101,4 +140,40 @@ public class KinesisDataProducer implements StreamDataProducer {
return StaticCredentialsProvider.create(
AwsBasicCredentials.create(props.getProperty(ACCESS), props.getProperty(SECRET)));
}
+
+ private boolean putRecordBatch(String topic, List<byte[]> rows) {
+ try {
+ List<PutRecordsRequestEntry> putRecordsRequestEntries = new ArrayList<>();
+ for (byte[] row : rows) {
+ putRecordsRequestEntries.add(PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(row))
+ .partitionKey(UUID.randomUUID().toString()).build());
+ }
+ PutRecordsRequest putRecordsRequest =
+ PutRecordsRequest.builder().streamName(topic).records(putRecordsRequestEntries).build();
+
+ PutRecordsResponse putRecordsResponse = _kinesisClient.putRecords(putRecordsRequest);
+ return putRecordsResponse.sdkHttpResponse().isSuccessful();
+ } catch (Exception e) {
+ LOGGER.warn("Exception occurred while pushing record to Kinesis {}", e.getMessage());
+ return false;
+ }
+ }
+
+ private boolean putRecord(String topic, byte[] key, byte[] payload) {
+ try {
+ PutRecordRequest.Builder putRecordRequestBuilder =
+ PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload));
+
+ if (key != null) {
+ putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(new String(key));
+ } else {
+ putRecordRequestBuilder = putRecordRequestBuilder.partitionKey(UUID.randomUUID().toString());
+ }
+ PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequestBuilder.build());
+ return putRecordResponse.sdkHttpResponse().isSuccessful();
+ } catch (Exception e) {
+ LOGGER.warn("Exception occurred while pushing record to Kinesis {}", e.getMessage());
+ return false;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org