You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:26 UTC

[incubator-pinot] 36/47: Fix offsets in StreamMetadataProvider impl

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9c33895266f71512c4e8a0858d8f6eaa783faf44
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Jan 4 11:59:18 2021 -0800

    Fix offsets in StreamMetadataProvider impl
---
 .../org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java | 6 +++++-
 .../plugin/stream/kinesis/KinesisStreamMetadataProvider.java      | 8 +++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index d42f899..517f8c0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Map;
-import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.JsonUtils;
 
@@ -54,6 +53,11 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
   }
 
   @Override
+  public String toString() {
+    return serialize();
+  }
+
+  @Override
   public KinesisCheckpoint deserialize(String blob) {
     try {
       return new KinesisCheckpoint(blob);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index ba9d2b6..f86d06c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -2,7 +2,9 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -40,7 +42,11 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
     for (Shard shard : shards) {
-      partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber()));
+      Map<String, String> shardToSequenceNumMap = new HashMap<>();
+      shardToSequenceNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceNumMap);
+      partitionGroupInfos
+          .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()), kinesisCheckpoint.serialize()));
     }
     return partitionGroupInfos;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org