You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2022/07/06 07:53:06 UTC

[pinot] branch master updated: Adding constructor override for KinesisDataProducer (#8975)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 04cc62b363 Adding constructor override for KinesisDataProducer (#8975)
04cc62b363 is described below

commit 04cc62b363b3d9360b00d6174af04b7bf0088ba0
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Wed Jul 6 00:52:58 2022 -0700

    Adding constructor override for KinesisDataProducer (#8975)
    
    * Adding constructor overrides for KinesisDataProducer
    
    * Addressing feedback
---
 .../stream/kafka20/server/KafkaDataProducer.java   |  6 +-
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 17 ++++++
 .../stream/kinesis/server/KinesisDataProducer.java | 65 +++++++++++++---------
 3 files changed, 59 insertions(+), 29 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
index ed019d40d2..dd9790ec92 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
@@ -58,15 +58,13 @@ public class KafkaDataProducer implements StreamDataProducer {
 
   @Override
   public void produce(String topic, byte[] payload) {
-    ProducerRecord<byte[], byte[]> record = new ProducerRecord(topic, payload);
-    _producer.send(record);
+    _producer.send(new ProducerRecord<>(topic, payload));
     _producer.flush();
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    ProducerRecord<byte[], byte[]> record = new ProducerRecord(topic, key, payload);
-    _producer.send(record);
+    _producer.send(new ProducerRecord<>(topic, key, payload));
     _producer.flush();
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index fa344aa065..95221ff3bf 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -62,6 +62,23 @@ public class KinesisConfig {
     _endpoint = props.get(ENDPOINT);
   }
 
+  public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
+      String secretKey, String endpoint) {
+    this(streamTopicName, awsRegion, shardIteratorType, accessKey, secretKey, Integer.parseInt(DEFAULT_MAX_RECORDS),
+        endpoint);
+  }
+
+  public KinesisConfig(String streamTopicName, String awsRegion, ShardIteratorType shardIteratorType, String accessKey,
+      String secretKey, int maxRecords, String endpoint) {
+    _streamTopicName = streamTopicName;
+    _awsRegion = awsRegion;
+    _shardIteratorType = shardIteratorType;
+    _accessKey = accessKey;
+    _secretKey = secretKey;
+    _numMaxRecordsToFetch = maxRecords;
+    _endpoint = endpoint;
+  }
+
   public String getStreamTopicName() {
     return _streamTopicName;
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
index a8e057ff24..6133a8c4f3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -64,37 +64,52 @@ public class KinesisDataProducer implements StreamDataProducer {
   private KinesisClient _kinesisClient;
   private RetryPolicy _retryPolicy;
 
+  public KinesisDataProducer() { }
+
+  public KinesisDataProducer(KinesisClient kinesisClient) {
+    this(kinesisClient, new FixedDelayRetryPolicy(
+        Integer.parseInt(DEFAULT_NUM_RETRIES + 1), Integer.parseInt(DEFAULT_RETRY_DELAY_MILLIS)));
+  }
+
+  public KinesisDataProducer(KinesisClient kinesisClient, RetryPolicy retryPolicy) {
+    _kinesisClient = kinesisClient;
+    _retryPolicy = retryPolicy;
+  }
+
   @Override
   public void init(Properties props) {
-    try {
-      KinesisClientBuilder kinesisClientBuilder;
-      if (props.containsKey(ACCESS) && props.containsKey(SECRET)) {
-        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-            .credentialsProvider(getLocalAWSCredentials(props))
-            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
-      } else {
-        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-            .credentialsProvider(DefaultCredentialsProvider.create())
-            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
-      }
+    if (_kinesisClient == null) {
+      try {
+        KinesisClientBuilder kinesisClientBuilder;
+        if (props.containsKey(ACCESS) && props.containsKey(SECRET)) {
+          kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+              .credentialsProvider(getLocalAWSCredentials(props))
+              .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        } else {
+          kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+              .credentialsProvider(DefaultCredentialsProvider.create())
+              .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        }
 
-      if (props.containsKey(ENDPOINT)) {
-        String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
-        try {
-          kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
-        } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
-              e);
+        if (props.containsKey(ENDPOINT)) {
+          String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
+          try {
+            kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
+          } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+                e);
+          }
         }
-      }
 
-      _kinesisClient = kinesisClientBuilder.build();
+        _kinesisClient = kinesisClientBuilder.build();
 
-      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
-      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
-      _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
-    } catch (Exception e) {
-      _kinesisClient = null;
+        int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+        long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+        _retryPolicy = new FixedDelayRetryPolicy(numRetries + 1, retryDelayMs);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to create a kinesis client due to ", e);
+        _kinesisClient = null;
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org