You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:15 UTC
[incubator-pinot] 09/23: Reformat code
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 2463bd1f68591cdc81ec6ee14d89d066bd3f5f97
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:27:05 2020 +0530
Reformat code
---
.../plugin/stream/kinesis/KinesisCheckpoint.java | 3 +--
.../stream/kinesis/KinesisConnectionHandler.java | 14 ++++++-----
.../plugin/stream/kinesis/KinesisConsumer.java | 28 ++++++++++++----------
.../stream/kinesis/KinesisConsumerFactory.java | 3 ++-
.../plugin/stream/kinesis/KinesisFetchResult.java | 6 ++---
.../kinesis/KinesisPartitionGroupMetadataMap.java | 5 ++--
6 files changed, 32 insertions(+), 27 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index aa80b17..89043ea 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -6,7 +6,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
public class KinesisCheckpoint implements Checkpoint {
String _sequenceNumber;
- public KinesisCheckpoint(String sequenceNumber){
+ public KinesisCheckpoint(String sequenceNumber) {
_sequenceNumber = sequenceNumber;
}
@@ -24,5 +24,4 @@ public class KinesisCheckpoint implements Checkpoint {
//TODO: Implement SerDe
return new KinesisCheckpoint(new String(blob));
}
-
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index d8888fa..554cca6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -24,19 +24,21 @@ public class KinesisConnectionHandler {
private String _awsRegion;
KinesisClient _kinesisClient;
- public KinesisConnectionHandler(){
+ public KinesisConnectionHandler() {
}
- public KinesisConnectionHandler(String stream, String awsRegion){
+ public KinesisConnectionHandler(String stream, String awsRegion) {
_stream = stream;
_awsRegion = awsRegion;
- _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
+ _kinesisClient =
+ KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+ .build();
}
- public List<Shard> getShards(){
- ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
+ public List<Shard> getShards() {
+ ListShardsResponse listShardsResponse =
+ _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
return listShardsResponse.shards();
}
-
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index d896d67..1181d14 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -39,7 +39,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
String kinesisEndSequenceNumber = null;
- if(end != null) {
+ if (end != null) {
KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
}
@@ -47,32 +47,34 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
String nextStartSequenceNumber = null;
Long startTimestamp = System.currentTimeMillis();
- while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){
+ while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
- if(getRecordsResponse.records().size() > 0){
+ if (getRecordsResponse.records().size() > 0) {
recordList.addAll(getRecordsResponse.records());
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
- if(kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ){
+ if (kinesisEndSequenceNumber != null
+ && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
nextStartSequenceNumber = kinesisEndSequenceNumber;
break;
}
- if(recordList.size() >= _maxRecords) break;
+ if (recordList.size() >= _maxRecords) {
+ break;
+ }
}
shardIterator = getRecordsResponse.nextShardIterator();
}
- if(nextStartSequenceNumber == null && recordList.size() > 0){
+ if (nextStartSequenceNumber == null && recordList.size() > 0) {
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
}
KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
- KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
- recordList);
+ KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
return kinesisFetchResult;
}
@@ -80,14 +82,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
GetShardIteratorResponse getShardIteratorResponse;
- if(kinesisStartCheckpoint.getSequenceNumber() != null) {
+ if (kinesisStartCheckpoint.getSequenceNumber() != null) {
String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
getShardIteratorResponse = _kinesisClient.getShardIterator(
- GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+ .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.startingSequenceNumber(kinesisStartSequenceNumber).build());
- } else{
+ } else {
getShardIteratorResponse = _kinesisClient.getShardIterator(
- GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+ GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+ .shardIteratorType(ShardIteratorType.LATEST).build());
}
return getShardIteratorResponse.shardIterator();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 0608118..5e06a01 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -21,7 +21,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
@Override
public PartitionGroupMetadataMap getPartitionGroupsMetadata(
PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
- return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+ return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(),
+ _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global"));
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 2996b28..2801a09 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -11,9 +11,9 @@ public class KinesisFetchResult implements FetchResult<Record> {
private final KinesisCheckpoint _kinesisCheckpoint;
private final List<Record> _recordList;
- public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList){
- _kinesisCheckpoint = kinesisCheckpoint;
- _recordList = recordList;
+ public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList) {
+ _kinesisCheckpoint = kinesisCheckpoint;
+ _recordList = recordList;
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 700ec3f..05d95de 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -12,10 +12,10 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
- public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+ public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) {
super(stream, awsRegion);
List<Shard> shardList = getShards();
- for(Shard shard : shardList){
+ for (Shard shard : shardList) {
String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
@@ -34,5 +34,4 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
return _stringPartitionGroupMetadataIndex.get(index);
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org