You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:22 UTC
[incubator-pinot] 16/23: Handle closed connections
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit bb8e08fda6da218f57882c348ca61ddc5a53e451
Author: KKcorps <kh...@gmail.com>
AuthorDate: Mon Dec 21 14:21:55 2020 +0530
Handle closed connections
---
.../plugin/stream/kinesis/KinesisConnectionHandler.java | 12 +++++++++---
.../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 8 ++++++++
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index ba94b0a..3607787 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -49,9 +49,7 @@ public class KinesisConnectionHandler {
public KinesisConnectionHandler(String stream, String awsRegion) {
_stream = stream;
_awsRegion = awsRegion;
- _kinesisClient =
- KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
- .build();
+ createConnection();
}
public List<Shard> getShards() {
@@ -60,9 +58,17 @@ public class KinesisConnectionHandler {
return listShardsResponse.shards();
}
+ public void createConnection(){
+ if(_kinesisClient == null) {
+ _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+ .build();
+ }
+ }
+
public void close(){
if(_kinesisClient != null) {
_kinesisClient.close();
+ _kinesisClient = null;
}
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 24810ba..fd48a92 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -79,6 +79,10 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
try {
+ if(_kinesisClient == null){
+ createConnection();
+ }
+
String shardIterator = getShardIterator(kinesisStartCheckpoint);
String kinesisEndSequenceNumber = null;
@@ -176,4 +180,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
return getShardIteratorResponse.shardIterator();
}
+ @Override
+ public void close() {
+ super.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org