You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/07/06 07:53:06 UTC
[pinot] branch master updated: Adding constructor override for KinesisDataProducer (#8975)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 04cc62b363 Adding constructor override for KinesisDataProducer (#8975)
04cc62b363 is described below
commit 04cc62b363b3d9360b00d6174af04b7bf0088ba0
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Wed Jul 6 00:52:58 2022 -0700
Adding constructor override for KinesisDataProducer (#8975)
* Adding constructor overrides for KinesisDataProducer
* Addressing feedback
---
.../stream/kafka20/server/KafkaDataProducer.java | 6 +-
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 17 ++++++
.../stream/kinesis/server/KinesisDataProducer.java | 65 +++++++++++++---------
3 files changed, 59 insertions(+), 29 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
index ed019d40d2..dd9790ec92 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
@@ -58,15 +58,13 @@ public class KafkaDataProducer implements StreamDataProducer {
@Override
public void produce(String topic, byte[] payload) {
- ProducerRecord<byte[], byte[]> record = new ProducerRecord(topic, payload);
- _producer.send(record);
+ _producer.send(new ProducerRecord<>(topic, payload));
_producer.flush();
}
@Override
public void produce(String topic, byte[] key, byte[] payload) {
- ProducerRecord<byte[], byte[]> record = new ProducerRecord(topic, key, payload);
- _producer.send(record);
+ _producer.send(new ProducerRecord<>(topic, key, payload));
_producer.flush();
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index fa344aa065..95221ff3bf 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -62,6 +62,23 @@ public class KinesisConfig {
_endpoint = props.get(ENDPOINT);
}
+ public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
+ String secretKey, String endpoint) {
+ this(streamTopicName, awsRegion, shardIteratorType, accessKey, secretKey, Integer.parseInt(DEFAULT_MAX_RECORDS),
+ endpoint);
+ }
+
+ public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
+ String secretKey, int maxRecords, String endpoint) {
+ _streamTopicName = streamTopicName;
+ _awsRegion = awsRegion;
+ _shardIteratorType = shardIteratorType;
+ _accessKey = accessKey;
+ _secretKey = secretKey;
+ _numMaxRecordsToFetch = maxRecords;
+ _endpoint = endpoint;
+ }
+
public String getStreamTopicName() {
return _streamTopicName;
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
index a8e057ff24..6133a8c4f3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -64,37 +64,52 @@ public class KinesisDataProducer implements StreamDataProducer {
private KinesisClient _kinesisClient;
private RetryPolicy _retryPolicy;
+ public KinesisDataProducer() { }
+
+ public KinesisDataProducer(KinesisClient kinesisClient) {
+ this(kinesisClient, new FixedDelayRetryPolicy( // max attempts = default retries + 1, same as init()
+ Integer.parseInt(DEFAULT_NUM_RETRIES) + 1, Integer.parseInt(DEFAULT_RETRY_DELAY_MILLIS)));
+ }
+
+ public KinesisDataProducer(KinesisClient kinesisClient, RetryPolicy retryPolicy) {
+ _kinesisClient = kinesisClient;
+ _retryPolicy = retryPolicy;
+ }
+
@Override
public void init(Properties props) {
- try {
- KinesisClientBuilder kinesisClientBuilder;
- if (props.containsKey(ACCESS) && props.containsKey(SECRET)) {
- kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
- .credentialsProvider(getLocalAWSCredentials(props))
- .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
- } else {
- kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
- .credentialsProvider(DefaultCredentialsProvider.create())
- .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
- }
+ if (_kinesisClient == null) {
+ try {
+ KinesisClientBuilder kinesisClientBuilder;
+ if (props.containsKey(ACCESS) && props.containsKey(SECRET)) {
+ kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+ .credentialsProvider(getLocalAWSCredentials(props))
+ .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+ } else {
+ kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+ }
- if (props.containsKey(ENDPOINT)) {
- String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
- try {
- kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
- e);
+ if (props.containsKey(ENDPOINT)) {
+ String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
+ try {
+ kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+ e);
+ }
}
- }
- _kinesisClient = kinesisClientBuilder.build();
+ _kinesisClient = kinesisClientBuilder.build();
- int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
- long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
- _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
- } catch (Exception e) {
- _kinesisClient = null;
+ int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+ long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+ _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to create a kinesis client due to ", e);
+ _kinesisClient = null;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org