You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/03 03:50:26 UTC

[incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Implementation fixes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new a341b28  Implementation fixes
a341b28 is described below

commit a341b285016b1b478824de2a721e19d38186089f
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Sat Jan 2 19:49:34 2021 -0800

    Implementation fixes
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  2 +-
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 47 +++++++----
 .../plugin/stream/kinesis/KinesisConsumer.java     | 46 +++++------
 .../stream/kinesis/KinesisConsumerFactory.java     | 39 +++++----
 .../plugin/stream/kinesis/KinesisFetchResult.java  | 44 ----------
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 93 ----------------------
 .../stream/kinesis/KinesisShardMetadata.java       | 71 -----------------
 .../kinesis/KinesisStreamMetadataProvider.java     | 53 ++++++++++++
 .../plugin/stream/kinesis/KinesisConsumerTest.java | 18 +++--
 .../pinot/spi/stream/PartitionGroupMetadata.java   |  3 +
 .../org/apache/pinot/spi/stream/v2/Checkpoint.java | 25 ------
 .../org/apache/pinot/spi/stream/v2/ConsumerV2.java | 24 ------
 .../apache/pinot/spi/stream/v2/FetchResult.java    | 29 -------
 .../spi/stream/v2/PartitionGroupMetadata.java      | 34 --------
 .../spi/stream/v2/PartitionGroupMetadataMap.java   | 30 -------
 .../pinot/spi/stream/v2/SegmentNameGenerator.java  | 25 ------
 .../spi/stream/v2/StreamConsumerFactoryV2.java     | 37 ---------
 pinot-tools/pom.xml                                |  5 ++
 pom.xml                                            |  2 +-
 19 files changed, 148 insertions(+), 479 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 4fce169..38d4f73 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -109,4 +109,4 @@
     </dependency>
   </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index f3a7a49..1b8f86e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -18,38 +18,51 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import org.apache.pinot.spi.stream.v2.Checkpoint;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.utils.JsonUtils;
 
 
 public class KinesisCheckpoint implements Checkpoint {
-  String _sequenceNumber;
-  Boolean _isEndOfPartition = false;
+  private Map<String, String> _shardToStartSequenceMap;
 
-  public KinesisCheckpoint(String sequenceNumber) {
-    _sequenceNumber = sequenceNumber;
+  public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
+    _shardToStartSequenceMap = shardToStartSequenceMap;
   }
 
-  public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) {
-    _sequenceNumber = sequenceNumber;
-    _isEndOfPartition = isEndOfPartition;
+  public KinesisCheckpoint(String checkpointStr)
+      throws IOException {
+    _shardToStartSequenceMap = JsonUtils.stringToObject(checkpointStr, new TypeReference<Map<String, String>>() {
+    });
   }
 
-  @Override
-  public boolean isEndOfPartition() {
-    return _isEndOfPartition;
+  public Map<String, String> getShardToStartSequenceMap() {
+    return _shardToStartSequenceMap;
   }
 
-  public String getSequenceNumber() {
-    return _sequenceNumber;
+  @Override
+  public String serialize() {
+    try {
+      return JsonUtils.objectToString(_shardToStartSequenceMap);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException();
+    }
   }
 
   @Override
-  public byte[] serialize() {
-    return _sequenceNumber.getBytes();
+  public KinesisCheckpoint deserialize(String blob) {
+    try {
+      return new KinesisCheckpoint(blob);
+    } catch (IOException e) {
+      throw new IllegalStateException();
+    }
   }
 
   @Override
-  public KinesisCheckpoint deserialize(byte[] blob) {
-    return new KinesisCheckpoint(new String(blob));
+  public int compareTo(Object o) {
+    return this._shardToStartSequenceMap.values().iterator().next().compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next());
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fb414f0..8a24208 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -19,14 +19,16 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.spi.stream.v2.Checkpoint;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
@@ -41,28 +43,25 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
-public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
   private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
   String _stream;
   Integer _maxRecords;
-  String _shardId;
   ExecutorService _executorService;
   ShardIteratorType _shardIteratorType;
 
-  public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
+  public KinesisConsumer(KinesisConfig kinesisConfig) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
     _stream = kinesisConfig.getStream();
     _maxRecords = kinesisConfig.maxRecordsToFetch();
-    KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
-    _shardId = kinesisShardMetadata.getShardId();
     _shardIteratorType = kinesisConfig.getShardIteratorType();
     _executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
-  public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+  public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) {
     List<Record> recordList = new ArrayList<>();
-    Future<KinesisFetchResult> kinesisFetchResultFuture =
+    Future<KinesisRecordsBatch> kinesisFetchResultFuture =
         _executorService.submit(() -> getResult(start, end, recordList));
 
     try {
@@ -72,7 +71,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     }
   }
 
-  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end, List<Record> recordList) {
+  private KinesisRecordsBatch getResult(Checkpoint start, Checkpoint end, List<Record> recordList) {
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
     try {
@@ -81,13 +80,14 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
         createConnection();
       }
 
-      String shardIterator = getShardIterator(kinesisStartCheckpoint.getSequenceNumber());
+      Map.Entry<String, String> next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next();
+      String shardIterator = getShardIterator(next.getKey(), next.getValue());
 
       String kinesisEndSequenceNumber = null;
 
       if (end != null) {
         KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
-        kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+        kinesisEndSequenceNumber = kinesisEndCheckpoint.getShardToStartSequenceMap().values().iterator().next();
       }
 
       String nextStartSequenceNumber = null;
@@ -125,10 +125,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       }
 
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard);
-      KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
-
-      return kinesisFetchResult;
+      return new KinesisRecordsBatch(recordList);
     } catch (ProvisionedThroughputExceededException e) {
       LOG.warn("The request rate for the stream is too high", e);
       return handleException(kinesisStartCheckpoint, recordList);
@@ -149,21 +146,22 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     }
   }
 
-  private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
+  private KinesisRecordsBatch handleException(KinesisCheckpoint start, List<Record> recordList) {
     if (recordList.size() > 0) {
       String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
-      return new KinesisFetchResult(kinesisCheckpoint, recordList);
+      Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
+      newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
+      return new KinesisRecordsBatch(recordList);
     } else {
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
-      return new KinesisFetchResult(kinesisCheckpoint, recordList);
+      return new KinesisRecordsBatch(recordList);
+
     }
   }
 
-  public String getShardIterator(String sequenceNumber) {
+  public String getShardIterator(String shardId, String sequenceNumber) {
 
     GetShardIteratorRequest.Builder requestBuilder =
-        GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(_shardIteratorType);
+        GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType);
 
     if (sequenceNumber != null) {
       requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 9bb4d0c..aa90812 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,36 +18,41 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import org.apache.pinot.spi.stream.v2.SegmentNameGenerator;
-import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
+import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 
 
-public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
-  private KinesisConfig _kinesisConfig;
+public class KinesisConsumerFactory extends StreamConsumerFactory {
 
   @Override
-  public void init(StreamConfig streamConfig) {
-    _kinesisConfig = new KinesisConfig(streamConfig);
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public PartitionGroupMetadataMap getPartitionGroupsMetadata(
-      PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(),
-        currentPartitionGroupsMetadata);
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
+      String groupId) {
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public SegmentNameGenerator getSegmentNameGenerator() {
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
     return null;
   }
 
   @Override
-  public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_kinesisConfig, metadata);
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new KinesisStreamMetadataProvider(clientId, new KinesisConfig(_streamConfig));
   }
+
+  @Override
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+    return new KinesisConsumer(new KinesisConfig(_streamConfig));
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
deleted file mode 100644
index 8da3d2e..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kinesis;
-
-import java.util.List;
-import org.apache.pinot.spi.stream.v2.FetchResult;
-import software.amazon.awssdk.services.kinesis.model.Record;
-
-
-public class KinesisFetchResult implements FetchResult<byte[]> {
-  private final KinesisCheckpoint _kinesisCheckpoint;
-  private final List<Record> _recordList;
-
-  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList) {
-    _kinesisCheckpoint = kinesisCheckpoint;
-    _recordList = recordList;
-  }
-
-  @Override
-  public KinesisCheckpoint getLastCheckpoint() {
-    return _kinesisCheckpoint;
-  }
-
-  @Override
-  public KinesisRecordsBatch getMessages() {
-    return new KinesisRecordsBatch(_recordList);
-  }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
deleted file mode 100644
index f96533f..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kinesis;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import software.amazon.awssdk.services.kinesis.model.Shard;
-
-
-public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
-  private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
-
-  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
-      PartitionGroupMetadataMap currentPartitionGroupMetadataMap) {
-    //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
-    //Return metadata only for shards in current metadata
-    super(stream, awsRegion);
-    KinesisPartitionGroupMetadataMap currentPartitionMeta =
-        (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap;
-    List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
-
-    List<Shard> shardList = getShards();
-
-    Map<String, PartitionGroupMetadata> currentMetadataMap = new HashMap<>();
-    for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
-      KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
-      currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
-    }
-
-    for (Shard shard : shardList) {
-      if (currentMetadataMap.containsKey(shard.shardId())) {
-        //Return existing shard metadata
-        _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
-      } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
-        KinesisShardMetadata kinesisShardMetadata =
-            (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
-        if (isProcessingFinished(kinesisShardMetadata)) {
-          //Add child shards for processing since parent has finished
-          appendShardMetadata(stream, awsRegion, shard);
-        } else {
-          //Do not process this shard unless the parent shard is finished or expired
-        }
-      } else {
-        //This is a new shard with no parents. We can start processing this shard.
-        appendShardMetadata(stream, awsRegion, shard);
-      }
-    }
-  }
-
-  private boolean isProcessingFinished(KinesisShardMetadata kinesisShardMetadata) {
-    return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null && kinesisShardMetadata
-        .getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber());
-  }
-
-  private void appendShardMetadata(String stream, String awsRegion, Shard shard) {
-    String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
-    String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
-    KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
-    shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
-    shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
-    _stringPartitionGroupMetadataIndex.add(shardMetadata);
-  }
-
-  @Override
-  public List<PartitionGroupMetadata> getMetadataList() {
-    return _stringPartitionGroupMetadataIndex;
-  }
-
-  @Override
-  public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
-    return _stringPartitionGroupMetadataIndex.get(index);
-  }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
deleted file mode 100644
index e24121b..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kinesis;
-
-import org.apache.pinot.spi.stream.v2.Checkpoint;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-
-
-//TODO: Implement shardId as Array and have unique id
-public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
-  String _shardId;
-  KinesisCheckpoint _startCheckpoint;
-  KinesisCheckpoint _endCheckpoint;
-
-  public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
-    super(streamName, awsRegion);
-    _startCheckpoint = null;
-    _endCheckpoint = null;
-    _shardId = shardId;
-  }
-
-  public String getShardId() {
-    return _shardId;
-  }
-
-  @Override
-  public KinesisCheckpoint getStartCheckpoint() {
-    return _startCheckpoint;
-  }
-
-  @Override
-  public void setStartCheckpoint(Checkpoint startCheckpoint) {
-    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
-  }
-
-  @Override
-  public KinesisCheckpoint getEndCheckpoint() {
-    return _endCheckpoint;
-  }
-
-  @Override
-  public void setEndCheckpoint(Checkpoint endCheckpoint) {
-    _endCheckpoint = (KinesisCheckpoint) endCheckpoint;
-  }
-
-  @Override
-  public byte[] serialize() {
-    return new byte[0];
-  }
-
-  @Override
-  public KinesisShardMetadata deserialize(byte[] blob) {
-    return null;
-  }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
new file mode 100644
index 0000000..ba9d2b6
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -0,0 +1,53 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
+  private final KinesisConfig _kinesisConfig;
+  private KinesisConnectionHandler _kinesisConnectionHandler;
+
+  public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
+    _kinesisConfig = kinesisConfig;
+    _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    return 0;
+  }
+
+  @Override
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
+      throws TimeoutException {
+    return 0;
+  }
+
+  @Override
+  public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
+      throws TimeoutException {
+    List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
+    List<Shard> shards = _kinesisConnectionHandler.getShards();
+    for (Shard shard : shards) {
+      partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber()));
+    }
+    return partitionGroupInfos;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index f853875..57baae9 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -17,9 +17,11 @@ package org.apache.pinot.plugin.stream.kinesis; /**
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
@@ -29,7 +31,8 @@ public class KinesisConsumerTest {
   private static final String STREAM_NAME = "kinesis-test";
   private static final String AWS_REGION = "us-west-2";
 
-  public static void main(String[] args) {
+  public static void main(String[] args)
+      throws IOException {
     Map<String, String> props = new HashMap<>();
     props.put(KinesisConfig.STREAM, STREAM_NAME);
     props.put(KinesisConfig.AWS_REGION, AWS_REGION);
@@ -42,18 +45,19 @@ public class KinesisConsumerTest {
       System.out.println("SHARD: " + shard.shardId());
 
       KinesisConsumer kinesisConsumer =
-          new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION));
+          new KinesisConsumer(kinesisConfig);
       System.out.println(
           "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard
               .sequenceNumberRange().endingSequenceNumber() + " >");
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
-      KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L);
-      KinesisRecordsBatch list = fetchResult.getMessages();
-      int n = list.getMessageCount();
+      Map<String, String> shardIdToSeqNumMap = new HashMap<>();
+      shardIdToSeqNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardIdToSeqNumMap);
+      KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, 60 * 1000);
+      int n = kinesisRecordsBatch.getMessageCount();
 
       System.out.println("Found " + n + " messages ");
       for (int i = 0; i < n; i++) {
-        System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
+        System.out.println("SEQ-NO: " + kinesisRecordsBatch.getMessageOffsetAtIndex(i) + ", DATA: " + kinesisRecordsBatch.getMessageAtIndex(i));
       }
       kinesisConsumer.close();
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index f662d99..7c4e3ef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.List;
+
+
 public class PartitionGroupMetadata {
 
   // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
deleted file mode 100644
index 0195684..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface Checkpoint {
-  boolean isEndOfPartition();
-  byte[] serialize();
-  Checkpoint deserialize(byte[] blob);
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
deleted file mode 100644
index 48b387d..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface ConsumerV2 {
-  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
-}
-
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
deleted file mode 100644
index 2188ac9..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-import java.util.List;
-import org.apache.pinot.spi.stream.MessageBatch;
-
-
-public interface FetchResult<T> {
-  Checkpoint getLastCheckpoint();
-  MessageBatch<T> getMessages();
-}
-
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
deleted file mode 100644
index d7c44d7..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface PartitionGroupMetadata {
-  Checkpoint getStartCheckpoint(); // similar to getStartOffset
-
-  Checkpoint getEndCheckpoint(); // similar to getEndOffset
-
-  void setStartCheckpoint(Checkpoint startCheckpoint);
-
-  void setEndCheckpoint(Checkpoint endCheckpoint);
-
-  byte[] serialize();
-
-  PartitionGroupMetadata deserialize(byte[] blob);
-}
-
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
deleted file mode 100644
index ba37767..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-import java.util.List;
-
-
-public interface PartitionGroupMetadataMap {
-
-  List<PartitionGroupMetadata> getMetadataList();
-
-  PartitionGroupMetadata getPartitionGroupMetadata(int index);
-
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
deleted file mode 100644
index 6e65b25..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface SegmentNameGenerator {
-  // generates a unique name for a partition group based on the metadata
-    String generateSegmentName(PartitionGroupMetadata metadata);
-
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
deleted file mode 100644
index 9e671aa..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-import java.util.Map;
-import org.apache.pinot.spi.stream.StreamConfig;
-
-
-public interface StreamConsumerFactoryV2 {
-  void init(StreamConfig streamConfig);
-
-  // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
-  PartitionGroupMetadataMap getPartitionGroupsMetadata(PartitionGroupMetadataMap currentPartitionGroupsMetadata);
-
-  // creates a name generator which generates segment name for a partition group
-  SegmentNameGenerator getSegmentNameGenerator();
-
-  // creates a consumer which consumes from a partition group
-  ConsumerV2 createConsumer(PartitionGroupMetadata metadata);
-
-}
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index cf3fe70..e3c21ef 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -87,6 +87,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-kinesis</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-kafka-${kafka.version}</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
diff --git a/pom.xml b/pom.xml
index 79dabf7..237b5c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
     <parquet.version>1.8.0</parquet.version>
     <helix.version>0.9.8</helix.version>
     <zkclient.version>0.7</zkclient.version>
-    <jackson.version>2.9.8</jackson.version>
+    <jackson.version>2.12.0</jackson.version>
     <async-http-client.version>1.9.21</async-http-client.version>
     <jersey.version>2.28</jersey.version>
     <grizzly.version>2.4.4</grizzly.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org