You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:23 UTC
[incubator-pinot] 33/47: fixing compilation
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6cb0ebb1b775959c166cbcdeadec74ae3349e4ad
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Jan 2 17:14:31 2021 -0800
fixing compilation
---
pinot-distribution/pinot-assembly.xml | 4 ++
pinot-distribution/pom.xml | 4 ++
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 64 ++++++++++++++++++++--
.../plugin/stream/kinesis/KinesisCheckpoint.java | 1 +
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 23 ++++----
.../stream/kinesis/KinesisConnectionHandler.java | 26 +++------
.../plugin/stream/kinesis/KinesisConsumer.java | 50 +++++++----------
.../stream/kinesis/KinesisConsumerFactory.java | 4 +-
.../plugin/stream/kinesis/KinesisFetchResult.java | 3 -
.../kinesis/KinesisPartitionGroupMetadataMap.java | 7 +--
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 18 ++++++
.../stream/kinesis/KinesisShardMetadata.java | 13 ++---
.../plugin/stream/kinesis/KinesisConsumerTest.java | 39 +++++++------
13 files changed, 152 insertions(+), 104 deletions(-)
diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml
index 2dfb36e..de7329f 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -55,6 +55,10 @@
<source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/target/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</source>
<destName>plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</destName>
</file>
+ <file>
+ <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/target/pinot-kinesis-${project.version}-shaded.jar</source>
+ <destName>plugins/pinot-stream-ingestion/pinot-kinesis/pinot-kinesis-${project.version}-shaded.jar</destName>
+ </file>
<!-- End Include Pinot Stream Ingestion Plugins-->
<!-- Start Include Pinot Batch Ingestion Plugins-->
<file>
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index 1a3f106..f29cae0 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -86,6 +86,10 @@
</exclusion>
<exclusion>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kinesis</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-batch-ingestion-standalone</artifactId>
</exclusion>
<exclusion>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 0c9ae0b..4fce169 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -19,19 +19,20 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>pinot-stream-ingestion</artifactId>
<groupId>org.apache.pinot</groupId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>pinot-kinesis</artifactId>
-
+ <name>Pinot Kinesis</name>
+ <url>https://pinot.apache.org/</url>
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<phase.prop>package</phase.prop>
@@ -43,6 +44,32 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<version>${aws.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
@@ -52,8 +79,33 @@
</dependency>
<dependency>
- <groupId>org.apache.pinot</groupId>
- <artifactId>pinot-spi</artifactId>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ <version>1.0.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ <version>4.1.42.Final</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>4.1.42.Final</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>4.1.42.Final</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>4.1.42.Final</version>
</dependency>
</dependencies>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 54e26d0..f3a7a49 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
import org.apache.pinot.spi.stream.v2.Checkpoint;
+
public class KinesisCheckpoint implements Checkpoint {
String _sequenceNumber;
Boolean _isEndOfPartition = false;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 82fc438..529f34f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -24,16 +24,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class KinesisConfig {
- private final Map<String, String> _props;
-
public static final String STREAM = "stream";
- private static final String AWS_REGION = "aws-region";
- private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
-
- private static final String DEFAULT_AWS_REGION = "us-central-1";
- private static final String DEFAULT_MAX_RECORDS = "20";
- private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST";
+ public static final String AWS_REGION = "aws-region";
+ public static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+ public static final String DEFAULT_AWS_REGION = "us-central-1";
+ public static final String DEFAULT_MAX_RECORDS = "20";
+ public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString();
+ private final Map<String, String> _props;
public KinesisConfig(StreamConfig streamConfig) {
_props = streamConfig.getStreamConfigsMap();
@@ -43,20 +41,19 @@ public class KinesisConfig {
_props = props;
}
- public String getStream(){
+ public String getStream() {
return _props.get(STREAM);
}
- public String getAwsRegion(){
+ public String getAwsRegion() {
return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
}
- public Integer maxRecordsToFetch(){
+ public Integer maxRecordsToFetch() {
return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
}
- public ShardIteratorType getShardIteratorType(){
+ public ShardIteratorType getShardIteratorType() {
return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
}
-
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 0cf4787..4d968f6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -19,28 +19,18 @@
package org.apache.pinot.plugin.stream.kinesis;
import java.util.List;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
public class KinesisConnectionHandler {
+ KinesisClient _kinesisClient;
private String _stream;
private String _awsRegion;
- KinesisClient _kinesisClient;
public KinesisConnectionHandler() {
@@ -58,18 +48,18 @@ public class KinesisConnectionHandler {
return listShardsResponse.shards();
}
- public void createConnection(){
- if(_kinesisClient == null) {
- _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
- .build();
+ public void createConnection() {
+ if (_kinesisClient == null) {
+ _kinesisClient =
+ KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+ .build();
}
}
- public void close(){
- if(_kinesisClient != null) {
+ public void close() {
+ if (_kinesisClient != null) {
_kinesisClient.close();
_kinesisClient = null;
}
}
-
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 336468a..fb414f0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -19,18 +19,13 @@
package org.apache.pinot.plugin.stream.kinesis;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.FetchResult;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +33,6 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
@@ -46,13 +40,14 @@ import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+ private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
String _stream;
Integer _maxRecords;
String _shardId;
ExecutorService _executorService;
ShardIteratorType _shardIteratorType;
- private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -67,12 +62,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
@Override
public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
List<Record> recordList = new ArrayList<>();
- Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList));
+ Future<KinesisFetchResult> kinesisFetchResultFuture =
+ _executorService.submit(() -> getResult(start, end, recordList));
try {
return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
- } catch(Exception e){
- return handleException((KinesisCheckpoint) start, recordList);
+ } catch (Exception e) {
+ return handleException((KinesisCheckpoint) start, recordList);
}
}
@@ -81,7 +77,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
try {
- if(_kinesisClient == null){
+ if (_kinesisClient == null) {
createConnection();
}
@@ -105,7 +101,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
recordList.addAll(getRecordsResponse.records());
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
- if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
+ if (kinesisEndSequenceNumber != null
+ && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
nextStartSequenceNumber = kinesisEndSequenceNumber;
break;
}
@@ -115,14 +112,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
}
- if(getRecordsResponse.hasChildShards()){
+ if (getRecordsResponse.hasChildShards()) {
//This statement returns true only when end of current shard has reached.
isEndOfShard = true;
break;
}
shardIterator = getRecordsResponse.nextShardIterator();
-
}
if (nextStartSequenceNumber == null && recordList.size() > 0) {
@@ -133,28 +129,20 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
return kinesisFetchResult;
- }catch (ProvisionedThroughputExceededException e) {
- LOG.warn(
- "The request rate for the stream is too high"
- , e);
+ } catch (ProvisionedThroughputExceededException e) {
+ LOG.warn("The request rate for the stream is too high", e);
return handleException(kinesisStartCheckpoint, recordList);
- }
- catch (ExpiredIteratorException e) {
- LOG.warn(
- "ShardIterator expired while trying to fetch records",e
- );
+ } catch (ExpiredIteratorException e) {
+ LOG.warn("ShardIterator expired while trying to fetch records", e);
return handleException(kinesisStartCheckpoint, recordList);
- }
- catch (ResourceNotFoundException | InvalidArgumentException e) {
+ } catch (ResourceNotFoundException | InvalidArgumentException e) {
// aws errors
LOG.error("Encountered AWS error while attempting to fetch records", e);
return handleException(kinesisStartCheckpoint, recordList);
- }
- catch (KinesisException e) {
+ } catch (KinesisException e) {
LOG.warn("Encountered unknown unrecoverable AWS exception", e);
throw new RuntimeException(e);
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
// non transient errors
LOG.error("Unknown fetchRecords exception", e);
throw new RuntimeException(e);
@@ -162,11 +150,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
- if(recordList.size() > 0){
+ if (recordList.size() > 0) {
String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
return new KinesisFetchResult(kinesisCheckpoint, recordList);
- }else{
+ } else {
KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
return new KinesisFetchResult(kinesisCheckpoint, recordList);
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index acac1fb..9bb4d0c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
-import java.util.Map;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.v2.ConsumerV2;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
@@ -38,7 +37,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
@Override
public PartitionGroupMetadataMap getPartitionGroupsMetadata(
PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
- return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata);
+ return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(),
+ currentPartitionGroupsMetadata);
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 39561f3..8da3d2e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -18,10 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
-import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.FetchResult;
import software.amazon.awssdk.services.kinesis.model.Record;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 626c8ea..f96533f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -22,12 +22,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
@@ -56,7 +52,8 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
//Return existing shard metadata
_stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
} else if (currentMetadataMap.containsKey(shard.parentShardId())) {
- KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
+ KinesisShardMetadata kinesisShardMetadata =
+ (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
if (isProcessingFinished(kinesisShardMetadata)) {
//Add child shards for processing since parent has finished
appendShardMetadata(stream, awsRegion, shard);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index ed51f8f..04bf4e6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.pinot.plugin.stream.kinesis;
import java.util.List;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 1d753c3..e24121b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -20,10 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
-import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
//TODO: Implement shardId as Array and have unique id
public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
@@ -48,13 +45,13 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
}
@Override
- public KinesisCheckpoint getEndCheckpoint() {
- return _endCheckpoint;
+ public void setStartCheckpoint(Checkpoint startCheckpoint) {
+ _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
}
@Override
- public void setStartCheckpoint(Checkpoint startCheckpoint) {
- _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
+ public KinesisCheckpoint getEndCheckpoint() {
+ return _endCheckpoint;
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index 6f660f7..f853875 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -20,40 +20,43 @@ package org.apache.pinot.plugin.stream.kinesis; /**
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class KinesisConsumerTest {
+
+ private static final String STREAM_NAME = "kinesis-test";
+ private static final String AWS_REGION = "us-west-2";
+
public static void main(String[] args) {
Map<String, String> props = new HashMap<>();
- props.put("stream", "kinesis-test");
- props.put("aws-region", "us-west-2");
- props.put("max-records-to-fetch", "2000");
- props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER");
-
+ props.put(KinesisConfig.STREAM, STREAM_NAME);
+ props.put(KinesisConfig.AWS_REGION, AWS_REGION);
+ props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
+ props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
KinesisConfig kinesisConfig = new KinesisConfig(props);
-
- KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2");
-
+ KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION);
List<Shard> shardList = kinesisConnectionHandler.getShards();
-
- for(Shard shard : shardList) {
+ for (Shard shard : shardList) {
System.out.println("SHARD: " + shard.shardId());
- KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
-
+ KinesisConsumer kinesisConsumer =
+ new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION));
+ System.out.println(
+ "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard
+ .sequenceNumberRange().endingSequenceNumber() + " >");
KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
- KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
-
+ KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L);
KinesisRecordsBatch list = fetchResult.getMessages();
int n = list.getMessageCount();
- for (int i=0;i<n;i++) {
+ System.out.println("Found " + n + " messages ");
+ for (int i = 0; i < n; i++) {
System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
}
+ kinesisConsumer.close();
}
+ kinesisConnectionHandler.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org