You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:42:55 UTC
[incubator-pinot] 05/47: Separate PartitionGroupInfo and
PartitionGroupMetadata
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a7fba5a7ffc843ea576d23e330cd2fd8441ee5fb
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 14:46:46 2020 -0800
Separate PartitionGroupInfo and PartitionGroupMetadata
---
.../helix/core/PinotHelixResourceManager.java | 12 +--
.../realtime/PinotLLCRealtimeSegmentManager.java | 108 ++++++++-------------
.../impl/fakestream/FakeStreamConsumerFactory.java | 3 +-
.../kafka09/KafkaPartitionLevelConsumerTest.java | 2 +-
.../kafka20/KafkaStreamMetadataProvider.java | 28 ++++--
.../pinot/spi/stream/PartitionGroupInfo.java | 43 ++++++++
.../pinot/spi/stream/StreamMetadataProvider.java | 6 +-
7 files changed, 115 insertions(+), 87 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f36e4f..f0d52bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -126,12 +126,7 @@ import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -1361,9 +1356,8 @@ public class PinotHelixResourceManager {
idealState = PinotTableIdealStateBuilder
.buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
_enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
- LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
- _pinotLLCRealtimeSegmentManager.setupNewShardedTable(rawRealtimeTableConfig, idealState);
+ _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
+ LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
} else {
if (streamConfig.hasHighLevelConsumerType()) {
@@ -1391,7 +1385,7 @@ public class PinotHelixResourceManager {
idealState = PinotTableIdealStateBuilder
.buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
_enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+ _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
} else {
LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9b03fa4..528125b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -78,6 +79,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
@@ -221,8 +223,14 @@ public class PinotLLCRealtimeSegmentManager {
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
.createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
+
+ List<PartitionGroupInfo> newPartitionGroupMetadataList;
+ try {
+ newPartitionGroupMetadataList =
+ streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
+ } catch (TimeoutException e) {
+ throw new IllegalStateException(e);
+ }
int numPartitionGroups = newPartitionGroupMetadataList.size();
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -234,8 +242,8 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
- for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
- String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata.getPartitionGroupId(),
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+ String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
@@ -277,50 +285,6 @@ public class PinotLLCRealtimeSegmentManager {
}
/**
- * Sets up the initial segments for a new LLC real-time table.
- * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured.
- */
- public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
- Preconditions.checkState(!_isStopping, "Segment manager is stopping");
-
- String realtimeTableName = tableConfig.getTableName();
- LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
-
- // Make sure all the existing segments are HLC segments
- List<String> currentSegments = getAllSegments(realtimeTableName);
- for (String segmentName : currentSegments) {
- // TODO: Should return 4xx HTTP status code. Currently all exceptions are returning 500
- Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(segmentName),
- "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, segmentName);
- }
-
- _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-
- PartitionLevelStreamConfig streamConfig =
- new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
- InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
- List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
- int numPartitionGroups = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
- int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-
- SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
- Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
-
- long currentTimeMs = getCurrentTimeMs();
- Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
- for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups; partitionGroupId++) {
- String segmentName =
- setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups,
- numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
- instancePartitionsMap);
- }
-
- setIdealState(realtimeTableName, idealState);
- }
-
- /**
* Removes all LLC segments from the given IdealState.
*/
public void removeLLCSegments(IdealState idealState) {
@@ -538,15 +502,23 @@ public class PinotLLCRealtimeSegmentManager {
// Step-2
// Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+
// get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
- StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
.createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
+
// find new partition groups [A],[B],[C],[D]
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
+ List<PartitionGroupInfo> newPartitionGroupMetadataList;
+ try {
+ newPartitionGroupMetadataList =
+ streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
+ } catch (TimeoutException e) {
+ throw new IllegalStateException(e);
+ }
// create new segment metadata, only if it is not IN_PROGRESS in the current state
Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
@@ -555,25 +527,24 @@ public class PinotLLCRealtimeSegmentManager {
List<String> newConsumingSegmentNames = new ArrayList<>();
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
- for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
- int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+ int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
- if (currentPartitionGroupMetadata == null) { // not present in current state
+ if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
// make new segment
- LLCSegmentName newLLCSegmentName =
- new LLCSegmentName(rawTableName, newPartitionGroupId, STARTING_SEQUENCE_NUMBER, newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
- IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
- newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+ String newLLCSegmentName =
+ setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
+ instancePartitions, numPartitions, numReplicas);
+ newConsumingSegmentNames.add(newLLCSegmentName);
} else {
String currentStatus = currentPartitionGroupMetadata.getStatus();
- if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { // not IN_PROGRESS anymore in current state
- // make new segment
+ if (!currentStatus.equals(Status.IN_PROGRESS.toString())) {
+ // not IN_PROGRESS anymore in current state. Should be DONE.
+ // This should ONLY happen for the committing segment's partition. Need to trigger new consuming segment
+ // todo: skip this if the partition doesn't match with the committing segment?
LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
- IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
}
@@ -1181,19 +1152,20 @@ public class PinotLLCRealtimeSegmentManager {
* Sets up a new partition group.
* <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
*/
- private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionGroupId,
+ private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, PartitionGroupInfo partitionGroupInfo,
long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
+ int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
+ String startCheckpoint = partitionGroupInfo.getStartCheckpoint();
LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
LLCSegmentName newLLCSegmentName =
new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- StreamPartitionMsgOffset startOffset =
- getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionGroupId);
+
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
+ new CommittingSegmentDescriptor(null, startCheckpoint, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 289b226..54be1b6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;
+import java.util.Collections;
import java.util.Set;
import org.apache.pinot.core.util.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -87,7 +88,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
// stream metadata provider
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
- int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(null, 10_000).size();
+ int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10_000).size();
System.out.println(partitionCount);
// Partition metadata provider
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index fbdfdfb..43b72a8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
KafkaStreamMetadataProvider streamMetadataProvider =
new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
- Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(null, 10000L), 2);
+ Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10000L), 2);
}
@Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 187c61b..eb606f2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,6 +29,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -37,12 +38,15 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider {
+ private StreamConfig _streamConfig;
+
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
this(clientId, streamConfig, Integer.MIN_VALUE);
}
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
+ _streamConfig = streamConfig;
}
@Override
@@ -57,14 +61,26 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
* Hence current partition groups are not needed to compute the new partition groups
*/
@Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+ public List<PartitionGroupInfo> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+ throws TimeoutException {
int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
- for (int i = 0; i < partitionCount; i++) {
- partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+ List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+ // add a PartitionGroupInfo to the list for each partition group already present in the current state.
+ // the current end checkpoint is carried over as the new start checkpoint
+ for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+ currentPartitionGroupMetadata.getEndCheckpoint()));
+ }
+ // add PartitionGroupInfo for new partitions
+ // use offset criteria from stream config
+ for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
+ StreamPartitionMsgOffset streamPartitionMsgOffset =
+ fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
}
- return partitionGroupMetadataList;
+ return newPartitionGroupInfoList;
}
public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
new file mode 100644
index 0000000..438e148
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+public class PartitionGroupInfo {
+
+ // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
+ private final int _partitionGroupId;
+ private String _startCheckpoint;
+
+ public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
+ _partitionGroupId = partitionGroupId;
+ _startCheckpoint = startCheckpoint;
+ }
+
+ public void setStartCheckpoint(String startCheckpoint) {
+ _startCheckpoint = startCheckpoint;
+ }
+
+ public int getPartitionGroupId() {
+ return _partitionGroupId;
+ }
+
+ public String getStartCheckpoint() {
+ return _startCheckpoint;
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 5b9104e..a9cd2d6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
import java.io.Closeable;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -39,8 +40,9 @@ public interface StreamMetadataProvider extends Closeable {
int fetchPartitionCount(long timeoutMillis);
// takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
- List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis);
+ List<PartitionGroupInfo> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+ throws TimeoutException;
// Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org