You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:19 UTC
[incubator-pinot] 29/47: Add test code for kinesis
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 12880126a7a0c5a8b2e1ac3656e64fce7ebe47d5
Author: KKcorps <kh...@gmail.com>
AuthorDate: Tue Dec 22 22:05:02 2020 +0530
Add test code for kinesis
---
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 17 +++++--
.../kinesis/KinesisPartitionGroupMetadataMap.java | 16 +++----
.../plugin/stream/kinesis/KinesisConsumerTest.java | 54 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 13 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index d2e8715..a81d11f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
+import java.util.Map;
import org.apache.pinot.spi.stream.StreamConfig;
public class KinesisConfig {
- private final StreamConfig _streamConfig;
+ private final Map<String, String> _props;
+
+ public static final String STREAM = "stream";
private static final String AWS_REGION = "aws-region";
private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
@@ -30,18 +33,22 @@ public class KinesisConfig {
private static final String DEFAULT_MAX_RECORDS = "20";
public KinesisConfig(StreamConfig streamConfig) {
- _streamConfig = streamConfig;
+ _props = streamConfig.getStreamConfigsMap();
+ }
+
+ public KinesisConfig(Map<String, String> props) {
+ _props = props;
}
public String getStream(){
- return _streamConfig.getTopicName();
+ return _props.get(STREAM);
}
public String getAwsRegion(){
- return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+ return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
}
public Integer maxRecordsToFetch(){
- return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+ return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index d77579e..626c8ea 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -35,28 +35,28 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
- PartitionGroupMetadataMap partitionGroupMetadataMap) {
+ PartitionGroupMetadataMap currentPartitionGroupMetadataMap) {
//TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
//Return metadata only for shards in current metadata
super(stream, awsRegion);
KinesisPartitionGroupMetadataMap currentPartitionMeta =
- (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap;
+ (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap;
List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
List<Shard> shardList = getShards();
- Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>();
+ Map<String, PartitionGroupMetadata> currentMetadataMap = new HashMap<>();
for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
- metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
+ currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
}
for (Shard shard : shardList) {
- if (metadataMap.containsKey(shard.shardId())) {
+ if (currentMetadataMap.containsKey(shard.shardId())) {
//Return existing shard metadata
- _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId()));
- } else if (metadataMap.containsKey(shard.parentShardId())) {
- KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId());
+ _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
+ } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
+ KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
if (isProcessingFinished(kinesisShardMetadata)) {
//Add child shards for processing since parent has finished
appendShardMetadata(stream, awsRegion, shard);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
new file mode 100644
index 0000000..f8a0551
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -0,0 +1,54 @@
+package org.apache.pinot.plugin.stream.kinesis; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+/** Manual driver (plain main, no test framework): fetches one batch per shard of a stream and prints it. */
+public class KinesisConsumerTest {
+  public static void main(String[] args) {
+    Map<String, String> props = new HashMap<>();
+    props.put(KinesisConfig.STREAM, "kinesis-test");
+    props.put("aws-region", "us-west-2");
+    // Key must match the one KinesisConfig reads ("max-records-to-fetch"); any other key falls back to the default.
+    props.put("max-records-to-fetch", "10");
+    KinesisConfig kinesisConfig = new KinesisConfig(props);
+    String stream = kinesisConfig.getStream();
+    String awsRegion = kinesisConfig.getAwsRegion();
+    KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(stream, awsRegion);
+    List<Shard> shardList = kinesisConnectionHandler.getShards();
+
+    for (Shard shard : shardList) {
+      KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), stream, awsRegion));
+      // Start at the shard's first sequence number; last fetch argument is 60000 (presumably a timeout in ms - confirm).
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
+      KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
+      List<Record> list = fetchResult.getMessages();
+
+      System.out.println("SHARD: " + shard.shardId());
+      for (Record record : list) {
+        System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
+      }
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org