You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:16 UTC

[incubator-pinot] 26/47: Handle closed connections

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 0b8bb670bfd91e3a455a989a584388f15065b6a3
Author: KKcorps <kh...@gmail.com>
AuthorDate: Mon Dec 21 14:21:55 2020 +0530

    Handle closed connections
---
 .../plugin/stream/kinesis/KinesisConnectionHandler.java      | 12 +++++++++---
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  |  8 ++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index ba94b0a..3607787 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -49,9 +49,7 @@ public class KinesisConnectionHandler {
   public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
     _awsRegion = awsRegion;
-    _kinesisClient =
-        KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
-            .build();
+    createConnection();
   }
 
   public List<Shard> getShards() {
@@ -60,9 +58,17 @@ public class KinesisConnectionHandler {
     return listShardsResponse.shards();
   }
 
+  public void createConnection(){
+    if(_kinesisClient == null) {
+      _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+          .build();
+    }
+  }
+
   public void close(){
     if(_kinesisClient != null) {
       _kinesisClient.close();
+      _kinesisClient = null;
     }
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 24810ba..fd48a92 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -79,6 +79,10 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
     try {
 
+      if(_kinesisClient == null){
+        createConnection();
+      }
+
       String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
       String kinesisEndSequenceNumber = null;
@@ -176,4 +180,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     return getShardIteratorResponse.shardIterator();
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org