You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:19 UTC

[incubator-pinot] 02/08: StreamPartitionMsgOffset to implement Checkpoint

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 652a3e68462fe007b59a3505c44bba3f0f190c3b
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue Dec 29 18:43:48 2020 -0800

    StreamPartitionMsgOffset to implement Checkpoint
---
 .../common/metadata/segment/RealtimeSegmentZKMetadata.java |  6 ++++++
 .../org/apache/pinot/common/utils/CommonConstants.java     |  4 ++++
 .../impl/fakestream/FakeStreamConsumerFactory.java         | 14 ++++++++++++++
 .../tests/FlakyConsumerRealtimeClusterIntegrationTest.java | 14 ++++++++++++++
 .../pinot/plugin/stream/kafka09/KafkaConsumerFactory.java  | 14 ++++++++++++++
 .../pinot/plugin/stream/kafka20/KafkaConsumerFactory.java  | 14 ++++++++++++++
 .../main/java/org/apache/pinot/spi/stream/Checkpoint.java  |  6 +++---
 .../java/org/apache/pinot/spi/stream/LongMsgOffset.java    | 10 ++++++++++
 .../apache/pinot/spi/stream/PartitionGroupConsumer.java    |  5 ++++-
 .../apache/pinot/spi/stream/PartitionGroupMetadata.java    |  2 --
 .../apache/pinot/spi/stream/StreamPartitionMsgOffset.java  |  2 +-
 11 files changed, 84 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
index d88be18..c46af53 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
@@ -35,6 +35,7 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
   private Status _status = null;
   private int _sizeThresholdToFlushSegment = -1;
   private String _timeThresholdToFlushSegment = null; // store as period string for readability
+  private String _partitionGroupMetadataStr = null;
 
   public RealtimeSegmentZKMetadata() {
     setSegmentType(SegmentType.REALTIME);
@@ -49,6 +50,7 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
     if (flushThresholdTime != null && !flushThresholdTime.equals(NULL)) {
       _timeThresholdToFlushSegment = znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_TIME);
     }
+    _partitionGroupMetadataStr = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_GROUP_METADATA);
   }
 
   @Override
@@ -141,4 +143,8 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
   public void setTimeThresholdToFlushSegment(String timeThresholdPeriodString) {
     _timeThresholdToFlushSegment = timeThresholdPeriodString;
   }
+
+  public String getPartitionGroupMetadataStr() {
+    return _partitionGroupMetadataStr;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 9773e7e..7a91d8c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -379,6 +379,10 @@ public class CommonConstants {
     public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
     public static final String PARTITION_METADATA = "segment.partition.metadata";
     /**
+     * Serialized {@link org.apache.pinot.spi.stream.PartitionGroupMetadata} for this segment
+     */
+    public static final String PARTITION_GROUP_METADATA = "segment.partition.group.metadata";
+    /**
      * This field is used for parallel push protection to lock the segment globally.
      * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
      * next upload won't be blocked forever.
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index bb01e5c..9669223 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.realtime.impl.fakestream;
 
+import java.util.List;
 import java.util.Set;
 import org.apache.pinot.core.util.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -26,6 +27,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -66,6 +69,17 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
     return new FakeStreamMetadataProvider(_streamConfig);
   }
 
+  @Override
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+    return null;
+  }
+
+  @Override
+  public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+    return null;
+  }
+
   public static void main(String[] args)
       throws Exception {
     String clientId = "client_id_localhost_tester";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index b05244f..808a464 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -19,9 +19,12 @@
 package org.apache.pinot.integration.tests;
 
 import java.lang.reflect.Constructor;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -117,5 +120,16 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
     public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+        List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+      return null;
+    }
+
+    @Override
+    public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+      return null;
+    }
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 615e354..b8ed19d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.plugin.stream.kafka09;
 
+import java.util.List;
 import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -50,4 +53,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig);
   }
+
+  @Override
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+    return null;
+  }
+
+  @Override
+  public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+    return null;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index e0d1015..806baff 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
+import java.util.List;
 import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -47,4 +50,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig);
   }
+
+  @Override
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+    return null;
+  }
+
+  @Override
+  public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+    return null;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
index 627c964..bae8832 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
-public interface Checkpoint {
-  byte[] serialize();
-  Checkpoint deserialize(byte[] blob);
+public interface Checkpoint extends Comparable {
+  String serialize();
+  Checkpoint deserialize(String checkpointStr);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
index e5025f6..e8fa275 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
@@ -50,4 +50,14 @@ public class LongMsgOffset implements StreamPartitionMsgOffset {
   public String toString() {
     return Long.toString(_offset);
   }
+
+  @Override
+  public String serialize() {
+    return toString();
+  }
+
+  @Override
+  public Checkpoint deserialize(String checkpointStr) {
+    return new LongMsgOffset(checkpointStr);
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 2f138c2..e096e67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.spi.stream;
 
-public interface PartitionGroupConsumer {
+import java.io.Closeable;
+
+
+public interface PartitionGroupConsumer extends Closeable {
   FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index 779c167..0f44173 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -25,8 +25,6 @@ public interface PartitionGroupMetadata {
 
   int getGroupId();
 
-  List<String> getPartitions();
-
   Checkpoint getStartCheckpoint(); // similar to getStartOffset
 
   Checkpoint getEndCheckpoint(); // similar to getEndOffset
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
index 72654bf..06a090e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
@@ -39,7 +39,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  * versions of the stream implementation
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffset extends Comparable {
+public interface StreamPartitionMsgOffset extends Checkpoint {
 
   /**
    * Compare this offset with another one.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org