You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:19 UTC
[incubator-pinot] 02/08: StreamPartitionOffset to implement Checkpoint
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 652a3e68462fe007b59a3505c44bba3f0f190c3b
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue Dec 29 18:43:48 2020 -0800
StreamPartitionOffset to implement Checkpoint
---
.../common/metadata/segment/RealtimeSegmentZKMetadata.java | 6 ++++++
.../org/apache/pinot/common/utils/CommonConstants.java | 4 ++++
.../impl/fakestream/FakeStreamConsumerFactory.java | 14 ++++++++++++++
.../tests/FlakyConsumerRealtimeClusterIntegrationTest.java | 14 ++++++++++++++
.../pinot/plugin/stream/kafka09/KafkaConsumerFactory.java | 14 ++++++++++++++
.../pinot/plugin/stream/kafka20/KafkaConsumerFactory.java | 14 ++++++++++++++
.../main/java/org/apache/pinot/spi/stream/Checkpoint.java | 6 +++---
.../java/org/apache/pinot/spi/stream/LongMsgOffset.java | 10 ++++++++++
.../apache/pinot/spi/stream/PartitionGroupConsumer.java | 5 ++++-
.../apache/pinot/spi/stream/PartitionGroupMetadata.java | 2 --
.../apache/pinot/spi/stream/StreamPartitionMsgOffset.java | 2 +-
11 files changed, 84 insertions(+), 7 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
index d88be18..c46af53 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
@@ -35,6 +35,7 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
private Status _status = null;
private int _sizeThresholdToFlushSegment = -1;
private String _timeThresholdToFlushSegment = null; // store as period string for readability
+ private String _partitionGroupMetadataStr = null;
public RealtimeSegmentZKMetadata() {
setSegmentType(SegmentType.REALTIME);
@@ -49,6 +50,7 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
if (flushThresholdTime != null && !flushThresholdTime.equals(NULL)) {
_timeThresholdToFlushSegment = znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_TIME);
}
+ _partitionGroupMetadataStr = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_GROUP_METADATA);
}
@Override
@@ -141,4 +143,8 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
public void setTimeThresholdToFlushSegment(String timeThresholdPeriodString) {
_timeThresholdToFlushSegment = timeThresholdPeriodString;
}
+
+ public String getPartitionGroupMetadataStr() {
+ return _partitionGroupMetadataStr;
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 9773e7e..7a91d8c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -379,6 +379,10 @@ public class CommonConstants {
public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
public static final String PARTITION_METADATA = "segment.partition.metadata";
/**
+ * Serialized {@link org.apache.pinot.spi.stream.PartitionGroupMetadata} for this segment
+ */
+ public static final String PARTITION_GROUP_METADATA = "segment.partition.group.metadata";
+ /**
* This field is used for parallel push protection to lock the segment globally.
* We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
* next upload won't be blocked forever.
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index bb01e5c..9669223 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;
+import java.util.List;
import java.util.Set;
import org.apache.pinot.core.util.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -26,6 +27,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -66,6 +69,17 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
return new FakeStreamMetadataProvider(_streamConfig);
}
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+ return null;
+ }
+
+ @Override
+ public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ return null;
+ }
+
public static void main(String[] args)
throws Exception {
String clientId = "client_id_localhost_tester";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index b05244f..808a464 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -19,9 +19,12 @@
package org.apache.pinot.integration.tests;
import java.lang.reflect.Constructor;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -117,5 +120,16 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+ return null;
+ }
+
+ @Override
+ public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ return null;
+ }
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 615e354..b8ed19d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -18,7 +18,10 @@
*/
package org.apache.pinot.plugin.stream.kafka09;
+import java.util.List;
import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -50,4 +53,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
return new KafkaStreamMetadataProvider(clientId, _streamConfig);
}
+
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+ return null;
+ }
+
+ @Override
+ public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ return null;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index e0d1015..806baff 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,7 +18,10 @@
*/
package org.apache.pinot.plugin.stream.kafka20;
+import java.util.List;
import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -47,4 +50,15 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
return new KafkaStreamMetadataProvider(clientId, _streamConfig);
}
+
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
+ return null;
+ }
+
+ @Override
+ public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ return null;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
index 627c964..bae8832 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.spi.stream;
-public interface Checkpoint {
- byte[] serialize();
- Checkpoint deserialize(byte[] blob);
+public interface Checkpoint extends Comparable {
+ String serialize();
+ Checkpoint deserialize(String checkpointStr);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
index e5025f6..e8fa275 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/LongMsgOffset.java
@@ -50,4 +50,14 @@ public class LongMsgOffset implements StreamPartitionMsgOffset {
public String toString() {
return Long.toString(_offset);
}
+
+ @Override
+ public String serialize() {
+ return toString();
+ }
+
+ @Override
+ public Checkpoint deserialize(String checkpointStr) {
+ return new LongMsgOffset(checkpointStr);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index 2f138c2..e096e67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.spi.stream;
-public interface PartitionGroupConsumer {
+import java.io.Closeable;
+
+
+public interface PartitionGroupConsumer extends Closeable {
FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index 779c167..0f44173 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -25,8 +25,6 @@ public interface PartitionGroupMetadata {
int getGroupId();
- List<String> getPartitions();
-
Checkpoint getStartCheckpoint(); // similar to getStartOffset
Checkpoint getEndCheckpoint(); // similar to getEndOffset
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
index 72654bf..06a090e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
@@ -39,7 +39,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
* versions of the stream implementation
*/
@InterfaceStability.Evolving
-public interface StreamPartitionMsgOffset extends Comparable {
+public interface StreamPartitionMsgOffset extends Checkpoint {
/**
* Compare this offset with another one.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org