You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:18 UTC

[incubator-pinot] 28/47: Change shard metadata logic

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 26085a88ac34d6b00737bbd68b8ff409ad281467
Author: KKcorps <kh...@gmail.com>
AuthorDate: Tue Dec 22 20:42:05 2020 +0530

    Change shard metadata logic
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  2 +-
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  2 +-
 .../stream/kinesis/KinesisConsumerFactory.java     |  2 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  2 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 55 +++++++++++++++++++---
 .../stream/kinesis/KinesisShardMetadata.java       | 16 +++----
 6 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 1abc536..0c9ae0b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -35,7 +35,7 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
-    <aws.version>2.13.46</aws.version>
+    <aws.version>2.15.50</aws.version>
   </properties>
 
   <dependencies>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 450173c..8de95e2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -38,7 +38,7 @@ public class KinesisCheckpoint implements Checkpoint {
   }
 
   @Override
-  public Checkpoint deserialize(byte[] blob) {
+  public KinesisCheckpoint deserialize(byte[] blob) {
     //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index da39aab..acac1fb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -38,7 +38,7 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion());
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 52dab66..aedcd5d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -35,7 +35,7 @@ public class KinesisFetchResult implements FetchResult<Record> {
   }
 
   @Override
-  public Checkpoint getLastCheckpoint() {
+  public KinesisCheckpoint getLastCheckpoint() {
     return _kinesisCheckpoint;
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 9a34004..d77579e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -19,7 +19,11 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -30,19 +34,56 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
-  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) {
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
+      PartitionGroupMetadataMap partitionGroupMetadataMap) {
+    //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
+    //Return metadata only for shards in current metadata
     super(stream, awsRegion);
+    KinesisPartitionGroupMetadataMap currentPartitionMeta =
+        (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap;
+    List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
+
     List<Shard> shardList = getShards();
+
+    Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>();
+    for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
+      KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
+      metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
+    }
+
     for (Shard shard : shardList) {
-      String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
-      String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
-      KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
-      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
-      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
-      _stringPartitionGroupMetadataIndex.add(shardMetadata);
+      if (metadataMap.containsKey(shard.shardId())) {
+        //Return existing shard metadata
+        _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId()));
+      } else if (metadataMap.containsKey(shard.parentShardId())) {
+        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId());
+        if (isProcessingFinished(kinesisShardMetadata)) {
+          //Add child shards for processing since parent has finished
+          appendShardMetadata(stream, awsRegion, shard);
+        } else {
+          //Do not process this shard unless the parent shard is finished or expired
+        }
+      } else {
+        //This is a new shard with no parents. We can start processing this shard.
+        appendShardMetadata(stream, awsRegion, shard);
+      }
     }
   }
 
+  private boolean isProcessingFinished(KinesisShardMetadata kinesisShardMetadata) {
+    return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null && kinesisShardMetadata
+        .getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber());
+  }
+
+  private void appendShardMetadata(String stream, String awsRegion, Shard shard) {
+    String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
+    String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+    KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
+    shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
+    shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+    _stringPartitionGroupMetadataIndex.add(shardMetadata);
+  }
+
   @Override
   public List<PartitionGroupMetadata> getMetadataList() {
     return _stringPartitionGroupMetadataIndex;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 8141cd4..327e034 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -25,11 +25,11 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-
+//TODO: Implement shardId as Array
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
   String _shardId;
-  Checkpoint _startCheckpoint;
-  Checkpoint _endCheckpoint;
+  KinesisCheckpoint _startCheckpoint;
+  KinesisCheckpoint _endCheckpoint;
 
   public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
     super(streamName, awsRegion);
@@ -43,23 +43,23 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public Checkpoint getStartCheckpoint() {
+  public KinesisCheckpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 
   @Override
-  public Checkpoint getEndCheckpoint() {
+  public KinesisCheckpoint getEndCheckpoint() {
     return _endCheckpoint;
   }
 
   @Override
   public void setStartCheckpoint(Checkpoint startCheckpoint) {
-    _startCheckpoint = startCheckpoint;
+    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
   }
 
   @Override
   public void setEndCheckpoint(Checkpoint endCheckpoint) {
-    _endCheckpoint = endCheckpoint;
+    _endCheckpoint = (KinesisCheckpoint) endCheckpoint;
   }
 
   @Override
@@ -68,7 +68,7 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public PartitionGroupMetadata deserialize(byte[] blob) {
+  public KinesisShardMetadata deserialize(byte[] blob) {
     return null;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org