You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:36 UTC
[incubator-pinot] 46/47: Add unit tests in Kinesis consumer (#6410)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ab9655b8d42bd1a41920260ff3080070c6863cca
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Mon Feb 1 22:44:37 2021 +0530
Add unit tests in Kinesis consumer (#6410)
* Bug fixes: Handle connection broken exception
* Add unit tests for partition group consumer
* Add tests for child shards
* Add unit test for Kinesis consumer
* Refactor: remove unused imports, expand * imports and rename classes
* Fix: enforcer errors
* Remove powermock dependency
* Fix jackson version conflict. Shade jackson dependency
* Remove powermock
---
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 79 ++++++++-
.../plugin/stream/kinesis/KinesisCheckpoint.java | 7 +-
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 4 +
.../stream/kinesis/KinesisConnectionHandler.java | 6 +
.../plugin/stream/kinesis/KinesisConsumer.java | 11 ++
.../kinesis/KinesisStreamMetadataProvider.java | 10 +-
...st.java => KinesisConsumerIntegrationTest.java} | 3 +-
.../plugin/stream/kinesis/KinesisConsumerTest.java | 187 ++++++++++++++++-----
.../kinesis/KinesisStreamMetadataProviderTest.java | 156 +++++++++++++++++
.../pinot/plugin/stream/kinesis/TestUtils.java | 55 ++++++
pom.xml | 2 +-
11 files changed, 468 insertions(+), 52 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 38d4f73..b636b9f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -36,9 +36,25 @@
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<phase.prop>package</phase.prop>
- <aws.version>2.15.50</aws.version>
+ <aws.version>2.14.28</aws.version>
+ <jackson.version>2.10.4</jackson.version>
+ <netty.version>4.1.42.Final</netty.version>
+ <easymock.version>4.2</easymock.version>
+ <reactive.version>1.0.2</reactive.version>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>bom</artifactId>
+ <version>${aws.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
@@ -75,38 +91,87 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
- <version>2.12.0</version>
+ <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
- <version>1.0.2</version>
+ <version>${reactive.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
- <version>4.1.42.Final</version>
+ <version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
- <version>4.1.42.Final</version>
+ <version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
- <version>4.1.42.Final</version>
+ <version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
- <version>4.1.42.Final</version>
+ <version>${netty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>${easymock.version}</version>
+ <scope>test</scope>
</dependency>
+
</dependencies>
+ <profiles>
+ <profile>
+ <id>build-shaded-jar</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>shaded.kinesis.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>software.amazon</pattern>
+ <shadedPattern>shaded.kinesis.software.amazon</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
</project>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index e1f8b05..57904ac 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.spi.stream.Checkpoint;
@@ -33,6 +34,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
*/
public class KinesisCheckpoint implements StreamPartitionMsgOffset {
private final Map<String, String> _shardToStartSequenceMap;
+ public static final ObjectMapper objectMapper = new ObjectMapper();
public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
_shardToStartSequenceMap = shardToStartSequenceMap;
@@ -40,8 +42,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
public KinesisCheckpoint(String checkpointStr)
throws IOException {
- _shardToStartSequenceMap = JsonUtils.stringToObject(checkpointStr, new TypeReference<Map<String, String>>() {
- });
+ _shardToStartSequenceMap = objectMapper.readValue(checkpointStr, new TypeReference<Map<String, String>>(){});
}
public Map<String, String> getShardToStartSequenceMap() {
@@ -51,7 +52,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
@Override
public String serialize() {
try {
- return JsonUtils.objectToString(_shardToStartSequenceMap);
+ return objectMapper.writeValueAsString(_shardToStartSequenceMap);
} catch (JsonProcessingException e) {
throw new IllegalStateException();
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index fbe369f..6e46498 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -59,4 +59,8 @@ public class KinesisConfig {
public ShardIteratorType getShardIteratorType() {
return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
}
+
+ public void setMaxRecordsToFetch(int maxRecordsToFetch){
+ _props.put(MAX_RECORDS_TO_FETCH, String.valueOf(maxRecordsToFetch));
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 61d065e..0686742 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -41,6 +41,12 @@ public class KinesisConnectionHandler {
createConnection();
}
+ public KinesisConnectionHandler(String stream, String awsRegion, KinesisClient kinesisClient) {
+ _stream = stream;
+ _awsRegion = awsRegion;
+ _kinesisClient = kinesisClient;
+ }
+
/**
* Lists all shards of the stream
*/
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 9c56f95..8ad27b4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -61,6 +62,15 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
_executorService = Executors.newSingleThreadExecutor();
}
+ public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) {
+ super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion(), kinesisClient);
+ _kinesisClient = kinesisClient;
+ _stream = kinesisConfig.getStream();
+ _maxRecords = kinesisConfig.maxRecordsToFetch();
+ _shardIteratorType = kinesisConfig.getShardIteratorType();
+ _executorService = Executors.newSingleThreadExecutor();
+ }
+
/**
* Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
*/
@@ -175,6 +185,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
if (sequenceNumber != null && _shardIteratorType.toString().contains("SEQUENCE")) {
requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
}
+
return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 42150a3..7af1df1 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -59,6 +59,14 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
_fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
}
+ public KinesisStreamMetadataProvider(String clientId, StreamConfig streamConfig, KinesisConnectionHandler kinesisConnectionHandler, StreamConsumerFactory streamConsumerFactory) {
+ KinesisConfig kinesisConfig = new KinesisConfig(streamConfig);
+ _kinesisConnectionHandler = kinesisConnectionHandler;
+ _kinesisStreamConsumerFactory = streamConsumerFactory;
+ _clientId = clientId;
+ _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
+ }
+
@Override
public int fetchPartitionCount(long timeoutMillis) {
throw new UnsupportedOperationException();
@@ -83,7 +91,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
Map<String, Shard> shardIdToShardMap =
- _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s));
+ _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s, (s1, s2) -> s1));
Set<String> shardsInCurrent = new HashSet<>();
Set<String> shardsEnded = new HashSet<>();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
similarity index 96%
copy from pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
copy to pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
index f9ed779..1e832fa 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
@@ -22,12 +22,11 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-public class KinesisConsumerTest {
+public class KinesisConsumerIntegrationTest {
private static final String STREAM_NAME = "kinesis-test";
private static final String AWS_REGION = "us-west-2";
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index f9ed779..384c512 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -18,51 +18,162 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
-import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.easymock.Capture;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
public class KinesisConsumerTest {
+ public static final int TIMEOUT = 1000;
+ public static final int NUM_RECORDS = 10;
+ public static final String DUMMY_RECORD_PREFIX = "DUMMY_RECORD-";
+ public static final String PARTITION_KEY_PREFIX = "PARTITION_KEY-";
+ public static final String PLACEHOLDER = "DUMMY";
+
+ private static KinesisConnectionHandler kinesisConnectionHandler;
+ private static StreamConsumerFactory streamConsumerFactory;
+ private static KinesisClient kinesisClient;
+ private List<Record> recordList;
+
+ @BeforeMethod
+ public void setupTest() {
+ kinesisConnectionHandler = createMock(KinesisConnectionHandler.class);
+ kinesisClient = createMock(KinesisClient.class);
+ streamConsumerFactory = createMock(StreamConsumerFactory.class);
+
+ recordList = new ArrayList<>();
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ Record record =
+ Record.builder().data(SdkBytes.fromUtf8String(DUMMY_RECORD_PREFIX + i)).partitionKey(PARTITION_KEY_PREFIX + i)
+ .sequenceNumber(String.valueOf(i + 1)).build();
+ recordList.add(record);
+ }
+ }
+
+ @Test
+ public void testBasicConsumer() {
+ Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
+ Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
+
+ GetRecordsResponse getRecordsResponse =
+ GetRecordsResponse.builder().nextShardIterator(null).records(recordList).build();
+ GetShardIteratorResponse getShardIteratorResponse =
+ GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
+
+ expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
+ expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
+ .anyTimes();
- private static final String STREAM_NAME = "kinesis-test";
- private static final String AWS_REGION = "us-west-2";
-
- public static void main(String[] args)
- throws IOException {
- Map<String, String> props = new HashMap<>();
- props.put(KinesisConfig.STREAM, STREAM_NAME);
- props.put(KinesisConfig.AWS_REGION, AWS_REGION);
- props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
- props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
- KinesisConfig kinesisConfig = new KinesisConfig(props);
- KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION);
- List<Shard> shardList = kinesisConnectionHandler.getShards();
- for (Shard shard : shardList) {
- System.out.println("SHARD: " + shard.shardId());
-
- KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig);
- System.out.println(
- "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard
- .sequenceNumberRange().endingSequenceNumber() + " >");
- Map<String, String> shardIdToSeqNumMap = new HashMap<>();
- shardIdToSeqNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardIdToSeqNumMap);
- KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, 60 * 1000);
- int n = kinesisRecordsBatch.getMessageCount();
-
- System.out.println("Found " + n + " messages ");
- for (int i = 0; i < n; i++) {
- System.out.println(
- "SEQ-NO: " + kinesisRecordsBatch.getMessageOffsetAtIndex(i) + ", DATA: " + kinesisRecordsBatch
- .getMessageAtIndex(i));
- }
- kinesisConsumer.close();
+ replay(kinesisClient);
+
+ KinesisConsumer kinesisConsumer = new KinesisConsumer(TestUtils.getKinesisConfig(), kinesisClient);
+
+ Map<String, String> shardToSequenceMap = new HashMap<>();
+ shardToSequenceMap.put("0", "1");
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+ KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT);
+
+ Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), NUM_RECORDS);
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i);
}
- kinesisConnectionHandler.close();
+
+ Assert.assertFalse(kinesisRecordsBatch.isEndOfPartitionGroup());
+ }
+
+ @Test
+ public void testBasicConsumerWithMaxRecordsLimit() {
+ int maxRecordsLimit = 20;
+ Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
+ Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
+
+ GetRecordsResponse getRecordsResponse =
+ GetRecordsResponse.builder().nextShardIterator(PLACEHOLDER).records(recordList).build();
+ GetShardIteratorResponse getShardIteratorResponse =
+ GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
+
+ expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
+ expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
+ .anyTimes();
+
+ replay(kinesisClient);
+
+ KinesisConfig kinesisConfig = TestUtils.getKinesisConfig();
+ kinesisConfig.setMaxRecordsToFetch(maxRecordsLimit);
+ KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, kinesisClient);
+
+ Map<String, String> shardToSequenceMap = new HashMap<>();
+ shardToSequenceMap.put("0", "1");
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+ KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT);
+
+ Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), maxRecordsLimit);
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i);
+ }
+ }
+
+ @Test
+ public void testBasicConsumerWithChildShard() {
+ int maxRecordsLimit = 20;
+
+ List<ChildShard> shardList = new ArrayList<>();
+ shardList.add(ChildShard.builder().shardId(PLACEHOLDER).parentShards("0").build());
+
+ Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
+ Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
+
+ GetRecordsResponse getRecordsResponse =
+ GetRecordsResponse.builder().nextShardIterator(null).records(recordList).childShards(shardList).build();
+ GetShardIteratorResponse getShardIteratorResponse =
+ GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
+
+ expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
+ expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
+ .anyTimes();
+
+ replay(kinesisClient);
+
+ KinesisConfig kinesisConfig = TestUtils.getKinesisConfig();
+ kinesisConfig.setMaxRecordsToFetch(maxRecordsLimit);
+ KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, kinesisClient);
+
+ Map<String, String> shardToSequenceMap = new HashMap<>();
+ shardToSequenceMap.put("0", "1");
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+ KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT);
+
+ Assert.assertTrue(kinesisRecordsBatch.isEndOfPartitionGroup());
+ Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), NUM_RECORDS);
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i);
+ }
+ }
+
+ public String baToString(byte[] bytes) {
+ return SdkBytes.fromByteArray(bytes).asUtf8String();
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
new file mode 100644
index 0000000..4845e57
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.captureInt;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class KinesisStreamMetadataProviderTest {
+ private static final String SHARD_ID_0 = "0";
+ private static final String SHARD_ID_1 = "1";
+ public static final String CLIENT_ID = "dummy";
+ public static final int TIMEOUT = 1000;
+
+ private static KinesisConnectionHandler kinesisConnectionHandler;
+ private KinesisStreamMetadataProvider kinesisStreamMetadataProvider;
+ private static StreamConsumerFactory streamConsumerFactory;
+ private static PartitionGroupConsumer partitionGroupConsumer;
+
+ @BeforeMethod
+ public void setupTest() {
+ kinesisConnectionHandler = createMock(KinesisConnectionHandler.class);
+ streamConsumerFactory = createMock(StreamConsumerFactory.class);
+ partitionGroupConsumer = createNiceMock(PartitionGroupConsumer.class);
+ kinesisStreamMetadataProvider =
+ new KinesisStreamMetadataProvider(CLIENT_ID, TestUtils.getStreamConfig(), kinesisConnectionHandler,
+ streamConsumerFactory);
+ }
+
+ @Test
+ public void getPartitionsGroupInfoListTest()
+ throws Exception {
+ Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+ Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+
+ expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
+ replay(kinesisConnectionHandler);
+
+ List<PartitionGroupInfo> result = kinesisStreamMetadataProvider
+ .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), new ArrayList<>(), TIMEOUT);
+
+
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+ Assert.assertEquals(result.get(1).getPartitionGroupId(), 1);
+ }
+
+ @Test
+ public void getPartitionsGroupInfoEndOfShardTest()
+ throws Exception {
+ List<PartitionGroupMetadata> currentPartitionGroupMeta = new ArrayList<>();
+
+ Map<String, String> shardToSequenceMap = new HashMap<>();
+ shardToSequenceMap.put("0", "1");
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+
+ currentPartitionGroupMeta.add(new PartitionGroupMetadata(0, 1, kinesisCheckpoint, kinesisCheckpoint, "CONSUMING"));
+
+ Capture<Checkpoint> checkpointArgs = newCapture(CaptureType.ALL);
+ Capture<PartitionGroupMetadata> partitionGroupMetadataCapture = newCapture(CaptureType.ALL);
+ Capture<Integer> intArguments = newCapture(CaptureType.ALL);
+ Capture<String> stringCapture = newCapture(CaptureType.ALL);
+
+ Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build();
+ Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+ expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
+ expect(streamConsumerFactory
+ .createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture)))
+ .andReturn(partitionGroupConsumer).anyTimes();
+ expect(partitionGroupConsumer
+ .fetchMessages(capture(checkpointArgs), capture(checkpointArgs), captureInt(intArguments)))
+ .andReturn(new KinesisRecordsBatch(new ArrayList<>(), "0", true)).anyTimes();
+
+ replay(kinesisConnectionHandler, streamConsumerFactory, partitionGroupConsumer);
+
+ List<PartitionGroupInfo> result = kinesisStreamMetadataProvider
+ .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), currentPartitionGroupMeta, TIMEOUT);
+
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
+ }
+
+ @Test
+ public void getPartitionsGroupInfoChildShardsest()
+ throws Exception {
+ List<PartitionGroupMetadata> currentPartitionGroupMeta = new ArrayList<>();
+
+ Map<String, String> shardToSequenceMap = new HashMap<>();
+ shardToSequenceMap.put("1", "1");
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+
+ currentPartitionGroupMeta.add(new PartitionGroupMetadata(0, 1, kinesisCheckpoint, kinesisCheckpoint, "CONSUMING"));
+
+ Capture<Checkpoint> checkpointArgs = newCapture(CaptureType.ALL);
+ Capture<PartitionGroupMetadata> partitionGroupMetadataCapture = newCapture(CaptureType.ALL);
+ Capture<Integer> intArguments = newCapture(CaptureType.ALL);
+ Capture<String> stringCapture = newCapture(CaptureType.ALL);
+
+ Shard shard0 = Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+ Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build();
+
+ expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
+ expect(streamConsumerFactory
+ .createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture)))
+ .andReturn(partitionGroupConsumer).anyTimes();
+ expect(partitionGroupConsumer
+ .fetchMessages(capture(checkpointArgs), capture(checkpointArgs), captureInt(intArguments)))
+ .andReturn(new KinesisRecordsBatch(new ArrayList<>(), "0", true)).anyTimes();
+
+ replay(kinesisConnectionHandler, streamConsumerFactory, partitionGroupConsumer);
+
+ List<PartitionGroupInfo> result = kinesisStreamMetadataProvider
+ .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), currentPartitionGroupMeta, TIMEOUT);
+
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
new file mode 100644
index 0000000..28d02de
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+
+public class TestUtils {
+ private static final String STREAM_NAME = "kinesis-test";
+ private static final String AWS_REGION = "us-west-2";
+
+ public static StreamConfig getStreamConfig() {
+ Map<String, String> props = new HashMap<>();
+ props.put(KinesisConfig.STREAM, STREAM_NAME);
+ props.put(KinesisConfig.AWS_REGION, AWS_REGION);
+ props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
+ props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
+ props.put(StreamConfigProperties.STREAM_TYPE, "kinesis");
+ props.put("stream.kinesis.consumer.type", "lowLevel");
+ props.put("stream.kinesis.topic.name", STREAM_NAME);
+ props.put("stream.kinesis.decoder.class.name", "ABCD");
+ props.put("stream.kinesis.consumer.factory.class.name",
+ "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory");
+ return new StreamConfig("", props);
+ }
+
+ public static KinesisConfig getKinesisConfig() {
+ Map<String, String> props = new HashMap<>();
+ props.put(KinesisConfig.STREAM, STREAM_NAME);
+ props.put(KinesisConfig.AWS_REGION, AWS_REGION);
+ props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
+ props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
+ return new KinesisConfig(props);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 881526b..5676edc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
<parquet.version>1.8.0</parquet.version>
<helix.version>0.9.8</helix.version>
<zkclient.version>0.7</zkclient.version>
- <jackson.version>2.12.0</jackson.version>
+ <jackson.version>2.9.8</jackson.version>
<async-http-client.version>1.9.21</async-http-client.version>
<jersey.version>2.28</jersey.version>
<grizzly.version>2.4.4</grizzly.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org