You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:07 UTC

[incubator-pinot] 01/23: Add interfaces for V2 consumers

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit abc65886a3d09535135f16d21ae388ae95665cb0
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 10 19:08:15 2020 +0530

    Add interfaces for V2 consumers
---
 .../org/apache/pinot/spi/stream/v2/Checkpoint.java    |  6 ++++++
 .../org/apache/pinot/spi/stream/v2/ConsumerV2.java    |  6 ++++++
 .../org/apache/pinot/spi/stream/v2/FetchResult.java   |  7 +++++++
 .../pinot/spi/stream/v2/PartitionGroupMetadata.java   | 16 ++++++++++++++++
 .../pinot/spi/stream/v2/SegmentNameGenerator.java     |  7 +++++++
 .../pinot/spi/stream/v2/StreamConsumerFactoryV2.java  | 19 +++++++++++++++++++
 6 files changed, 61 insertions(+)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
new file mode 100644
index 0000000..0856454
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -0,0 +1,6 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface Checkpoint { // V2 stream checkpoint; declares conversion to (serialize) and from (deserialize) a byte[]
+  byte[] serialize(); // returns the byte[] form of this checkpoint; the encoding is left to the implementation
+  Checkpoint deserialize(byte[] blob); // builds a Checkpoint from blob; NOTE(review): this is an instance method, so a Checkpoint must already exist to decode one — confirm a static/factory form is not intended
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
new file mode 100644
index 0000000..afc8d38
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
@@ -0,0 +1,6 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface ConsumerV2 { // V2 consumer abstraction exposing a single fetch operation
+  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); // fetches between two checkpoints; NOTE(review): the unit of timeout and whether start/end are inclusive are not specified here — confirm
+}
+
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
new file mode 100644
index 0000000..b490835
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
@@ -0,0 +1,7 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface FetchResult { // the type returned by ConsumerV2.fetch
+  Checkpoint getLastCheckpoint(); // presumably the checkpoint reached by the fetch that produced this result — TODO confirm
+  byte[] getMessages(); // the fetched messages as a single byte[]; NOTE(review): how individual messages are delimited inside it is not specified — confirm
+}
+
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
new file mode 100644
index 0000000..27c5ce7
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
@@ -0,0 +1,16 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface PartitionGroupMetadata { // holds a start and an end Checkpoint and declares byte[] (de)serialization
+  Checkpoint getStartCheckpoint(); // similar to getStartOffset
+
+  Checkpoint getEndCheckpoint(); // similar to getEndOffset
+
+  void setStartCheckpoint(Checkpoint startCheckpoint); // setter counterpart of getStartCheckpoint
+
+  void setEndCheckpoint(Checkpoint endCheckpoint); // setter counterpart of getEndCheckpoint
+
+  byte[] serialize(); // returns the byte[] form of this metadata; the encoding is left to the implementation
+
+  PartitionGroupMetadata deserialize(byte[] blob); // builds metadata from blob; NOTE(review): instance method, same shape as Checkpoint.deserialize — confirm a static/factory form is not intended
+}
+
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
new file mode 100644
index 0000000..689c686
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
@@ -0,0 +1,7 @@
+package org.apache.pinot.spi.stream.v2;
+
+public interface SegmentNameGenerator {
+  // generates a unique name for a partition group, derived from the given partition group metadata
+    String generateSegmentName(PartitionGroupMetadata metadata);
+
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
new file mode 100644
index 0000000..bd3017d
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
@@ -0,0 +1,19 @@
+package org.apache.pinot.spi.stream.v2;
+
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public interface StreamConsumerFactoryV2 {
+  void init(StreamConfig streamConfig); // supplies the stream configuration to this factory
+
+  // takes the current state of partition groups (groupings of shards, plus the state of their consumption) and returns the new state; the Long key is presumably a partition group id — TODO confirm
+  Map<Long, PartitionGroupMetadata> getPartitionGroupsMetadata(Map<Long, PartitionGroupMetadata> currentPartitionGroupsMetadata);
+
+  // returns the name generator that produces the segment name for a partition group
+  SegmentNameGenerator getSegmentNameGenerator();
+
+  // creates a consumer which consumes from the partition group described by the given metadata
+  ConsumerV2 createConsumer(PartitionGroupMetadata metadata);
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org