You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:15 UTC

[incubator-pinot] 09/23: Reformat code

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2463bd1f68591cdc81ec6ee14d89d066bd3f5f97
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:27:05 2020 +0530

    Reformat code
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  3 +--
 .../stream/kinesis/KinesisConnectionHandler.java   | 14 ++++++-----
 .../plugin/stream/kinesis/KinesisConsumer.java     | 28 ++++++++++++----------
 .../stream/kinesis/KinesisConsumerFactory.java     |  3 ++-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  6 ++---
 .../kinesis/KinesisPartitionGroupMetadataMap.java  |  5 ++--
 6 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index aa80b17..89043ea 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -6,7 +6,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
 
-  public KinesisCheckpoint(String sequenceNumber){
+  public KinesisCheckpoint(String sequenceNumber) {
     _sequenceNumber = sequenceNumber;
   }
 
@@ -24,5 +24,4 @@ public class KinesisCheckpoint implements Checkpoint {
     //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index d8888fa..554cca6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -24,19 +24,21 @@ public class KinesisConnectionHandler {
   private String _awsRegion;
   KinesisClient _kinesisClient;
 
-  public KinesisConnectionHandler(){
+  public KinesisConnectionHandler() {
 
   }
 
-  public KinesisConnectionHandler(String stream, String awsRegion){
+  public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
     _awsRegion = awsRegion;
-    _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
+    _kinesisClient =
+        KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+            .build();
   }
 
-  public List<Shard> getShards(){
-    ListShardsResponse listShardsResponse =  _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
+  public List<Shard> getShards() {
+    ListShardsResponse listShardsResponse =
+        _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
     return listShardsResponse.shards();
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index d896d67..1181d14 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -39,7 +39,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
     String kinesisEndSequenceNumber = null;
 
-    if(end != null) {
+    if (end != null) {
       KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
       kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
     }
@@ -47,32 +47,34 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     String nextStartSequenceNumber = null;
     Long startTimestamp = System.currentTimeMillis();
 
-    while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){
+    while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
       GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
       GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
-      if(getRecordsResponse.records().size() > 0){
+      if (getRecordsResponse.records().size() > 0) {
         recordList.addAll(getRecordsResponse.records());
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
 
-        if(kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ){
+        if (kinesisEndSequenceNumber != null
+            && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
           nextStartSequenceNumber = kinesisEndSequenceNumber;
           break;
         }
 
-        if(recordList.size() >= _maxRecords) break;
+        if (recordList.size() >= _maxRecords) {
+          break;
+        }
       }
 
       shardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    if(nextStartSequenceNumber == null && recordList.size() > 0){
+    if (nextStartSequenceNumber == null && recordList.size() > 0) {
       nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
     }
 
     KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
-        recordList);
+    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
     return kinesisFetchResult;
   }
@@ -80,14 +82,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
     GetShardIteratorResponse getShardIteratorResponse;
 
-    if(kinesisStartCheckpoint.getSequenceNumber() != null) {
+    if (kinesisStartCheckpoint.getSequenceNumber() != null) {
       String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
               .startingSequenceNumber(kinesisStartSequenceNumber).build());
-    } else{
+    } else {
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(ShardIteratorType.LATEST).build());
     }
 
     return getShardIteratorResponse.shardIterator();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 0608118..5e06a01 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -21,7 +21,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(),
+        _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global"));
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 2996b28..2801a09 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -11,9 +11,9 @@ public class KinesisFetchResult implements FetchResult<Record> {
   private final KinesisCheckpoint _kinesisCheckpoint;
   private final List<Record> _recordList;
 
-  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList){
-     _kinesisCheckpoint = kinesisCheckpoint;
-     _recordList = recordList;
+  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList) {
+    _kinesisCheckpoint = kinesisCheckpoint;
+    _recordList = recordList;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 700ec3f..05d95de 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -12,10 +12,10 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
-  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) {
     super(stream, awsRegion);
     List<Shard> shardList = getShards();
-    for(Shard shard : shardList){
+    for (Shard shard : shardList) {
       String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
@@ -34,5 +34,4 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
   public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
     return _stringPartitionGroupMetadataIndex.get(index);
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org