You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:26 UTC
[incubator-pinot] 36/47: Fix offsets in StreamMetadataProvider impl
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9c33895266f71512c4e8a0858d8f6eaa783faf44
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Jan 4 11:59:18 2021 -0800
Fix offsets in StreamMetadataProvider impl
---
.../org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java | 6 +++++-
.../plugin/stream/kinesis/KinesisStreamMetadataProvider.java | 8 +++++++-
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index d42f899..517f8c0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.Map;
-import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -54,6 +53,11 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
}
@Override
+ public String toString() {
+ return serialize();
+ }
+
+ @Override
public KinesisCheckpoint deserialize(String blob) {
try {
return new KinesisCheckpoint(blob);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index ba9d2b6..f86d06c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -2,7 +2,9 @@ package org.apache.pinot.plugin.stream.kinesis;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -40,7 +42,11 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
List<Shard> shards = _kinesisConnectionHandler.getShards();
for (Shard shard : shards) {
- partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber()));
+ Map<String, String> shardToSequenceNumMap = new HashMap<>();
+ shardToSequenceNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceNumMap);
+ partitionGroupInfos
+ .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()), kinesisCheckpoint.serialize()));
}
return partitionGroupInfos;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org