You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:06 UTC

[incubator-pinot] branch sharded_consumer_type_support_with_kinesis created (now 063a06f)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 063a06f  fixing compilation

This branch includes the following new commits:

     new abc6588  Add interfaces for V2 consumers
     new 60195b0  Add initial implementation of Kinesis consumer
     new 1a8a80a  Add PartitionGroupMetadataMap interface
     new 76cfcf1  Add kinesis code to handle offsets
     new 56efa08  Refactor PartitionGroupMetadataMap interface
     new c9cd79c  Refactor kinesis shard metadata interface and add shardId to the metadata
     new ba3d100  Fix consumer code
     new 9567c18  Move shardId out of checkpoint to partition group metadata
     new 2463bd1  Reformat code
     new ee95eeb  Add license headers
     new b0eeec6  Add Kinesis config wrapper
     new 900450d  fetch records with timeout
     new 422688d  Add license header
     new b0a2468  Handle exceptions
     new 00db7a2  Refactor code
     new bb8e08f  Handle closed connections
     new 47b1664  Refactor: get shard iterator methods
     new dbdcaf0  Change shard metadata logic
     new d922695  Add test code for kinesis
     new 203aa08  Handle timeout exception in consumer and make shard iterator type configurable
     new 8d0248a  Add isEndOfPartition check in checkpoints
     new 4da5a2c  Return message batch instead of list in the fetch result
     new 063a06f  fixing compilation

The 23 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 10/23: Add license headers

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ee95eeb21c25eca9847fd2d672115a4c9d6dd7a0
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:39:25 2020 +0530

    Add license headers
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml     | 20 ++++++++++++++++++++
 .../plugin/stream/kinesis/KinesisCheckpoint.java     | 18 ++++++++++++++++++
 .../stream/kinesis/KinesisConnectionHandler.java     | 18 ++++++++++++++++++
 .../pinot/plugin/stream/kinesis/KinesisConsumer.java | 18 ++++++++++++++++++
 .../stream/kinesis/KinesisConsumerFactory.java       | 18 ++++++++++++++++++
 .../plugin/stream/kinesis/KinesisFetchResult.java    | 18 ++++++++++++++++++
 .../kinesis/KinesisPartitionGroupMetadataMap.java    | 18 ++++++++++++++++++
 .../plugin/stream/kinesis/KinesisShardMetadata.java  | 18 ++++++++++++++++++
 .../org/apache/pinot/spi/stream/v2/Checkpoint.java   | 18 ++++++++++++++++++
 .../org/apache/pinot/spi/stream/v2/ConsumerV2.java   | 18 ++++++++++++++++++
 .../org/apache/pinot/spi/stream/v2/FetchResult.java  | 18 ++++++++++++++++++
 .../pinot/spi/stream/v2/PartitionGroupMetadata.java  | 18 ++++++++++++++++++
 .../spi/stream/v2/PartitionGroupMetadataMap.java     | 18 ++++++++++++++++++
 .../pinot/spi/stream/v2/SegmentNameGenerator.java    | 18 ++++++++++++++++++
 .../pinot/spi/stream/v2/StreamConsumerFactoryV2.java | 18 ++++++++++++++++++
 15 files changed, 272 insertions(+)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index f863d17..1abc536 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -1,4 +1,24 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 89043ea..450173c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 554cca6..c41598e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.List;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 1181d14..7670f06 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 5e06a01..931fa07 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.Map;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 2801a09..52dab66 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 05d95de..9a34004 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index e1d23da..8141cd4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
index 0856454..030fe4e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 public interface Checkpoint {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
index afc8d38..48b387d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 public interface ConsumerV2 {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
index 78ae5ef..9d14473 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 import java.util.List;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
index 27c5ce7..d7c44d7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 public interface PartitionGroupMetadata {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
index 702f08a..ba37767 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 import java.util.List;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
index 689c686..6e65b25 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 public interface SegmentNameGenerator {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
index eb7f76e..9e671aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.spi.stream.v2;
 
 import java.util.Map;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 02/23: Add initial implementation of Kinesis consumer

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 60195b016b07b9d0305cc4e4e8e5a6e424f5b76f
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 10 19:08:41 2020 +0530

    Add initial implementation of Kinesis consumer
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   | 39 ++++++++++++++++++
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 28 +++++++++++++
 .../stream/kinesis/KinesisConnectionHandler.java   | 25 ++++++++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     | 40 ++++++++++++++++++
 .../plugin/stream/kinesis/KinesisFetchResult.java  | 25 ++++++++++++
 .../stream/kinesis/KinesisShardMetadata.java       | 47 ++++++++++++++++++++++
 pinot-plugins/pinot-stream-ingestion/pom.xml       |  1 +
 7 files changed, 205 insertions(+)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
new file mode 100644
index 0000000..97e5eef
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>pinot-stream-ingestion</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>pinot-kinesis</artifactId>
+
+  <properties>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <phase.prop>package</phase.prop>
+    <aws.version>2.15.42</aws.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>kinesis</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-json</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-spi</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
new file mode 100644
index 0000000..a330e78
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -0,0 +1,28 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+
+
+public class KinesisCheckpoint implements Checkpoint {
+  String _shardIterator;
+
+  public KinesisCheckpoint(String shardIterator){
+    _shardIterator = shardIterator;
+  }
+
+  public String getShardIterator() {
+    return _shardIterator;
+  }
+
+  @Override
+  public byte[] serialize() {
+    return _shardIterator.getBytes();
+  }
+
+  @Override
+  public Checkpoint deserialize(byte[] blob) {
+    return new KinesisCheckpoint(new String(blob));
+  }
+
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
new file mode 100644
index 0000000..7ea24c0
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -0,0 +1,25 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+
+public class KinesisConnectionHandler {
+  String _awsRegion = "";
+  KinesisClient _kinesisClient;
+
+  public KinesisConnectionHandler(){
+
+  }
+
+  public KinesisConnectionHandler(String awsRegion){
+    _awsRegion = awsRegion;
+    _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
+  }
+
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
new file mode 100644
index 0000000..251d831
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -0,0 +1,40 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Collections;
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.FetchResult;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+
+  //TODO: Fetch AWS region from  Stream Config.
+  public KinesisConsumer(String awsRegion) {
+    super(awsRegion);
+  }
+
+  @Override
+  public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+    KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+    KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+
+    String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator();
+
+    GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build();
+    GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+
+    String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+
+    if(!getRecordsResponse.hasRecords()){
+      return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList());
+    }
+
+    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator,
+        getRecordsResponse.records());
+
+    return kinesisFetchResult;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
new file mode 100644
index 0000000..5ef4e30
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -0,0 +1,25 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.FetchResult;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+/**
+ * {@link FetchResult} returned by the Kinesis consumer: the shard iterator to continue from plus the records
+ * returned by the GetRecords call.
+ */
+public class KinesisFetchResult implements FetchResult {
+  private final String _nextShardIterator;
+  // Previously dropped on the floor by the constructor; kept so fetched data is not lost.
+  private final List<Record> _recordList;
+
+  /**
+   * @param nextShardIterator shard iterator to resume from; exposed as a {@link KinesisCheckpoint}
+   * @param recordList records returned by Kinesis for this fetch
+   */
+  public KinesisFetchResult(String nextShardIterator, List<Record> recordList) {
+    _nextShardIterator = nextShardIterator;
+    _recordList = recordList;
+  }
+
+  @Override
+  public Checkpoint getLastCheckpoint() {
+    return new KinesisCheckpoint(_nextShardIterator);
+  }
+
+  /**
+   * Returns the raw Kinesis records of this fetch.
+   *
+   * @return the record list passed to the constructor
+   */
+  public List<Record> getRecordList() {
+    return _recordList;
+  }
+
+  /**
+   * Stub: always returns an empty array.
+   *
+   * @return an empty byte array; use {@link #getRecordList()} for the fetched records
+   */
+  @Override
+  public byte[] getMessages() {
+    // TODO: serialize _recordList once the message format for FetchResult is defined.
+    return new byte[0];
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
new file mode 100644
index 0000000..07ede73
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -0,0 +1,47 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+
+/**
+ * {@link PartitionGroupMetadata} for a single Kinesis shard. On construction it requests a shard iterator
+ * positioned at the oldest available record (TRIM_HORIZON) and stores it as the start checkpoint.
+ */
+public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
+  Checkpoint _startCheckpoint;
+  Checkpoint _endCheckpoint;
+
+  /**
+   * Creates shard metadata using whatever Kinesis client the no-arg {@link KinesisConnectionHandler}
+   * constructor provides.
+   * NOTE(review): that constructor appears not to create a client; prefer the region-aware overload.
+   *
+   * @param shardId Kinesis shard id
+   * @param streamName Kinesis stream name
+   */
+  public KinesisShardMetadata(String shardId, String streamName) {
+    _startCheckpoint = fetchStartCheckpoint(shardId, streamName);
+  }
+
+  /**
+   * Creates shard metadata using a Kinesis client configured for {@code awsRegion}.
+   *
+   * @param shardId Kinesis shard id
+   * @param streamName Kinesis stream name
+   * @param awsRegion AWS region name forwarded to the connection handler
+   */
+  public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
+    super(awsRegion);
+    _startCheckpoint = fetchStartCheckpoint(shardId, streamName);
+  }
+
+  /** Requests a TRIM_HORIZON shard iterator; ShardIteratorType is a required GetShardIterator parameter. */
+  private Checkpoint fetchStartCheckpoint(String shardId, String streamName) {
+    GetShardIteratorRequest request = GetShardIteratorRequest.builder()
+        .shardId(shardId)
+        .streamName(streamName)
+        .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
+        .build();
+    GetShardIteratorResponse response = _kinesisClient.getShardIterator(request);
+    return new KinesisCheckpoint(response.shardIterator());
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return _startCheckpoint;
+  }
+
+  /** @return the end checkpoint, or {@code null} until {@link #setEndCheckpoint} is called */
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return _endCheckpoint;
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    _startCheckpoint = startCheckpoint;
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+    _endCheckpoint = endCheckpoint;
+  }
+
+  /** Stub: serialization is not implemented yet and always yields an empty array. */
+  @Override
+  public byte[] serialize() {
+    return new byte[0];
+  }
+
+  /** Stub: deserialization is not implemented yet and always returns {@code null}. */
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml
index 3a51626..e7b9a46 100644
--- a/pinot-plugins/pinot-stream-ingestion/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pom.xml
@@ -42,6 +42,7 @@
     <module>pinot-kafka-base</module>
     <module>pinot-kafka-0.9</module>
     <module>pinot-kafka-2.0</module>
+    <module>pinot-kinesis</module>
   </modules>
 
 </project>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 22/23: Return message batch instead of list in the fetch result

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4da5a2c733bf36a19bc7480b617a08a8c3e78c65
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 31 11:24:42 2020 +0530

    Return message batch instead of list in the fetch result
---
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  7 +--
 .../plugin/stream/kinesis/KinesisRecordsBatch.java | 52 ++++++++++++++++++++++
 .../plugin/stream/kinesis/KinesisConsumerTest.java |  7 +--
 .../apache/pinot/spi/stream/v2/FetchResult.java    |  3 +-
 4 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index aedcd5d..39561f3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -20,12 +20,13 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 
-public class KinesisFetchResult implements FetchResult<Record> {
+public class KinesisFetchResult implements FetchResult<byte[]> {
   private final KinesisCheckpoint _kinesisCheckpoint;
   private final List<Record> _recordList;
 
@@ -40,7 +41,7 @@ public class KinesisFetchResult implements FetchResult<Record> {
   }
 
   @Override
-  public List<Record> getMessages() {
-    return _recordList;
+  public KinesisRecordsBatch getMessages() {
+    return new KinesisRecordsBatch(_recordList);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
new file mode 100644
index 0000000..ed51f8f
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -0,0 +1,52 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+/**
+ * {@link MessageBatch} view over a list of Kinesis {@link Record}s that exposes each record's payload as bytes.
+ */
+public class KinesisRecordsBatch implements MessageBatch<byte[]> {
+  private final List<Record> _recordList;
+
+  /** @param recordList records to expose; the list is used directly, not copied */
+  public KinesisRecordsBatch(List<Record> recordList) {
+    _recordList = recordList;
+  }
+
+  @Override
+  public int getMessageCount() {
+    return _recordList.size();
+  }
+
+  /** Returns a fresh copy of the payload of the record at {@code index}. */
+  @Override
+  public byte[] getMessageAtIndex(int index) {
+    return _recordList.get(index).data().asByteArray();
+  }
+
+  /**
+   * Placeholder: returns the record's hashCode. This is not a stream offset and gives no ordering guarantee.
+   */
+  @Override
+  public int getMessageOffsetAtIndex(int index) {
+    //TODO: Doesn't translate to offset. Needs to be replaced.
+    return _recordList.get(index).hashCode();
+  }
+
+  /** Returns the payload length of the record at {@code index}. */
+  @Override
+  public int getMessageLengthAtIndex(int index) {
+    // asByteBuffer() is a read-only view, so the length is read without copying the payload as asByteArray() would.
+    return _recordList.get(index).data().asByteBuffer().remaining();
+  }
+
+  @Override
+  public RowMetadata getMetadataAtIndex(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index 17691c4..6f660f7 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -48,10 +48,11 @@ public class KinesisConsumerTest {
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
       KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
 
-      List<Record> list = fetchResult.getMessages();
+      KinesisRecordsBatch list = fetchResult.getMessages();
+      int n = list.getMessageCount();
 
-      for (Record record : list) {
-        System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
+      for (int i=0;i<n;i++) {
+        System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
       }
     }
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
index 9d14473..2188ac9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -19,10 +19,11 @@
 package org.apache.pinot.spi.stream.v2;
 
 import java.util.List;
+import org.apache.pinot.spi.stream.MessageBatch;
 
 
 public interface FetchResult<T> {
   Checkpoint getLastCheckpoint();
-  List<T> getMessages();
+  MessageBatch<T> getMessages();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 08/23: Move shardId out of checkpoint to partition group metadata

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9567c182bfb4d68ec49ebda0484fe859c35562b4
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:25:13 2020 +0530

    Move shardId out of checkpoint to partition group metadata
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java    | 14 ++------------
 .../plugin/stream/kinesis/KinesisConsumer.java      | 21 +++++++++------------
 .../stream/kinesis/KinesisConsumerFactory.java      |  2 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java   |  4 +++-
 .../plugin/stream/kinesis/KinesisShardMetadata.java |  5 ++---
 5 files changed, 17 insertions(+), 29 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 8448665..aa80b17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -4,11 +4,9 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 
 public class KinesisCheckpoint implements Checkpoint {
-  String _shardId;
   String _sequenceNumber;
 
-  public KinesisCheckpoint(String shardId, String sequenceNumber){
-    _shardId = shardId;
+  public KinesisCheckpoint(String sequenceNumber){
     _sequenceNumber = sequenceNumber;
   }
 
@@ -16,14 +14,6 @@ public class KinesisCheckpoint implements Checkpoint {
     return _sequenceNumber;
   }
 
-  public String getShardId() {
-    return _shardId;
-  }
-
-  public void setShardId(String shardId) {
-    _shardId = shardId;
-  }
-
   @Override
   public byte[] serialize() {
     return _sequenceNumber.getBytes();
@@ -32,7 +22,7 @@ public class KinesisCheckpoint implements Checkpoint {
   @Override
   public Checkpoint deserialize(byte[] blob) {
     //TODO: Implement SerDe
-    return new KinesisCheckpoint("", new String(blob));
+    return new KinesisCheckpoint(new String(blob));
   }
 
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7bc1006..d896d67 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -7,6 +7,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -18,18 +19,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
   String _stream;
   Integer _maxRecords;
+  String _shardId;
 
-  //TODO: Fetch AWS region from  Stream Config.
-  public KinesisConsumer(String stream, String awsRegion) {
-    super(stream, awsRegion);
-    _stream = stream;
-    _maxRecords = 20;
-  }
-
-  public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) {
-    super(stream, awsRegion);
+  public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) {
+    super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global"));
     _stream = stream;
     _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
+    KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
+    _shardId = kinesisShardMetadata.getShardId();
   }
 
   @Override
@@ -73,7 +70,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
     }
 
-    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber);
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
     KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
         recordList);
 
@@ -86,11 +83,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     if(kinesisStartCheckpoint.getSequenceNumber() != null) {
       String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
               .startingSequenceNumber(kinesisStartSequenceNumber).build());
     } else{
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
     }
 
     return getShardIteratorResponse.shardIterator();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index bdbc348..0608118 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
 
   @Override
   public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig);
+    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index d15804e..700ec3f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -16,9 +16,11 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
     super(stream, awsRegion);
     List<Shard> shardList = getShards();
     for(Shard shard : shardList){
+      String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
-      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber));
+      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
+      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
       _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 693b307..e1d23da 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -15,9 +15,8 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
 
   public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
     super(streamName, awsRegion);
-
-    _startCheckpoint = new KinesisCheckpoint(shardId, null);
-    _endCheckpoint = new KinesisCheckpoint(shardId, null);
+    _startCheckpoint = null;
+    _endCheckpoint = null;
     _shardId = shardId;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 07/23: Fix consumer code

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ba3d10035942154062689598abb943864a3bbad3
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 00:54:16 2020 +0530

    Fix consumer code
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   | 11 +--
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 15 +++-
 .../stream/kinesis/KinesisConnectionHandler.java   | 21 +++++-
 .../plugin/stream/kinesis/KinesisConsumer.java     | 88 +++++++++++++++-------
 .../stream/kinesis/KinesisConsumerFactory.java     |  2 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  8 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java  |  9 +--
 .../stream/kinesis/KinesisShardMetadata.java       | 11 +--
 8 files changed, 112 insertions(+), 53 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 97e5eef..f863d17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -15,7 +15,7 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
-    <aws.version>2.15.42</aws.version>
+    <aws.version>2.13.46</aws.version>
   </properties>
 
   <dependencies>
@@ -24,12 +24,13 @@
       <artifactId>kinesis</artifactId>
       <version>${aws.version}</version>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-json</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.12.0</version>
     </dependency>
+
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-spi</artifactId>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 77f790b..8448665 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -4,9 +4,11 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 
 public class KinesisCheckpoint implements Checkpoint {
+  String _shardId;
   String _sequenceNumber;
 
-  public KinesisCheckpoint(String sequenceNumber){
+  public KinesisCheckpoint(String shardId, String sequenceNumber){
+    _shardId = shardId;
     _sequenceNumber = sequenceNumber;
   }
 
@@ -14,6 +16,14 @@ public class KinesisCheckpoint implements Checkpoint {
     return _sequenceNumber;
   }
 
+  public String getShardId() {
+    return _shardId;
+  }
+
+  public void setShardId(String shardId) {
+    _shardId = shardId;
+  }
+
   @Override
   public byte[] serialize() {
     return _sequenceNumber.getBytes();
@@ -21,7 +31,8 @@ public class KinesisCheckpoint implements Checkpoint {
 
   @Override
   public Checkpoint deserialize(byte[] blob) {
-    return new KinesisCheckpoint(new String(blob));
+    //TODO: Implement SerDe
+    return new KinesisCheckpoint("", new String(blob));
   }
 
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 7ea24c0..d8888fa 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -1,25 +1,42 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.List;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 
 public class KinesisConnectionHandler {
-  String _awsRegion = "";
+  private String _stream;
+  private String _awsRegion;
   KinesisClient _kinesisClient;
 
   public KinesisConnectionHandler(){
 
   }
 
-  public KinesisConnectionHandler(String awsRegion){
+  public KinesisConnectionHandler(String stream, String awsRegion){
+    _stream = stream;
     _awsRegion = awsRegion;
     _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
   }
 
+  public List<Shard> getShards(){
+    ListShardsResponse listShardsResponse =  _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
+    return listShardsResponse.shards();
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index dc44079..7bc1006 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -3,6 +3,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
@@ -16,57 +17,86 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
   String _stream;
+  Integer _maxRecords;
 
   //TODO: Fetch AWS region from  Stream Config.
   public KinesisConsumer(String stream, String awsRegion) {
-    super(awsRegion);
+    super(stream, awsRegion);
     _stream = stream;
+    _maxRecords = 20;
+  }
+
+  public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) {
+    super(stream, awsRegion);
+    _stream = stream;
+    _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
   }
 
   @Override
-  public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+  public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
-    KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
 
-    String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
-    String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+    String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
-    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType(
-        ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build());
-
-    String shardIterator = getShardIteratorResponse.shardIterator();
-    GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
-    GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+    List<Record> recordList = new ArrayList<>();
 
-    String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+    String kinesisEndSequenceNumber = null;
 
-    //TODO: Get records in the loop and stop when end sequence number is reached or there is an exception.
-    if(!getRecordsResponse.hasRecords()){
-      return new KinesisFetchResult(kinesisStartSequenceNumber, Collections.emptyList());
+    if(end != null) {
+      KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+      kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
     }
 
-    List<Record> recordList = new ArrayList<>();
-    recordList.addAll(getRecordsResponse.records());
+    String nextStartSequenceNumber = null;
+    Long startTimestamp = System.currentTimeMillis();
+
+    while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){
+      GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
+      GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
-    String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
-    while(kinesisNextShardIterator != null){
-      getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build();
-      getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
-      if(getRecordsResponse.hasRecords()){
+      if(getRecordsResponse.records().size() > 0){
         recordList.addAll(getRecordsResponse.records());
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
-      }
 
-      if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ) {
-        nextStartSequenceNumber = kinesisEndSequenceNumber;
-        break;
+        if(kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ){
+          nextStartSequenceNumber = kinesisEndSequenceNumber;
+          break;
+        }
+
+        if(recordList.size() >= _maxRecords) break;
       }
-      kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+
+      shardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(nextStartSequenceNumber,
-        getRecordsResponse.records());
+    if(nextStartSequenceNumber == null && recordList.size() > 0){
+      nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+    }
+
+    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber);
+    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
+        recordList);
 
     return kinesisFetchResult;
   }
+
+  private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
+    GetShardIteratorResponse getShardIteratorResponse;
+
+    if(kinesisStartCheckpoint.getSequenceNumber() != null) {
+      String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
+      getShardIteratorResponse = _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+    } else{
+      getShardIteratorResponse = _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+    }
+
+    return getShardIteratorResponse.shardIterator();
+  }
+
+  private boolean isTimedOut(Long startTimestamp, Long timeout) {
+    return (System.currentTimeMillis() - startTimestamp) >= timeout;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 6bd1e3a..bdbc348 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
 
   @Override
   public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index dc8e764..2996b28 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -8,17 +8,17 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 
 
 public class KinesisFetchResult implements FetchResult<Record> {
-  private final String _nextShardIterator;
+  private final KinesisCheckpoint _kinesisCheckpoint;
   private final List<Record> _recordList;
 
-  public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
-     _nextShardIterator = nextShardIterator;
+  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList){
+     _kinesisCheckpoint = kinesisCheckpoint;
      _recordList = recordList;
   }
 
   @Override
   public Checkpoint getLastCheckpoint() {
-    return new KinesisCheckpoint(_nextShardIterator);
+    return _kinesisCheckpoint;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 87f7235..d15804e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -13,13 +13,12 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
-    super(awsRegion);
-    ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
-    List<Shard> shardList = listShardsResponse.shards();
+    super(stream, awsRegion);
+    List<Shard> shardList = getShards();
     for(Shard shard : shardList){
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
-      KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
-      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+      KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
+      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber));
       _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 4a19285..693b307 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
@@ -12,11 +13,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   Checkpoint _startCheckpoint;
   Checkpoint _endCheckpoint;
 
-  public KinesisShardMetadata(String shardId, String streamName) {
-    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType(
-        ShardIteratorType.LATEST).streamName(streamName).build());
-    _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
-    _endCheckpoint = null;
+  public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
+    super(streamName, awsRegion);
+
+    _startCheckpoint = new KinesisCheckpoint(shardId, null);
+    _endCheckpoint = new KinesisCheckpoint(shardId, null);
     _shardId = shardId;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 12/23: fetch records with timeout

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 900450da05347fd6ce958053141b701c7e09d0e9
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:44:38 2020 +0530

    fetch records with timeout
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 30 ++++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 96241d4..910b9ee 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -21,6 +21,12 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
@@ -39,6 +45,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   String _stream;
   Integer _maxRecords;
   String _shardId;
+  ExecutorService _executorService;
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -46,10 +53,27 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
+    _executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() {
+      @Override
+      public KinesisFetchResult call()
+          throws Exception {
+        return getResult(start, end);
+      }
+    });
+
+    try {
+      return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
+    } catch(Exception e){
+      return null;
+    }
+  }
+
+  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
     try {
       KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
@@ -65,9 +89,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       }
 
       String nextStartSequenceNumber = null;
-      Long startTimestamp = System.currentTimeMillis();
 
-      while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+      while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
         GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
@@ -119,7 +142,4 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     return getShardIteratorResponse.shardIterator();
   }
 
-  private boolean isTimedOut(Long startTimestamp, Long timeout) {
-    return (System.currentTimeMillis() - startTimestamp) >= timeout;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 11/23: Add Kinesis config wrapper

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b0eeec6926d43a7769e57e83b345e5a52a1c1221
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:35:18 2020 +0530

    Add Kinesis config wrapper
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 29 ++++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     | 78 ++++++++++++----------
 .../stream/kinesis/KinesisConsumerFactory.java     | 10 ++-
 3 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..01d666a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -0,0 +1,29 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public class KinesisConfig {
+  private final StreamConfig _streamConfig;
+  private static final String AWS_REGION = "aws-region";
+  private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+
+  private static final String DEFAULT_AWS_REGION = "us-central-1";
+  private static final String DEFAULT_MAX_RECORDS = "20";
+
+  public KinesisConfig(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  public String getStream(){
+    return _streamConfig.getTopicName();
+  }
+
+  public String getAwsRegion(){
+    return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+  }
+
+  public Integer maxRecordsToFetch(){
+    return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7670f06..96241d4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -30,71 +30,75 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.KinesisException;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-
+//TODO: Handle exceptions and timeout
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
   String _stream;
   Integer _maxRecords;
   String _shardId;
 
-  public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) {
-    super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global"));
-    _stream = stream;
-    _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
+  public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
+    super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+    _stream = kinesisConfig.getStream();
+    _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
-    KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+    try {
+      KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
-    String shardIterator = getShardIterator(kinesisStartCheckpoint);
+      String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
-    List<Record> recordList = new ArrayList<>();
+      List<Record> recordList = new ArrayList<>();
 
-    String kinesisEndSequenceNumber = null;
+      String kinesisEndSequenceNumber = null;
 
-    if (end != null) {
-      KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
-      kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
-    }
+      if (end != null) {
+        KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+        kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+      }
 
-    String nextStartSequenceNumber = null;
-    Long startTimestamp = System.currentTimeMillis();
+      String nextStartSequenceNumber = null;
+      Long startTimestamp = System.currentTimeMillis();
 
-    while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
-      GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
-      GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+      while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+        GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
+        GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
-      if (getRecordsResponse.records().size() > 0) {
-        recordList.addAll(getRecordsResponse.records());
-        nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+        if (getRecordsResponse.records().size() > 0) {
+          recordList.addAll(getRecordsResponse.records());
+          nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
 
-        if (kinesisEndSequenceNumber != null
-            && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
-          nextStartSequenceNumber = kinesisEndSequenceNumber;
-          break;
-        }
+          if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
+            nextStartSequenceNumber = kinesisEndSequenceNumber;
+            break;
+          }
 
-        if (recordList.size() >= _maxRecords) {
-          break;
+          if (recordList.size() >= _maxRecords) {
+            break;
+          }
         }
-      }
 
-      shardIterator = getRecordsResponse.nextShardIterator();
-    }
+        shardIterator = getRecordsResponse.nextShardIterator();
+      }
 
-    if (nextStartSequenceNumber == null && recordList.size() > 0) {
-      nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
-    }
+      if (nextStartSequenceNumber == null && recordList.size() > 0) {
+        nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+      }
 
-    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+      KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
-    return kinesisFetchResult;
+      return kinesisFetchResult;
+    }catch (KinesisException e){
+      return null;
+    }
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 931fa07..da39aab 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -28,19 +28,17 @@ import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
 
 
 public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
-  private StreamConfig _streamConfig;
-  private final String AWS_REGION = "aws-region";
+  private KinesisConfig _kinesisConfig;
 
   @Override
   public void init(StreamConfig streamConfig) {
-    _streamConfig = streamConfig;
+    _kinesisConfig = new KinesisConfig(streamConfig);
   }
 
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(),
-        _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global"));
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion());
   }
 
   @Override
@@ -50,6 +48,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
 
   @Override
   public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata);
+    return new KinesisConsumer(_kinesisConfig, metadata);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/23: Add interfaces for V2 consumers

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit abc65886a3d09535135f16d21ae388ae95665cb0
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 10 19:08:15 2020 +0530

    Add interfaces for V2 consumers
---
 .../org/apache/pinot/spi/stream/v2/Checkpoint.java    |  6 ++++++
 .../org/apache/pinot/spi/stream/v2/ConsumerV2.java    |  6 ++++++
 .../org/apache/pinot/spi/stream/v2/FetchResult.java   |  7 +++++++
 .../pinot/spi/stream/v2/PartitionGroupMetadata.java   | 16 ++++++++++++++++
 .../pinot/spi/stream/v2/SegmentNameGenerator.java     |  7 +++++++
 .../pinot/spi/stream/v2/StreamConsumerFactoryV2.java  | 19 +++++++++++++++++++
 6 files changed, 61 insertions(+)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
new file mode 100644
index 0000000..0856454
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -0,0 +1,6 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface Checkpoint {
+  byte[] serialize();
+  Checkpoint deserialize(byte[] blob);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
new file mode 100644
index 0000000..afc8d38
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
@@ -0,0 +1,6 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface ConsumerV2 {
+  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
+}
+
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
new file mode 100644
index 0000000..b490835
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -0,0 +1,7 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface FetchResult {
+  Checkpoint getLastCheckpoint();
+  byte[] getMessages();
+}
+
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
new file mode 100644
index 0000000..27c5ce7
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
@@ -0,0 +1,16 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface PartitionGroupMetadata {
+  Checkpoint getStartCheckpoint(); // similar to getStartOffset
+
+  Checkpoint getEndCheckpoint(); // similar to getEndOffset
+
+  void setStartCheckpoint(Checkpoint startCheckpoint);
+
+  void setEndCheckpoint(Checkpoint endCheckpoint);
+
+  byte[] serialize();
+
+  PartitionGroupMetadata deserialize(byte[] blob);
+}
+
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
new file mode 100644
index 0000000..689c686
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
@@ -0,0 +1,7 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface SegmentNameGenerator {
+  // generates a unique name for a partition group based on the metadata
+    String generateSegmentName(PartitionGroupMetadata metadata);
+
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
new file mode 100644
index 0000000..bd3017d
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
@@ -0,0 +1,19 @@
+package org.apache.pinot.spi.stream.v2;
+
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public interface StreamConsumerFactoryV2 {
+  void init(StreamConfig streamConfig);
+
+  // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
+  Map<Long, PartitionGroupMetadata> getPartitionGroupsMetadata(Map<Long, PartitionGroupMetadata> currentPartitionGroupsMetadata);
+
+  // creates a name generator which generates segment name for a partition group
+  SegmentNameGenerator getSegmentNameGenerator();
+
+  // creates a consumer which consumes from a partition group
+  ConsumerV2 createConsumer(PartitionGroupMetadata metadata);
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 23/23: fixing compilation

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 063a06f934c3cd94ffadc9613316b7141c118522
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sat Jan 2 17:14:31 2021 -0800

    fixing compilation
---
 pinot-distribution/pinot-assembly.xml              |  4 ++
 pinot-distribution/pom.xml                         |  4 ++
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   | 64 ++++++++++++++++++++--
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  1 +
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 15 ++---
 .../stream/kinesis/KinesisConnectionHandler.java   | 26 +++------
 .../plugin/stream/kinesis/KinesisConsumer.java     | 50 +++++++----------
 .../stream/kinesis/KinesisConsumerFactory.java     |  4 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  3 -
 .../kinesis/KinesisPartitionGroupMetadataMap.java  |  7 +--
 .../plugin/stream/kinesis/KinesisRecordsBatch.java | 18 ++++++
 .../stream/kinesis/KinesisShardMetadata.java       | 13 ++---
 .../plugin/stream/kinesis/KinesisConsumerTest.java | 16 +++---
 13 files changed, 136 insertions(+), 89 deletions(-)

diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml
index 2dfb36e..de7329f 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -55,6 +55,10 @@
       <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/target/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</source>
       <destName>plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</destName>
     </file>
+    <file>
+      <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/target/pinot-kinesis-${project.version}-shaded.jar</source>
+      <destName>plugins/pinot-stream-ingestion/pinot-kinesis/pinot-kinesis-${project.version}-shaded.jar</destName>
+    </file>
     <!-- End Include Pinot Stream Ingestion Plugins-->
     <!-- Start Include Pinot Batch Ingestion Plugins-->
     <file>
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index 1a3f106..f29cae0 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -86,6 +86,10 @@
         </exclusion>
         <exclusion>
           <groupId>org.apache.pinot</groupId>
+          <artifactId>pinot-kinesis</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-batch-ingestion-standalone</artifactId>
         </exclusion>
         <exclusion>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 0c9ae0b..4fce169 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -19,19 +19,20 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>pinot-stream-ingestion</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>0.7.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>pinot-kinesis</artifactId>
-
+  <name>Pinot Kinesis</name>
+  <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
@@ -43,6 +44,32 @@
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>kinesis</artifactId>
       <version>${aws.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.reactivestreams</groupId>
+          <artifactId>reactive-streams</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
     <dependency>
@@ -52,8 +79,33 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-spi</artifactId>
+      <groupId>org.reactivestreams</groupId>
+      <artifactId>reactive-streams</artifactId>
+      <version>1.0.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec</artifactId>
+      <version>4.1.42.Final</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>4.1.42.Final</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+      <version>4.1.42.Final</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-common</artifactId>
+      <version>4.1.42.Final</version>
     </dependency>
   </dependencies>
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 54e26d0..f3a7a49 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 
+
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
   Boolean _isEndOfPartition = false;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 82fc438..34f3e3d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -24,16 +24,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConfig {
-  private final Map<String, String> _props;
-
   public static final String STREAM = "stream";
+  public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
   private static final String AWS_REGION = "aws-region";
   private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
-  public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
-
   private static final String DEFAULT_AWS_REGION = "us-central-1";
   private static final String DEFAULT_MAX_RECORDS = "20";
   private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST";
+  private final Map<String, String> _props;
 
   public KinesisConfig(StreamConfig streamConfig) {
     _props = streamConfig.getStreamConfigsMap();
@@ -43,20 +41,19 @@ public class KinesisConfig {
     _props = props;
   }
 
-  public String getStream(){
+  public String getStream() {
     return _props.get(STREAM);
   }
 
-  public String getAwsRegion(){
+  public String getAwsRegion() {
     return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
   }
 
-  public Integer maxRecordsToFetch(){
+  public Integer maxRecordsToFetch() {
     return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
   }
 
-  public ShardIteratorType getShardIteratorType(){
+  public ShardIteratorType getShardIteratorType() {
     return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 0cf4787..4d968f6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -19,28 +19,18 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.List;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
 import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 
 public class KinesisConnectionHandler {
+  KinesisClient _kinesisClient;
   private String _stream;
   private String _awsRegion;
-  KinesisClient _kinesisClient;
 
   public KinesisConnectionHandler() {
 
@@ -58,18 +48,18 @@ public class KinesisConnectionHandler {
     return listShardsResponse.shards();
   }
 
-  public void createConnection(){
-    if(_kinesisClient == null) {
-      _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
-          .build();
+  public void createConnection() {
+    if (_kinesisClient == null) {
+      _kinesisClient =
+          KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+              .build();
     }
   }
 
-  public void close(){
-    if(_kinesisClient != null) {
+  public void close() {
+    if (_kinesisClient != null) {
       _kinesisClient.close();
       _kinesisClient = null;
     }
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 336468a..fb414f0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -19,18 +19,13 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.FetchResult;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +33,6 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
 import software.amazon.awssdk.services.kinesis.model.KinesisException;
 import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
@@ -46,13 +40,14 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
+
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+  private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
   String _stream;
   Integer _maxRecords;
   String _shardId;
   ExecutorService _executorService;
   ShardIteratorType _shardIteratorType;
-  private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -67,12 +62,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
     List<Record> recordList = new ArrayList<>();
-    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList));
+    Future<KinesisFetchResult> kinesisFetchResultFuture =
+        _executorService.submit(() -> getResult(start, end, recordList));
 
     try {
       return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
-    } catch(Exception e){
-        return handleException((KinesisCheckpoint) start, recordList);
+    } catch (Exception e) {
+      return handleException((KinesisCheckpoint) start, recordList);
     }
   }
 
@@ -81,7 +77,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
     try {
 
-      if(_kinesisClient == null){
+      if (_kinesisClient == null) {
         createConnection();
       }
 
@@ -105,7 +101,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
           recordList.addAll(getRecordsResponse.records());
           nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
 
-          if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
+          if (kinesisEndSequenceNumber != null
+              && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
             nextStartSequenceNumber = kinesisEndSequenceNumber;
             break;
           }
@@ -115,14 +112,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
           }
         }
 
-        if(getRecordsResponse.hasChildShards()){
+        if (getRecordsResponse.hasChildShards()) {
           //This statement returns true only when end of current shard has reached.
           isEndOfShard = true;
           break;
         }
 
         shardIterator = getRecordsResponse.nextShardIterator();
-
       }
 
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
@@ -133,28 +129,20 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
-    }catch (ProvisionedThroughputExceededException e) {
-      LOG.warn(
-          "The request rate for the stream is too high"
-      , e);
+    } catch (ProvisionedThroughputExceededException e) {
+      LOG.warn("The request rate for the stream is too high", e);
       return handleException(kinesisStartCheckpoint, recordList);
-    }
-    catch (ExpiredIteratorException e) {
-      LOG.warn(
-          "ShardIterator expired while trying to fetch records",e
-      );
+    } catch (ExpiredIteratorException e) {
+      LOG.warn("ShardIterator expired while trying to fetch records", e);
       return handleException(kinesisStartCheckpoint, recordList);
-    }
-    catch (ResourceNotFoundException | InvalidArgumentException e) {
+    } catch (ResourceNotFoundException | InvalidArgumentException e) {
       // aws errors
       LOG.error("Encountered AWS error while attempting to fetch records", e);
       return handleException(kinesisStartCheckpoint, recordList);
-    }
-    catch (KinesisException e) {
+    } catch (KinesisException e) {
       LOG.warn("Encountered unknown unrecoverable AWS exception", e);
       throw new RuntimeException(e);
-    }
-    catch (Throwable e) {
+    } catch (Throwable e) {
       // non transient errors
       LOG.error("Unknown fetchRecords exception", e);
       throw new RuntimeException(e);
@@ -162,11 +150,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
-    if(recordList.size() > 0){
+    if (recordList.size() > 0) {
       String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
       return new KinesisFetchResult(kinesisCheckpoint, recordList);
-    }else{
+    } else {
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
       return new KinesisFetchResult(kinesisCheckpoint, recordList);
     }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index acac1fb..9bb4d0c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
@@ -38,7 +37,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata);
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(),
+        currentPartitionGroupsMetadata);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 39561f3..8da3d2e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.ArrayList;
 import java.util.List;
-import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 626c8ea..f96533f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -22,12 +22,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
@@ -56,7 +52,8 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
         //Return existing shard metadata
         _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
       } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
-        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
+        KinesisShardMetadata kinesisShardMetadata =
+            (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
         if (isProcessingFinished(kinesisShardMetadata)) {
           //Add child shards for processing since parent has finished
           appendShardMetadata(stream, awsRegion, shard);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index ed51f8f..04bf4e6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.List;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 1d753c3..e24121b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -20,10 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
-import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
 
 //TODO: Implement shardId as Array and have unique id
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
@@ -48,13 +45,13 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public KinesisCheckpoint getEndCheckpoint() {
-    return _endCheckpoint;
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
   }
 
   @Override
-  public void setStartCheckpoint(Checkpoint startCheckpoint) {
-    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
+  public KinesisCheckpoint getEndCheckpoint() {
+    return _endCheckpoint;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index 6f660f7..ce9de2d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -20,9 +20,6 @@ package org.apache.pinot.plugin.stream.kinesis; /**
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
@@ -40,20 +37,25 @@ public class KinesisConsumerTest {
 
     List<Shard> shardList = kinesisConnectionHandler.getShards();
 
-    for(Shard shard : shardList) {
+    for (Shard shard : shardList) {
       System.out.println("SHARD: " + shard.shardId());
 
-      KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
-
+      KinesisConsumer kinesisConsumer =
+          new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
+      System.out.println(
+          "KinesisCheckpoint: start - " + shard.sequenceNumberRange().startingSequenceNumber() + "end - " + shard
+              .sequenceNumberRange().endingSequenceNumber());
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
       KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
 
       KinesisRecordsBatch list = fetchResult.getMessages();
       int n = list.getMessageCount();
 
-      for (int i=0;i<n;i++) {
+      System.out.println("Found " + n + " messages ");
+      for (int i = 0; i < n; i++) {
         System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
       }
+      kinesisConsumer.close();
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 20/23: Handle timeout exception in consumer and make shard iterator type configurable

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 203aa08511cbbda76a0cb76e57d39f60aa513da0
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 24 17:48:04 2020 +0530

    Handle timeout exception in consumer and make shard iterator type configurable
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  1 -
 .../pinot/plugin/stream/kinesis/KinesisConfig.java |  8 +++++
 .../stream/kinesis/KinesisConnectionHandler.java   |  1 +
 .../plugin/stream/kinesis/KinesisConsumer.java     | 36 +++++++++-------------
 .../stream/kinesis/KinesisShardMetadata.java       |  2 +-
 .../plugin/stream/kinesis/KinesisConsumerTest.java |  8 +++--
 6 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 8de95e2..027b789 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,7 +20,6 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 
-
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index a81d11f..82fc438 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConfig {
@@ -28,9 +29,11 @@ public class KinesisConfig {
   public static final String STREAM = "stream";
   private static final String AWS_REGION = "aws-region";
   private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+  public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
 
   private static final String DEFAULT_AWS_REGION = "us-central-1";
   private static final String DEFAULT_MAX_RECORDS = "20";
+  private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST";
 
   public KinesisConfig(StreamConfig streamConfig) {
     _props = streamConfig.getStreamConfigsMap();
@@ -51,4 +54,9 @@ public class KinesisConfig {
   public Integer maxRecordsToFetch(){
     return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
   }
+
+  public ShardIteratorType getShardIteratorType(){
+    return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 3607787..0cf4787 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -71,4 +71,5 @@ public class KinesisConnectionHandler {
       _kinesisClient = null;
     }
   }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 3263f87..abbc753 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -51,6 +51,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   Integer _maxRecords;
   String _shardId;
   ExecutorService _executorService;
+  ShardIteratorType _shardIteratorType;
   private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
@@ -59,22 +60,23 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
+    _shardIteratorType = kinesisConfig.getShardIteratorType();
     _executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
-    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end));
+    List<Record> recordList = new ArrayList<>();
+    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList));
 
     try {
       return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch(Exception e){
-      return null;
+        return handleException((KinesisCheckpoint) start, recordList);
     }
   }
 
-  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
-    List<Record> recordList = new ArrayList<>();
+  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end, List<Record> recordList) {
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
     try {
@@ -83,7 +85,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
         createConnection();
       }
 
-      String shardIterator = getShardIterator(kinesisStartCheckpoint);
+      String shardIterator = getShardIterator(kinesisStartCheckpoint.getSequenceNumber());
 
       String kinesisEndSequenceNumber = null;
 
@@ -162,25 +164,15 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     }
   }
 
-  private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber());
-    } else {
-      return getShardIterator(ShardIteratorType.LATEST, null);
-    }
-  }
+  public String getShardIterator(String sequenceNumber) {
 
-  public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){
-    if(sequenceNumber == null){
-      return _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(shardIteratorType).build()).shardIterator();
-    }else{
-      return _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(shardIteratorType)
-              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    GetShardIteratorRequest.Builder requestBuilder =
+        GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(_shardIteratorType);
+
+    if (sequenceNumber != null) {
+      requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
     }
+    return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 327e034..1d753c3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -25,7 +25,7 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-//TODO: Implement shardId as Array
+//TODO: Implement shardId as Array and have unique id
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
   String _shardId;
   KinesisCheckpoint _startCheckpoint;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index f8a0551..17691c4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.plugin.stream.kinesis; /**
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
@@ -29,7 +31,8 @@ public class KinesisConsumerTest {
     Map<String, String> props = new HashMap<>();
     props.put("stream", "kinesis-test");
     props.put("aws-region", "us-west-2");
-    props.put("maxRecords", "10");
+    props.put("max-records-to-fetch", "2000");
+    props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER");
 
     KinesisConfig kinesisConfig = new KinesisConfig(props);
 
@@ -38,6 +41,8 @@ public class KinesisConsumerTest {
     List<Shard> shardList = kinesisConnectionHandler.getShards();
 
     for(Shard shard : shardList) {
+      System.out.println("SHARD: " + shard.shardId());
+
       KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
 
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
@@ -45,7 +50,6 @@ public class KinesisConsumerTest {
 
       List<Record> list = fetchResult.getMessages();
 
-      System.out.println("SHARD: " + shard.shardId());
       for (Record record : list) {
         System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 15/23: Refactor code

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 00db7a285a2e49e1364cd39a338bc115b50ce250
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 23:49:27 2020 +0530

    Refactor code
---
 .../pinot/plugin/stream/kinesis/KinesisConnectionHandler.java       | 6 ++++++
 .../org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java     | 1 -
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index c41598e..ba94b0a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -59,4 +59,10 @@ public class KinesisConnectionHandler {
         _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
     return listShardsResponse.shards();
   }
+
+  public void close(){
+    if(_kinesisClient != null) {
+      _kinesisClient.close();
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index dfd6cda..24810ba 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -46,7 +46,6 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-//TODO: Handle exceptions and timeout
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
   String _stream;
   Integer _maxRecords;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 18/23: Change shard metadata logic

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit dbdcaf0d95c8c55d6ecd05eeb6068975cfb8da64
Author: KKcorps <kh...@gmail.com>
AuthorDate: Tue Dec 22 20:42:05 2020 +0530

    Change shard metadata logic
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  2 +-
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  2 +-
 .../stream/kinesis/KinesisConsumerFactory.java     |  2 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  2 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 55 +++++++++++++++++++---
 .../stream/kinesis/KinesisShardMetadata.java       | 16 +++----
 6 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 1abc536..0c9ae0b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -35,7 +35,7 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
-    <aws.version>2.13.46</aws.version>
+    <aws.version>2.15.50</aws.version>
   </properties>
 
   <dependencies>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 450173c..8de95e2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -38,7 +38,7 @@ public class KinesisCheckpoint implements Checkpoint {
   }
 
   @Override
-  public Checkpoint deserialize(byte[] blob) {
+  public KinesisCheckpoint deserialize(byte[] blob) {
     //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index da39aab..acac1fb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -38,7 +38,7 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion());
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 52dab66..aedcd5d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -35,7 +35,7 @@ public class KinesisFetchResult implements FetchResult<Record> {
   }
 
   @Override
-  public Checkpoint getLastCheckpoint() {
+  public KinesisCheckpoint getLastCheckpoint() {
     return _kinesisCheckpoint;
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 9a34004..d77579e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -19,7 +19,11 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -30,19 +34,56 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
-  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) {
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
+      PartitionGroupMetadataMap partitionGroupMetadataMap) {
+    //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
+    //Return metadata only for shards in current metadata
     super(stream, awsRegion);
+    KinesisPartitionGroupMetadataMap currentPartitionMeta =
+        (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap;
+    List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
+
     List<Shard> shardList = getShards();
+
+    Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>();
+    for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
+      KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
+      metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
+    }
+
     for (Shard shard : shardList) {
-      String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
-      String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
-      KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
-      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
-      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
-      _stringPartitionGroupMetadataIndex.add(shardMetadata);
+      if (metadataMap.containsKey(shard.shardId())) {
+        //Return existing shard metadata
+        _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId()));
+      } else if (metadataMap.containsKey(shard.parentShardId())) {
+        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId());
+        if (isProcessingFinished(kinesisShardMetadata)) {
+          //Add child shards for processing since parent has finished
+          appendShardMetadata(stream, awsRegion, shard);
+        } else {
+          //Do not process this shard unless the parent shard is finished or expired
+        }
+      } else {
+        //This is a new shard with no parents. We can start processing this shard.
+        appendShardMetadata(stream, awsRegion, shard);
+      }
     }
   }
 
+  private boolean isProcessingFinished(KinesisShardMetadata kinesisShardMetadata) {
+    return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null && kinesisShardMetadata
+        .getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber());
+  }
+
+  private void appendShardMetadata(String stream, String awsRegion, Shard shard) {
+    String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
+    String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+    KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
+    shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
+    shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+    _stringPartitionGroupMetadataIndex.add(shardMetadata);
+  }
+
   @Override
   public List<PartitionGroupMetadata> getMetadataList() {
     return _stringPartitionGroupMetadataIndex;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 8141cd4..327e034 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -25,11 +25,11 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-
+//TODO: Implement shardId as Array
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
   String _shardId;
-  Checkpoint _startCheckpoint;
-  Checkpoint _endCheckpoint;
+  KinesisCheckpoint _startCheckpoint;
+  KinesisCheckpoint _endCheckpoint;
 
   public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
     super(streamName, awsRegion);
@@ -43,23 +43,23 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public Checkpoint getStartCheckpoint() {
+  public KinesisCheckpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 
   @Override
-  public Checkpoint getEndCheckpoint() {
+  public KinesisCheckpoint getEndCheckpoint() {
     return _endCheckpoint;
   }
 
   @Override
   public void setStartCheckpoint(Checkpoint startCheckpoint) {
-    _startCheckpoint = startCheckpoint;
+    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
   }
 
   @Override
   public void setEndCheckpoint(Checkpoint endCheckpoint) {
-    _endCheckpoint = endCheckpoint;
+    _endCheckpoint = (KinesisCheckpoint) endCheckpoint;
   }
 
   @Override
@@ -68,7 +68,7 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public PartitionGroupMetadata deserialize(byte[] blob) {
+  public KinesisShardMetadata deserialize(byte[] blob) {
     return null;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 19/23: Add test code for kinesis

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d922695383d45519067ae8cebcaa9553568ffee8
Author: KKcorps <kh...@gmail.com>
AuthorDate: Tue Dec 22 22:05:02 2020 +0530

    Add test code for kinesis
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 17 +++++--
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 16 +++----
 .../plugin/stream/kinesis/KinesisConsumerTest.java | 54 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index d2e8715..a81d11f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
 
 
 public class KinesisConfig {
-  private final StreamConfig _streamConfig;
+  private final Map<String, String> _props;
+
+  public static final String STREAM = "stream";
   private static final String AWS_REGION = "aws-region";
   private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
 
@@ -30,18 +33,22 @@ public class KinesisConfig {
   private static final String DEFAULT_MAX_RECORDS = "20";
 
   public KinesisConfig(StreamConfig streamConfig) {
-    _streamConfig = streamConfig;
+    _props = streamConfig.getStreamConfigsMap();
+  }
+
+  public KinesisConfig(Map<String, String> props) {
+    _props = props;
   }
 
   public String getStream(){
-    return _streamConfig.getTopicName();
+    return _props.get(STREAM);
   }
 
   public String getAwsRegion(){
-    return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+    return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
   }
 
   public Integer maxRecordsToFetch(){
-    return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+    return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index d77579e..626c8ea 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -35,28 +35,28 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
-      PartitionGroupMetadataMap partitionGroupMetadataMap) {
+      PartitionGroupMetadataMap currentPartitionGroupMetadataMap) {
     //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
     //Return metadata only for shards in current metadata
     super(stream, awsRegion);
     KinesisPartitionGroupMetadataMap currentPartitionMeta =
-        (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap;
+        (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap;
     List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
 
     List<Shard> shardList = getShards();
 
-    Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>();
+    Map<String, PartitionGroupMetadata> currentMetadataMap = new HashMap<>();
     for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
       KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
-      metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
+      currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
     }
 
     for (Shard shard : shardList) {
-      if (metadataMap.containsKey(shard.shardId())) {
+      if (currentMetadataMap.containsKey(shard.shardId())) {
         //Return existing shard metadata
-        _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId()));
-      } else if (metadataMap.containsKey(shard.parentShardId())) {
-        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId());
+        _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
+      } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
+        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
         if (isProcessingFinished(kinesisShardMetadata)) {
           //Add child shards for processing since parent has finished
           appendShardMetadata(stream, awsRegion, shard);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
new file mode 100644
index 0000000..f8a0551
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -0,0 +1,54 @@
+package org.apache.pinot.plugin.stream.kinesis; /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisConsumerTest {
+  public static void main(String[] args) {
+    Map<String, String> props = new HashMap<>();
+    props.put("stream", "kinesis-test");
+    props.put("aws-region", "us-west-2");
+    props.put("maxRecords", "10");
+
+    KinesisConfig kinesisConfig = new KinesisConfig(props);
+
+    KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2");
+
+    List<Shard> shardList = kinesisConnectionHandler.getShards();
+
+    for(Shard shard : shardList) {
+      KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
+
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
+      KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
+
+      List<Record> list = fetchResult.getMessages();
+
+      System.out.println("SHARD: " + shard.shardId());
+      for (Record record : list) {
+        System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String());
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 03/23: Add PartitionGroupMetadataMap interface

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1a8a80ac01836012419feaab45853b670d953afa
Author: KKcorps <kh...@gmail.com>
AuthorDate: Fri Dec 11 13:56:52 2020 +0530

    Add PartitionGroupMetadataMap interface
---
 .../src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java  | 7 +++++--
 .../org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java  | 4 ++++
 .../org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java    | 2 +-
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
index b490835..78ae5ef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -1,7 +1,10 @@
 package org.apache.pinot.spi.stream.v2;
 
-public interface FetchResult {
+import java.util.List;
+
+
+public interface FetchResult<T> {
   Checkpoint getLastCheckpoint();
-  byte[] getMessages();
+  List<T> getMessages();
 }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
new file mode 100644
index 0000000..3c344bc
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
@@ -0,0 +1,4 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface PartitionGroupMetadataMap {
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
index bd3017d..eb7f76e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
@@ -8,7 +8,7 @@ public interface StreamConsumerFactoryV2 {
   void init(StreamConfig streamConfig);
 
   // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
-  Map<Long, PartitionGroupMetadata> getPartitionGroupsMetadata(Map<Long, PartitionGroupMetadata> currentPartitionGroupsMetadata);
+  PartitionGroupMetadataMap getPartitionGroupsMetadata(PartitionGroupMetadataMap currentPartitionGroupsMetadata);
 
   // creates a name generator which generates segment name for a partition group
   SegmentNameGenerator getSegmentNameGenerator();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 09/23: Reformat code

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2463bd1f68591cdc81ec6ee14d89d066bd3f5f97
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 01:27:05 2020 +0530

    Reformat code
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  3 +--
 .../stream/kinesis/KinesisConnectionHandler.java   | 14 ++++++-----
 .../plugin/stream/kinesis/KinesisConsumer.java     | 28 ++++++++++++----------
 .../stream/kinesis/KinesisConsumerFactory.java     |  3 ++-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  6 ++---
 .../kinesis/KinesisPartitionGroupMetadataMap.java  |  5 ++--
 6 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index aa80b17..89043ea 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -6,7 +6,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
 
-  public KinesisCheckpoint(String sequenceNumber){
+  public KinesisCheckpoint(String sequenceNumber) {
     _sequenceNumber = sequenceNumber;
   }
 
@@ -24,5 +24,4 @@ public class KinesisCheckpoint implements Checkpoint {
     //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index d8888fa..554cca6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -24,19 +24,21 @@ public class KinesisConnectionHandler {
   private String _awsRegion;
   KinesisClient _kinesisClient;
 
-  public KinesisConnectionHandler(){
+  public KinesisConnectionHandler() {
 
   }
 
-  public KinesisConnectionHandler(String stream, String awsRegion){
+  public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
     _awsRegion = awsRegion;
-    _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
+    _kinesisClient =
+        KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+            .build();
   }
 
-  public List<Shard> getShards(){
-    ListShardsResponse listShardsResponse =  _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
+  public List<Shard> getShards() {
+    ListShardsResponse listShardsResponse =
+        _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
     return listShardsResponse.shards();
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index d896d67..1181d14 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -39,7 +39,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
     String kinesisEndSequenceNumber = null;
 
-    if(end != null) {
+    if (end != null) {
       KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
       kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
     }
@@ -47,32 +47,34 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     String nextStartSequenceNumber = null;
     Long startTimestamp = System.currentTimeMillis();
 
-    while(shardIterator != null && !isTimedOut(startTimestamp, timeout)){
+    while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
       GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
       GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
-      if(getRecordsResponse.records().size() > 0){
+      if (getRecordsResponse.records().size() > 0) {
         recordList.addAll(getRecordsResponse.records());
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
 
-        if(kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ){
+        if (kinesisEndSequenceNumber != null
+            && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
           nextStartSequenceNumber = kinesisEndSequenceNumber;
           break;
         }
 
-        if(recordList.size() >= _maxRecords) break;
+        if (recordList.size() >= _maxRecords) {
+          break;
+        }
       }
 
       shardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    if(nextStartSequenceNumber == null && recordList.size() > 0){
+    if (nextStartSequenceNumber == null && recordList.size() > 0) {
       nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
     }
 
     KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint,
-        recordList);
+    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
     return kinesisFetchResult;
   }
@@ -80,14 +82,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
     GetShardIteratorResponse getShardIteratorResponse;
 
-    if(kinesisStartCheckpoint.getSequenceNumber() != null) {
+    if (kinesisStartCheckpoint.getSequenceNumber() != null) {
       String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
               .startingSequenceNumber(kinesisStartSequenceNumber).build());
-    } else{
+    } else {
       getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build());
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(ShardIteratorType.LATEST).build());
     }
 
     return getShardIteratorResponse.shardIterator();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 0608118..5e06a01 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -21,7 +21,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(),
+        _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global"));
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 2996b28..2801a09 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -11,9 +11,9 @@ public class KinesisFetchResult implements FetchResult<Record> {
   private final KinesisCheckpoint _kinesisCheckpoint;
   private final List<Record> _recordList;
 
-  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList){
-     _kinesisCheckpoint = kinesisCheckpoint;
-     _recordList = recordList;
+  public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList) {
+    _kinesisCheckpoint = kinesisCheckpoint;
+    _recordList = recordList;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 700ec3f..05d95de 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -12,10 +12,10 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
   private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
-  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) {
     super(stream, awsRegion);
     List<Shard> shardList = getShards();
-    for(Shard shard : shardList){
+    for (Shard shard : shardList) {
       String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
@@ -34,5 +34,4 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
   public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
     return _stringPartitionGroupMetadataIndex.get(index);
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 04/23: Add kinesis code to handle offsets

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 76cfcf1816d3d36974841081346edb8c95bf73fc
Author: KKcorps <kh...@gmail.com>
AuthorDate: Fri Dec 11 13:57:25 2020 +0530

    Add kinesis code to handle offsets
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 13 ++++---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 42 +++++++++++++++++++---
 .../stream/kinesis/KinesisConsumerFactory.java     | 36 +++++++++++++++++++
 .../plugin/stream/kinesis/KinesisFetchResult.java  | 11 +++---
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 31 ++++++++++++++++
 .../stream/kinesis/KinesisShardMetadata.java       |  5 ++-
 6 files changed, 121 insertions(+), 17 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index a330e78..77f790b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -1,23 +1,22 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 
 
 public class KinesisCheckpoint implements Checkpoint {
-  String _shardIterator;
+  String _sequenceNumber;
 
-  public KinesisCheckpoint(String shardIterator){
-    _shardIterator = shardIterator;
+  public KinesisCheckpoint(String sequenceNumber){
+    _sequenceNumber = sequenceNumber;
   }
 
-  public String getShardIterator() {
-    return _shardIterator;
+  public String getSequenceNumber() {
+    return _sequenceNumber;
   }
 
   @Override
   public byte[] serialize() {
-    return _shardIterator.getBytes();
+    return _sequenceNumber.getBytes();
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 251d831..dc44079 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -1,19 +1,26 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+  String _stream;
 
   //TODO: Fetch AWS region from  Stream Config.
-  public KinesisConsumer(String awsRegion) {
+  public KinesisConsumer(String stream, String awsRegion) {
     super(awsRegion);
+    _stream = stream;
   }
 
   @Override
@@ -21,18 +28,43 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
     KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
 
-    String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator();
+    String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
+    String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
 
-    GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build();
+    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType(
+        ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build());
+
+    String shardIterator = getShardIteratorResponse.shardIterator();
+    GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
     GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
     String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
 
+    //TODO: Get records in the loop and stop when end sequence number is reached or there is an exception.
     if(!getRecordsResponse.hasRecords()){
-      return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList());
+      return new KinesisFetchResult(kinesisStartSequenceNumber, Collections.emptyList());
+    }
+
+    List<Record> recordList = new ArrayList<>();
+    recordList.addAll(getRecordsResponse.records());
+
+    String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+    while(kinesisNextShardIterator != null){
+      getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build();
+      getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+      if(getRecordsResponse.hasRecords()){
+        recordList.addAll(getRecordsResponse.records());
+        nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+      }
+
+      if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ) {
+        nextStartSequenceNumber = kinesisEndSequenceNumber;
+        break;
+      }
+      kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator,
+    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(nextStartSequenceNumber,
         getRecordsResponse.records());
 
     return kinesisFetchResult;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
new file mode 100644
index 0000000..6bd1e3a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -0,0 +1,36 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import org.apache.pinot.spi.stream.v2.SegmentNameGenerator;
+import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
+
+
+public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
+  private StreamConfig _streamConfig;
+  private final String AWS_REGION = "aws-region";
+
+  @Override
+  public void init(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  @Override
+  public PartitionGroupMetadataMap getPartitionGroupsMetadata(
+      PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
+    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+  }
+
+  @Override
+  public SegmentNameGenerator getSegmentNameGenerator() {
+    return null;
+  }
+
+  @Override
+  public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
+    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 5ef4e30..dc8e764 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -1,16 +1,19 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 
-public class KinesisFetchResult implements FetchResult {
-  private String _nextShardIterator;
+public class KinesisFetchResult implements FetchResult<Record> {
+  private final String _nextShardIterator;
+  private final List<Record> _recordList;
 
   public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
      _nextShardIterator = nextShardIterator;
+     _recordList = recordList;
   }
 
   @Override
@@ -19,7 +22,7 @@ public class KinesisFetchResult implements FetchResult {
   }
 
   @Override
-  public byte[] getMessages() {
-    return new byte[0];
+  public List<Record> getMessages() {
+    return _recordList;
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
new file mode 100644
index 0000000..bc3fef2
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -0,0 +1,31 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
+  private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new HashMap<>();
+
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+    super(awsRegion);
+    ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
+    List<Shard> shardList = listShardsResponse.shards();
+    for(Shard shard : shardList){
+      String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+      KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
+      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+      _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+    }
+  }
+
+  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
+      return _stringPartitionGroupMetadataMap;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 07ede73..d50d821 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
@@ -11,8 +12,10 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   Checkpoint _endCheckpoint;
 
   public KinesisShardMetadata(String shardId, String streamName) {
-    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build());
+    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType(
+        ShardIteratorType.LATEST).streamName(streamName).build());
     _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
+    _endCheckpoint = null;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 17/23: Refactor: get shard iterator methods

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 47b166408d40bcbe56e95f5d864113c2d7ee4bfe
Author: KKcorps <kh...@gmail.com>
AuthorDate: Mon Dec 21 14:25:25 2020 +0530

    Refactor: get shard iterator methods
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fd48a92..3263f87 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -163,21 +163,24 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    GetShardIteratorResponse getShardIteratorResponse;
-
     if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber());
     } else {
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(ShardIteratorType.LATEST).build());
+      return getShardIterator(ShardIteratorType.LATEST, null);
     }
+  }
 
-    return getShardIteratorResponse.shardIterator();
+  public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){
+    if(sequenceNumber == null){
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(shardIteratorType).build()).shardIterator();
+    }else{
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(shardIteratorType)
+              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 06/23: Refactor kinesis shard metadata interface and add shardId to the metadata

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c9cd79c8b1c79b788ea88a399ad44bbdc96a8c9a
Author: KKcorps <kh...@gmail.com>
AuthorDate: Fri Dec 11 23:57:29 2020 +0530

    Refactor kinesis shard metadata interface and add shardId to the metadata
---
 .../kinesis/KinesisPartitionGroupMetadataMap.java    | 20 +++++++++++++-------
 .../plugin/stream/kinesis/KinesisShardMetadata.java  |  6 ++++++
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index bc3fef2..87f7235 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -1,8 +1,7 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -11,7 +10,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
-  private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new HashMap<>();
+  private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
     super(awsRegion);
@@ -20,12 +19,19 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
     for(Shard shard : shardList){
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
-      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
-      _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+      _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
 
-  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
-      return _stringPartitionGroupMetadataMap;
+  @Override
+  public List<PartitionGroupMetadata> getMetadataList() {
+    return _stringPartitionGroupMetadataIndex;
   }
+
+  @Override
+  public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
+    return _stringPartitionGroupMetadataIndex.get(index);
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index d50d821..4a19285 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -8,6 +8,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
+  String _shardId;
   Checkpoint _startCheckpoint;
   Checkpoint _endCheckpoint;
 
@@ -16,6 +17,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
         ShardIteratorType.LATEST).streamName(streamName).build());
     _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
     _endCheckpoint = null;
+    _shardId = shardId;
+  }
+
+  public String getShardId() {
+    return _shardId;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 16/23: Handle closed connections

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit bb8e08fda6da218f57882c348ca61ddc5a53e451
Author: KKcorps <kh...@gmail.com>
AuthorDate: Mon Dec 21 14:21:55 2020 +0530

    Handle closed connections
---
 .../plugin/stream/kinesis/KinesisConnectionHandler.java      | 12 +++++++++---
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  |  8 ++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index ba94b0a..3607787 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -49,9 +49,7 @@ public class KinesisConnectionHandler {
   public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
     _awsRegion = awsRegion;
-    _kinesisClient =
-        KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
-            .build();
+    createConnection();
   }
 
   public List<Shard> getShards() {
@@ -60,9 +58,17 @@ public class KinesisConnectionHandler {
     return listShardsResponse.shards();
   }
 
+  public void createConnection(){
+    if(_kinesisClient == null) {
+      _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+          .build();
+    }
+  }
+
   public void close(){
     if(_kinesisClient != null) {
       _kinesisClient.close();
+      _kinesisClient = null;
     }
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 24810ba..fd48a92 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -79,6 +79,10 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
     try {
 
+      if(_kinesisClient == null){
+        createConnection();
+      }
+
       String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
       String kinesisEndSequenceNumber = null;
@@ -176,4 +180,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     return getShardIteratorResponse.shardIterator();
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 21/23: Add isEndOfPartition check in checkpoints

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8d0248a03bbb3c4d42cd390e7c3012a763e9786b
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 24 17:58:40 2020 +0530

    Add isEndOfPartition check in checkpoints
---
 .../pinot/plugin/stream/kinesis/KinesisCheckpoint.java       | 12 +++++++++++-
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  | 10 +++++++++-
 .../main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 027b789..54e26d0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,11 +22,22 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
+  Boolean _isEndOfPartition = false;
 
   public KinesisCheckpoint(String sequenceNumber) {
     _sequenceNumber = sequenceNumber;
   }
 
+  public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) {
+    _sequenceNumber = sequenceNumber;
+    _isEndOfPartition = isEndOfPartition;
+  }
+
+  @Override
+  public boolean isEndOfPartition() {
+    return _isEndOfPartition;
+  }
+
   public String getSequenceNumber() {
     return _sequenceNumber;
   }
@@ -38,7 +49,6 @@ public class KinesisCheckpoint implements Checkpoint {
 
   @Override
   public KinesisCheckpoint deserialize(byte[] blob) {
-    //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index abbc753..336468a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -95,6 +95,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       }
 
       String nextStartSequenceNumber = null;
+      boolean isEndOfShard = false;
 
       while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
@@ -114,14 +115,21 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
           }
         }
 
+        if(getRecordsResponse.hasChildShards()){
+          //This statement returns true only when end of current shard has reached.
+          isEndOfShard = true;
+          break;
+        }
+
         shardIterator = getRecordsResponse.nextShardIterator();
+
       }
 
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       }
 
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard);
       KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
index 030fe4e..0195684 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream.v2;
 
 public interface Checkpoint {
+  boolean isEndOfPartition();
   byte[] serialize();
   Checkpoint deserialize(byte[] blob);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 14/23: Handle exceptions

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b0a2468f27fd6638be796657e30f65bc52fc24ed
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 23:41:06 2020 +0530

    Handle exceptions
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 59 +++++++++++++++++-----
 1 file changed, 47 insertions(+), 12 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 910b9ee..dfd6cda 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -32,12 +32,18 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
 import software.amazon.awssdk.services.kinesis.model.KinesisException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 //TODO: Handle exceptions and timeout
@@ -46,6 +52,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   Integer _maxRecords;
   String _shardId;
   ExecutorService _executorService;
+  private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -58,13 +65,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
-    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() {
-      @Override
-      public KinesisFetchResult call()
-          throws Exception {
-        return getResult(start, end);
-      }
-    });
+    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end));
 
     try {
       return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
@@ -74,13 +75,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
+    List<Record> recordList = new ArrayList<>();
+    KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+
     try {
-      KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
       String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
-      List<Record> recordList = new ArrayList<>();
-
       String kinesisEndSequenceNumber = null;
 
       if (end != null) {
@@ -119,8 +120,42 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
-    }catch (KinesisException e){
-      return null;
+    }catch (ProvisionedThroughputExceededException e) {
+      LOG.warn(
+          "The request rate for the stream is too high"
+      , e);
+      return handleException(kinesisStartCheckpoint, recordList);
+    }
+    catch (ExpiredIteratorException e) {
+      LOG.warn(
+          "ShardIterator expired while trying to fetch records",e
+      );
+      return handleException(kinesisStartCheckpoint, recordList);
+    }
+    catch (ResourceNotFoundException | InvalidArgumentException e) {
+      // aws errors
+      LOG.error("Encountered AWS error while attempting to fetch records", e);
+      return handleException(kinesisStartCheckpoint, recordList);
+    }
+    catch (KinesisException e) {
+      LOG.warn("Encountered unknown unrecoverable AWS exception", e);
+      throw new RuntimeException(e);
+    }
+    catch (Throwable e) {
+      // non transient errors
+      LOG.error("Unknown fetchRecords exception", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
+    if(recordList.size() > 0){
+      String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+      return new KinesisFetchResult(kinesisCheckpoint, recordList);
+    }else{
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
+      return new KinesisFetchResult(kinesisCheckpoint, recordList);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 13/23: Add license header

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 422688d3bc5839bcf4ce925ff55fc8ef81d69d32
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:45:11 2020 +0530

    Add license header
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java     | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 01d666a..d2e8715 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.StreamConfig;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 05/23: Refactor PartitionGroupMetadataMap interface

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 56efa08d6735a2cb1f205e337c79c90bb1f1c13c
Author: KKcorps <kh...@gmail.com>
AuthorDate: Fri Dec 11 23:56:08 2020 +0530

    Refactor PartitionGroupMetadataMap interface
---
 .../org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
index 3c344bc..702f08a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
@@ -1,4 +1,12 @@
 package org.apache.pinot.spi.stream.v2;
 
+import java.util.List;
+
+
 public interface PartitionGroupMetadataMap {
+
+  List<PartitionGroupMetadata> getMetadataList();
+
+  PartitionGroupMetadata getPartitionGroupMetadata(int index);
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org