You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:36 UTC

[incubator-pinot] 46/47: Add unit tests in Kinesis consumer (#6410)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ab9655b8d42bd1a41920260ff3080070c6863cca
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Mon Feb 1 22:44:37 2021 +0530

    Add unit tests in Kinesis consumer (#6410)
    
    * Bug fixes: Handle connection broken exception
    
    * Add unit tests for partition group consumer
    
    * Add tests for child shards
    
    * Add unit test for kafka consumer
    
    * Refactor: remove unused imports, expand * imports and rename classes
    
    * Fix: enforcer errors
    
    * Remove powermock dependency
    
    * Fix jackson version conflict. Shade jackson dependency
    
    * Remove powermock
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  79 ++++++++-
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |   7 +-
 .../pinot/plugin/stream/kinesis/KinesisConfig.java |   4 +
 .../stream/kinesis/KinesisConnectionHandler.java   |   6 +
 .../plugin/stream/kinesis/KinesisConsumer.java     |  11 ++
 .../kinesis/KinesisStreamMetadataProvider.java     |  10 +-
 ...st.java => KinesisConsumerIntegrationTest.java} |   3 +-
 .../plugin/stream/kinesis/KinesisConsumerTest.java | 187 ++++++++++++++++-----
 .../kinesis/KinesisStreamMetadataProviderTest.java | 156 +++++++++++++++++
 .../pinot/plugin/stream/kinesis/TestUtils.java     |  55 ++++++
 pom.xml                                            |   2 +-
 11 files changed, 468 insertions(+), 52 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 38d4f73..b636b9f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -36,9 +36,25 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
-    <aws.version>2.15.50</aws.version>
+    <aws.version>2.14.28</aws.version>
+    <jackson.version>2.10.4</jackson.version>
+    <netty.version>4.1.42.Final</netty.version>
+    <easymock.version>4.2</easymock.version>
+    <reactive.version>1.0.2</reactive.version>
   </properties>
 
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>bom</artifactId>
+        <version>${aws.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
@@ -75,38 +91,87 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
-      <version>2.12.0</version>
+      <version>${jackson.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.reactivestreams</groupId>
       <artifactId>reactive-streams</artifactId>
-      <version>1.0.2</version>
+      <version>${reactive.version}</version>
     </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-codec</artifactId>
-      <version>4.1.42.Final</version>
+      <version>${netty.version}</version>
     </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
-      <version>4.1.42.Final</version>
+      <version>${netty.version}</version>
     </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport</artifactId>
-      <version>4.1.42.Final</version>
+      <version>${netty.version}</version>
     </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
-      <version>4.1.42.Final</version>
+      <version>${netty.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>${easymock.version}</version>
+      <scope>test</scope>
     </dependency>
+
   </dependencies>
 
+  <profiles>
+    <profile>
+      <id>build-shaded-jar</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>3.2.1</version>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <configuration>
+                  <relocations>
+                    <relocation>
+                      <pattern>com.fasterxml.jackson</pattern>
+                      <shadedPattern>shaded.kinesis.com.fasterxml.jackson</shadedPattern>
+                    </relocation>
+                    <relocation>
+                      <pattern>software.amazon</pattern>
+                      <shadedPattern>shaded.kinesis.software.amazon</shadedPattern>
+                    </relocation>
+                  </relocations>
+                  <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
+                  </transformers>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
 </project>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index e1f8b05..57904ac 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.pinot.spi.stream.Checkpoint;
@@ -33,6 +34,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 public class KinesisCheckpoint implements StreamPartitionMsgOffset {
   private final Map<String, String> _shardToStartSequenceMap;
+  public static final ObjectMapper objectMapper = new ObjectMapper();
 
   public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
     _shardToStartSequenceMap = shardToStartSequenceMap;
@@ -40,8 +42,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
 
   public KinesisCheckpoint(String checkpointStr)
       throws IOException {
-    _shardToStartSequenceMap = JsonUtils.stringToObject(checkpointStr, new TypeReference<Map<String, String>>() {
-    });
+    _shardToStartSequenceMap = objectMapper.readValue(checkpointStr, new TypeReference<Map<String, String>>(){});
   }
 
   public Map<String, String> getShardToStartSequenceMap() {
@@ -51,7 +52,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
   @Override
   public String serialize() {
     try {
-      return JsonUtils.objectToString(_shardToStartSequenceMap);
+      return objectMapper.writeValueAsString(_shardToStartSequenceMap);
     } catch (JsonProcessingException e) {
       throw new IllegalStateException();
     }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index fbe369f..6e46498 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -59,4 +59,8 @@ public class KinesisConfig {
   public ShardIteratorType getShardIteratorType() {
     return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
   }
+
+  public void setMaxRecordsToFetch(int maxRecordsToFetch){
+    _props.put(MAX_RECORDS_TO_FETCH, String.valueOf(maxRecordsToFetch));
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 61d065e..0686742 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -41,6 +41,12 @@ public class KinesisConnectionHandler {
     createConnection();
   }
 
+  public KinesisConnectionHandler(String stream, String awsRegion, KinesisClient kinesisClient) {
+    _stream = stream;
+    _awsRegion = awsRegion;
+    _kinesisClient = kinesisClient;
+  }
+
   /**
    * Lists all shards of the stream
    */
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 9c56f95..8ad27b4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -61,6 +62,15 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     _executorService = Executors.newSingleThreadExecutor();
   }
 
+  public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) {
+    super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion(), kinesisClient);
+    _kinesisClient = kinesisClient;
+    _stream = kinesisConfig.getStream();
+    _maxRecords = kinesisConfig.maxRecordsToFetch();
+    _shardIteratorType = kinesisConfig.getShardIteratorType();
+    _executorService = Executors.newSingleThreadExecutor();
+  }
+
   /**
    * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
    */
@@ -175,6 +185,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     if (sequenceNumber != null && _shardIteratorType.toString().contains("SEQUENCE")) {
       requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
     }
+
     return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 42150a3..7af1df1 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -59,6 +59,14 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
   }
 
+  public KinesisStreamMetadataProvider(String clientId, StreamConfig streamConfig, KinesisConnectionHandler kinesisConnectionHandler, StreamConsumerFactory streamConsumerFactory) {
+    KinesisConfig kinesisConfig = new KinesisConfig(streamConfig);
+    _kinesisConnectionHandler = kinesisConnectionHandler;
+    _kinesisStreamConsumerFactory = streamConsumerFactory;
+    _clientId = clientId;
+    _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
+  }
+
   @Override
   public int fetchPartitionCount(long timeoutMillis) {
     throw new UnsupportedOperationException();
@@ -83,7 +91,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
 
     Map<String, Shard> shardIdToShardMap =
-        _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s));
+        _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s, (s1, s2) -> s1));
     Set<String> shardsInCurrent = new HashSet<>();
     Set<String> shardsEnded = new HashSet<>();
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
similarity index 96%
copy from pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
copy to pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
index f9ed779..1e832fa 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java
@@ -22,12 +22,11 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
-public class KinesisConsumerTest {
+public class KinesisConsumerIntegrationTest {
 
   private static final String STREAM_NAME = "kinesis-test";
   private static final String AWS_REGION = "us-west-2";
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index f9ed779..384c512 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -18,51 +18,162 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.easymock.Capture;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 
 
 public class KinesisConsumerTest {
+  public static final int TIMEOUT = 1000;
+  public static final int NUM_RECORDS = 10;
+  public static final String DUMMY_RECORD_PREFIX = "DUMMY_RECORD-";
+  public static final String PARTITION_KEY_PREFIX = "PARTITION_KEY-";
+  public static final String PLACEHOLDER = "DUMMY";
+
+  private static KinesisConnectionHandler kinesisConnectionHandler;
+  private static StreamConsumerFactory streamConsumerFactory;
+  private static KinesisClient kinesisClient;
+  private List<Record> recordList;
+
+  @BeforeMethod
+  public void setupTest() {
+    kinesisConnectionHandler = createMock(KinesisConnectionHandler.class);
+    kinesisClient = createMock(KinesisClient.class);
+    streamConsumerFactory = createMock(StreamConsumerFactory.class);
+
+    recordList = new ArrayList<>();
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      Record record =
+          Record.builder().data(SdkBytes.fromUtf8String(DUMMY_RECORD_PREFIX + i)).partitionKey(PARTITION_KEY_PREFIX + i)
+              .sequenceNumber(String.valueOf(i + 1)).build();
+      recordList.add(record);
+    }
+  }
+
+  @Test
+  public void testBasicConsumer() {
+    Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
+    Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
+
+    GetRecordsResponse getRecordsResponse =
+        GetRecordsResponse.builder().nextShardIterator(null).records(recordList).build();
+    GetShardIteratorResponse getShardIteratorResponse =
+        GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
+
+    expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
+    expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
+        .anyTimes();
 
-  private static final String STREAM_NAME = "kinesis-test";
-  private static final String AWS_REGION = "us-west-2";
-
-  public static void main(String[] args)
-      throws IOException {
-    Map<String, String> props = new HashMap<>();
-    props.put(KinesisConfig.STREAM, STREAM_NAME);
-    props.put(KinesisConfig.AWS_REGION, AWS_REGION);
-    props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
-    props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
-    KinesisConfig kinesisConfig = new KinesisConfig(props);
-    KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION);
-    List<Shard> shardList = kinesisConnectionHandler.getShards();
-    for (Shard shard : shardList) {
-      System.out.println("SHARD: " + shard.shardId());
-
-      KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig);
-      System.out.println(
-          "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard
-              .sequenceNumberRange().endingSequenceNumber() + " >");
-      Map<String, String> shardIdToSeqNumMap = new HashMap<>();
-      shardIdToSeqNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardIdToSeqNumMap);
-      KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, 60 * 1000);
-      int n = kinesisRecordsBatch.getMessageCount();
-
-      System.out.println("Found " + n + " messages ");
-      for (int i = 0; i < n; i++) {
-        System.out.println(
-            "SEQ-NO: " + kinesisRecordsBatch.getMessageOffsetAtIndex(i) + ", DATA: " + kinesisRecordsBatch
-                .getMessageAtIndex(i));
-      }
-      kinesisConsumer.close();
+    replay(kinesisClient);
+
+    KinesisConsumer kinesisConsumer = new KinesisConsumer(TestUtils.getKinesisConfig(), kinesisClient);
+
+    Map<String, String> shardToSequenceMap = new HashMap<>();
+    shardToSequenceMap.put("0", "1");
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+    KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT);
+
+    Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), NUM_RECORDS);
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i);
     }
-    kinesisConnectionHandler.close();
+
+    Assert.assertFalse(kinesisRecordsBatch.isEndOfPartitionGroup());
+  }
+
+  @Test
+  public void testBasicConsumerWithMaxRecordsLimit() {
+    int maxRecordsLimit = 20;
+    Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
+    Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
+
+    GetRecordsResponse getRecordsResponse =
+        GetRecordsResponse.builder().nextShardIterator(PLACEHOLDER).records(recordList).build();
+    GetShardIteratorResponse getShardIteratorResponse =
+        GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
+
+    expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
+    expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
+        .anyTimes();
+
+    replay(kinesisClient);
+
+    KinesisConfig kinesisConfig = TestUtils.getKinesisConfig();
+    kinesisConfig.setMaxRecordsToFetch(maxRecordsLimit);
+    KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, kinesisClient);
+
+    Map<String, String> shardToSequenceMap = new HashMap<>();
+    shardToSequenceMap.put("0", "1");
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+    KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT);
+
+    Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), maxRecordsLimit);
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i);
+    }
+  }
+
+  @Test
+  public void testBasicConsumerWithChildShard() {
+    int maxRecordsLimit = 20;
+
+    List<ChildShard> shardList = new ArrayList<>();
+    shardList.add(ChildShard.builder().shardId(PLACEHOLDER).parentShards("0").build());
+
+    Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
+    Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
+
+    GetRecordsResponse getRecordsResponse =
+        GetRecordsResponse.builder().nextShardIterator(null).records(recordList).childShards(shardList).build();
+    GetShardIteratorResponse getShardIteratorResponse =
+        GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();
+
+    expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
+    expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
+        .anyTimes();
+
+    replay(kinesisClient);
+
+    KinesisConfig kinesisConfig = TestUtils.getKinesisConfig();
+    kinesisConfig.setMaxRecordsToFetch(maxRecordsLimit);
+    KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, kinesisClient);
+
+    Map<String, String> shardToSequenceMap = new HashMap<>();
+    shardToSequenceMap.put("0", "1");
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+    KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT);
+
+    Assert.assertTrue(kinesisRecordsBatch.isEndOfPartitionGroup());
+    Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), NUM_RECORDS);
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i);
+    }
+  }
+
+  public String baToString(byte[] bytes) {
+    return SdkBytes.fromByteArray(bytes).asUtf8String();
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
new file mode 100644
index 0000000..4845e57
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.captureInt;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class KinesisStreamMetadataProviderTest {
+  private static final String SHARD_ID_0 = "0";
+  private static final String SHARD_ID_1 = "1";
+  public static final String CLIENT_ID = "dummy";
+  public static final int TIMEOUT = 1000;
+
+  private static KinesisConnectionHandler kinesisConnectionHandler;
+  private KinesisStreamMetadataProvider kinesisStreamMetadataProvider;
+  private static StreamConsumerFactory streamConsumerFactory;
+  private static PartitionGroupConsumer partitionGroupConsumer;
+
+  @BeforeMethod
+  public void setupTest() {
+    kinesisConnectionHandler = createMock(KinesisConnectionHandler.class);
+    streamConsumerFactory = createMock(StreamConsumerFactory.class);
+    partitionGroupConsumer = createNiceMock(PartitionGroupConsumer.class);
+    kinesisStreamMetadataProvider =
+        new KinesisStreamMetadataProvider(CLIENT_ID, TestUtils.getStreamConfig(), kinesisConnectionHandler,
+            streamConsumerFactory);
+  }
+
+  @Test
+  public void getPartitionsGroupInfoListTest()
+      throws Exception {
+    Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+    Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+
+    expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
+    replay(kinesisConnectionHandler);
+
+    List<PartitionGroupInfo> result = kinesisStreamMetadataProvider
+        .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), new ArrayList<>(), TIMEOUT);
+
+
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+    Assert.assertEquals(result.get(1).getPartitionGroupId(), 1);
+  }
+
+  @Test
+  public void getPartitionsGroupInfoEndOfShardTest()
+      throws Exception {
+    List<PartitionGroupMetadata> currentPartitionGroupMeta = new ArrayList<>();
+
+    Map<String, String> shardToSequenceMap = new HashMap<>();
+    shardToSequenceMap.put("0", "1");
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+
+    currentPartitionGroupMeta.add(new PartitionGroupMetadata(0, 1, kinesisCheckpoint, kinesisCheckpoint, "CONSUMING"));
+
+    Capture<Checkpoint> checkpointArgs = newCapture(CaptureType.ALL);
+    Capture<PartitionGroupMetadata> partitionGroupMetadataCapture = newCapture(CaptureType.ALL);
+    Capture<Integer> intArguments = newCapture(CaptureType.ALL);
+    Capture<String> stringCapture = newCapture(CaptureType.ALL);
+
+    Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build();
+    Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+    expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
+    expect(streamConsumerFactory
+        .createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture)))
+        .andReturn(partitionGroupConsumer).anyTimes();
+    expect(partitionGroupConsumer
+        .fetchMessages(capture(checkpointArgs), capture(checkpointArgs), captureInt(intArguments)))
+        .andReturn(new KinesisRecordsBatch(new ArrayList<>(), "0", true)).anyTimes();
+
+    replay(kinesisConnectionHandler, streamConsumerFactory, partitionGroupConsumer);
+
+    List<PartitionGroupInfo> result = kinesisStreamMetadataProvider
+        .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), currentPartitionGroupMeta, TIMEOUT);
+
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
+  }
+
+  @Test
+  public void getPartitionsGroupInfoChildShardsest()
+      throws Exception {
+    List<PartitionGroupMetadata> currentPartitionGroupMeta = new ArrayList<>();
+
+    Map<String, String> shardToSequenceMap = new HashMap<>();
+    shardToSequenceMap.put("1", "1");
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap);
+
+    currentPartitionGroupMeta.add(new PartitionGroupMetadata(0, 1, kinesisCheckpoint, kinesisCheckpoint, "CONSUMING"));
+
+    Capture<Checkpoint> checkpointArgs = newCapture(CaptureType.ALL);
+    Capture<PartitionGroupMetadata> partitionGroupMetadataCapture = newCapture(CaptureType.ALL);
+    Capture<Integer> intArguments = newCapture(CaptureType.ALL);
+    Capture<String> stringCapture = newCapture(CaptureType.ALL);
+
+    Shard shard0 = Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
+    Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build();
+
+    expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
+    expect(streamConsumerFactory
+        .createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture)))
+        .andReturn(partitionGroupConsumer).anyTimes();
+    expect(partitionGroupConsumer
+        .fetchMessages(capture(checkpointArgs), capture(checkpointArgs), captureInt(intArguments)))
+        .andReturn(new KinesisRecordsBatch(new ArrayList<>(), "0", true)).anyTimes();
+
+    replay(kinesisConnectionHandler, streamConsumerFactory, partitionGroupConsumer);
+
+    List<PartitionGroupInfo> result = kinesisStreamMetadataProvider
+        .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), currentPartitionGroupMeta, TIMEOUT);
+
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
new file mode 100644
index 0000000..28d02de
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+
+public class TestUtils {
+  private static final String STREAM_NAME = "kinesis-test";
+  private static final String AWS_REGION = "us-west-2";
+
+  public static StreamConfig getStreamConfig() {
+    Map<String, String> props = new HashMap<>();
+    props.put(KinesisConfig.STREAM, STREAM_NAME);
+    props.put(KinesisConfig.AWS_REGION, AWS_REGION);
+    props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
+    props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
+    props.put(StreamConfigProperties.STREAM_TYPE, "kinesis");
+    props.put("stream.kinesis.consumer.type", "lowLevel");
+    props.put("stream.kinesis.topic.name", STREAM_NAME);
+    props.put("stream.kinesis.decoder.class.name", "ABCD");
+    props.put("stream.kinesis.consumer.factory.class.name",
+        "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory");
+    return new StreamConfig("", props);
+  }
+
+  public static KinesisConfig getKinesisConfig() {
+    Map<String, String> props = new HashMap<>();
+    props.put(KinesisConfig.STREAM, STREAM_NAME);
+    props.put(KinesisConfig.AWS_REGION, AWS_REGION);
+    props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
+    props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
+    return new KinesisConfig(props);
+  }
+}
diff --git a/pom.xml b/pom.xml
index 881526b..5676edc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
     <parquet.version>1.8.0</parquet.version>
     <helix.version>0.9.8</helix.version>
     <zkclient.version>0.7</zkclient.version>
-    <jackson.version>2.12.0</jackson.version>
+    <jackson.version>2.9.8</jackson.version>
     <async-http-client.version>1.9.21</async-http-client.version>
     <jersey.version>2.28</jersey.version>
     <grizzly.version>2.4.4</grizzly.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org