You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:19 UTC

[incubator-pinot] 29/47: Add test code for kinesis

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 12880126a7a0c5a8b2e1ac3656e64fce7ebe47d5
Author: KKcorps <kh...@gmail.com>
AuthorDate: Tue Dec 22 22:05:02 2020 +0530

    Add test code for kinesis
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 17 +++++--
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 16 +++----
 .../plugin/stream/kinesis/KinesisConsumerTest.java | 54 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index d2e8715..a81d11f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
 
 
 public class KinesisConfig {
-  private final StreamConfig _streamConfig;
+  private final Map<String, String> _props;
+
+  public static final String STREAM = "stream";
   private static final String AWS_REGION = "aws-region";
   private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
 
@@ -30,18 +33,22 @@ public class KinesisConfig {
   private static final String DEFAULT_MAX_RECORDS = "20";
 
   public KinesisConfig(StreamConfig streamConfig) {
-    _streamConfig = streamConfig;
+    _props = streamConfig.getStreamConfigsMap();
+  }
+
+  public KinesisConfig(Map<String, String> props) {
+    _props = props;
   }
 
   public String getStream(){
-    return _streamConfig.getTopicName();
+    return _props.get(STREAM);
   }
 
   public String getAwsRegion(){
-    return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+    return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
   }
 
   public Integer maxRecordsToFetch(){
-    return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+    return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index d77579e..626c8ea 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -35,28 +35,28 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
-      PartitionGroupMetadataMap partitionGroupMetadataMap) {
+      PartitionGroupMetadataMap currentPartitionGroupMetadataMap) {
     //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
     //Return metadata only for shards in current metadata
     super(stream, awsRegion);
     KinesisPartitionGroupMetadataMap currentPartitionMeta =
-        (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap;
+        (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap;
     List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
 
     List<Shard> shardList = getShards();
 
-    Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>();
+    Map<String, PartitionGroupMetadata> currentMetadataMap = new HashMap<>();
     for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
       KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
-      metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
+      currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
     }
 
     for (Shard shard : shardList) {
-      if (metadataMap.containsKey(shard.shardId())) {
+      if (currentMetadataMap.containsKey(shard.shardId())) {
         //Return existing shard metadata
-        _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId()));
-      } else if (metadataMap.containsKey(shard.parentShardId())) {
-        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId());
+        _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
+      } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
+        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
         if (isProcessingFinished(kinesisShardMetadata)) {
           //Add child shards for processing since parent has finished
           appendShardMetadata(stream, awsRegion, shard);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
new file mode 100644
index 0000000..f8a0551
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -0,0 +1,54 @@
+package org.apache.pinot.plugin.stream.kinesis; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisConsumerTest {
+  public static void main(String[] args) {
+    Map<String, String> props = new HashMap<>();
+    props.put("stream", "kinesis-test");
+    props.put("aws-region", "us-west-2");
+    props.put("maxRecords", "10");
+
+    KinesisConfig kinesisConfig = new KinesisConfig(props);
+
+    KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2");
+
+    List<Shard> shardList = kinesisConnectionHandler.getShards();
+
+    for(Shard shard : shardList) {
+      KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
+
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
+      KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
+
+      List<Record> list = fetchResult.getMessages();
+
+      System.out.println("SHARD: " + shard.shardId());
+      for (Record record : list) {
+        System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org