You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink on "here") is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:21 UTC
[incubator-pinot] 04/08: Controller side code
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b13946066ce3e30375f18ddabd4023c113219eb6
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 12:04:54 2020 -0800
Controller side code
---
.../segment/RealtimeSegmentZKMetadata.java | 6 -
.../helix/core/PinotHelixResourceManager.java | 89 ++++-----
.../helix/core/PinotTableIdealStateBuilder.java | 9 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 201 +++++++++++++--------
.../fakestream/FakePartitionGroupMetadata.java | 48 -----
.../kafka09/KafkaPartitionGroupMetadata.java | 48 -----
.../kafka20/KafkaPartitionGroupMetadata.java | 48 -----
.../pinot/spi/stream/PartitionGroupMetadata.java | 52 +++++-
8 files changed, 207 insertions(+), 294 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
index c46af53..d88be18 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/RealtimeSegmentZKMetadata.java
@@ -35,7 +35,6 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
private Status _status = null;
private int _sizeThresholdToFlushSegment = -1;
private String _timeThresholdToFlushSegment = null; // store as period string for readability
- private String _partitionGroupMetadataStr = null;
public RealtimeSegmentZKMetadata() {
setSegmentType(SegmentType.REALTIME);
@@ -50,7 +49,6 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
if (flushThresholdTime != null && !flushThresholdTime.equals(NULL)) {
_timeThresholdToFlushSegment = znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_TIME);
}
- _partitionGroupMetadataStr = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_GROUP_METADATA);
}
@Override
@@ -143,8 +141,4 @@ public class RealtimeSegmentZKMetadata extends SegmentZKMetadata {
public void setTimeThresholdToFlushSegment(String timeThresholdPeriodString) {
_timeThresholdToFlushSegment = timeThresholdPeriodString;
}
-
- public String getPartitionGroupMetadataStr() {
- return _partitionGroupMetadataStr;
- }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7f36e5a..1ff3f9d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -85,7 +85,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -124,6 +123,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -1337,65 +1337,50 @@ public class PinotHelixResourceManager {
IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
IdealState idealState = getTableIdealState(realtimeTableName);
+
if (streamConfig.isShardedConsumerType()) {
- setupShardedRealtimeTable(streamConfig, idealState, realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber());
- }
+ idealState = PinotTableIdealStateBuilder
+ .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
+ _enableBatchMessageMode);
+ _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+ LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
+ _pinotLLCRealtimeSegmentManager.setupNewShardedTable(rawRealtimeTableConfig, idealState);
+ } else {
- if (streamConfig.hasHighLevelConsumerType()) {
- if (idealState == null) {
- LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
- idealState = PinotTableIdealStateBuilder
- .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
- _propertyStore, _enableBatchMessageMode);
- _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
- } else {
- // Remove LLC segments if it is not configured
- if (!streamConfig.hasLowLevelConsumerType()) {
- _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
+ if (streamConfig.hasHighLevelConsumerType()) {
+ if (idealState == null) {
+ LOGGER.info("Initializing IdealState for HLC table: {}", realtimeTableName);
+ idealState = PinotTableIdealStateBuilder
+ .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, _helixZkManager,
+ _propertyStore, _enableBatchMessageMode);
+ _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
+ } else {
+ // Remove LLC segments if it is not configured
+ if (!streamConfig.hasLowLevelConsumerType()) {
+ _pinotLLCRealtimeSegmentManager.removeLLCSegments(idealState);
+ }
}
+ // For HLC table, property store entry must exist to trigger watchers to create segments
+ ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
}
- // For HLC table, property store entry must exist to trigger watchers to create segments
- ensurePropertyStoreEntryExistsForHighLevelConsumer(realtimeTableName);
- }
-
- // Either we have only low-level consumer, or both.
- if (streamConfig.hasLowLevelConsumerType()) {
- // Will either create idealstate entry, or update the IS entry with new segments
- // (unless there are low-level segments already present)
- if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
- PinotTableIdealStateBuilder
- .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig,
- idealState, _enableBatchMessageMode);
- LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
- } else {
- LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
+
+ // Either we have only low-level consumer, or both.
+ if (streamConfig.hasLowLevelConsumerType()) {
+ // Will either create idealstate entry, or update the IS entry with new segments
+ // (unless there are low-level segments already present)
+ if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
+ idealState = PinotTableIdealStateBuilder
+ .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
+ _enableBatchMessageMode);
+ _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+ LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
+ } else {
+ LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
+ }
}
}
}
- /**
- * Sets up the realtime table ideal state
- * @param streamConfig
- */
- private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) {
- StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
- StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
- .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
-
- // get current partition groups and their metadata - this will be empty when creating the table
- List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState);
-
- // get new partition groups and their metadata,
- // Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard)
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 5000);
-
- // setup segment zk metadata and ideal state for all the new found partition groups
- _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
- }
-
-
-
private void ensurePropertyStoreEntryExistsForHighLevelConsumer(String realtimeTableName) {
String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForResource(realtimeTableName);
if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 1e95966..a7b3c9e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -94,9 +94,8 @@ public class PinotTableIdealStateBuilder {
return idealState;
}
- public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
- String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
- boolean enableBatchMessageMode) {
+ public static IdealState buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
+ IdealState idealState, boolean enableBatchMessageMode) {
// Validate replicasPerPartition here.
final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -105,7 +104,7 @@ public class PinotTableIdealStateBuilder {
}
final int nReplicas;
try {
- nReplicas = Integer.valueOf(replicasPerPartitionStr);
+ nReplicas = Integer.parseInt(replicasPerPartitionStr);
} catch (NumberFormatException e) {
throw new PinotHelixResourceManager.InvalidTableConfigException(
"Invalid value for replicasPerPartition, expected a number: " + replicasPerPartitionStr, e);
@@ -113,7 +112,7 @@ public class PinotTableIdealStateBuilder {
if (idealState == null) {
idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
}
- pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+ return idealState;
}
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6b220f2..0de5e91 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
@@ -44,6 +45,7 @@ import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -161,82 +163,84 @@ public class PinotLLCRealtimeSegmentManager {
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
}
+
/**
- * The committing segment will call this.
- *
- * For example, say we have 3 shards, grouped into PartitionGroups as [0], [1], [2]
- * Now segment of PG (partition group) 0 is committing. First, we'll update the metadata to DONE, and ideal state to ONLINE
- * Then, the currentPartitionGroupMetadata list will contain - [1], [2]
- * The newPartitionGroupMetadata list will contain - [0], [1], [2]
- * We then get the set of PGs for which new segments need to be made - [0]
+ * Using the ideal state and segment metadata, return a list of the current partition groups
*/
- public void commitPartitionGroup(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
- TableConfig realtimeTableConfig = getTableConfig(realtimeTableName);
- StreamConfig streamConfig = new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(realtimeTableConfig));
- int numReplicas = realtimeTableConfig.getValidationConfig().getReplicasPerPartitionNumber();
- IdealState idealState = getIdealState(realtimeTableName);
-
- // update status in segment metadata to DONE
- // ..
-
- // update Ideal State for this segment to ONLINE
- // ..
-
- // fetch current partition groups (which are actively CONSUMING - from example above, [1], [2])
- List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-
- // get new partition groups (honor any groupings which are already consuming - [0], [1], [2])
- StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
- StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
- .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
-
- // from the above list, remove the partition groups which are already CONSUMING
- // i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0])
- // ..
-
- // setup segment metadata and ideal state for the new found partition groups
- setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
- }
+ public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
+ List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
- public void setupIdealStateForConsuming(List<SegmentZKMetadata> segmentZKMetadata, int numReplicas) {
- // add all segments from the list to ideal state, with state CONSUMING
- }
+ // from all segment names in the ideal state, find unique groups
+ Map<Integer, LLCSegmentName> groupIdToLatestSegment = new HashMap<>();
+ for (String segment : idealState.getPartitionSet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
+ int partitionGroupId = llcSegmentName.getPartitionGroupId();
+ groupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> {
+ if (latestSegment == null) {
+ return llcSegmentName;
+ } else {
+ return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? latestSegment
+ : llcSegmentName;
+ }
+ });
+ }
- public void persistSegmentMetadata(List<SegmentZKMetadata> segmentMetadata) {
- // persist new segment metadata from list to zk
+ // create a PartitionGroupMetadata for each latest segment
+ for (Map.Entry<Integer, LLCSegmentName> entry : groupIdToLatestSegment.entrySet()) {
+ int partitionGroupId = entry.getKey();
+ LLCSegmentName llcSegmentName = entry.getValue();
+ RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider
+ .getRealtimeSegmentZKMetadata(_propertyStore, llcSegmentName.getTableName(), llcSegmentName.getSegmentName());
+ Preconditions.checkNotNull(realtimeSegmentZKMetadata);
+ LLCRealtimeSegmentZKMetadata llRealtimeSegmentZKMetadata =
+ (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
+ PartitionGroupMetadata partitionGroupMetadata =
+ new PartitionGroupMetadata(partitionGroupId, llcSegmentName.getSequenceNumber(),
+ llRealtimeSegmentZKMetadata.getStartOffset(), llRealtimeSegmentZKMetadata.getEndOffset(),
+ llRealtimeSegmentZKMetadata.getStatus().toString());
+ partitionGroupMetadataList.add(partitionGroupMetadata);
+ }
+ return partitionGroupMetadataList;
}
/**
- * Using the list of partition group metadata, create a list of equivalent segment zk metadata
+ * Sets up the realtime table ideal state for a table of consumer type SHARDED
*/
- public List<SegmentZKMetadata> constructSegmentMetadata(List<PartitionGroupMetadata> partitionGroupMetadataList) {
- List<SegmentZKMetadata> segmentZKMetadata = new ArrayList<>();
- // for each partition group construct a segment zk metadata object
- return segmentZKMetadata;
- }
+ public void setupNewShardedTable(TableConfig tableConfig, IdealState idealState) {
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
- /**
- * Using the ideal state, return a list of the current partition groups
- */
- public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
- // from all segment names in the ideal state, find unique groups
+ String realtimeTableName = tableConfig.getTableName();
+ LOGGER.info("Setting up new SHARDED table: {}", realtimeTableName);
- // create a PartitionGroupMetadata, one for each group
- return partitionGroupMetadataList;
- }
+ _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
- public void setupNewPartitionGroups(List<PartitionGroupMetadata> newPartitionGroupMetadataList, int numReplicas) {
- // construct segment zk metadata for the new partition groups
- List<SegmentZKMetadata> segmentMetadata = constructSegmentMetadata(newPartitionGroupMetadataList);
+ PartitionLevelStreamConfig streamConfig =
+ new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+
+ // get new partition groups and their metadata
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+ .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
+ int numPartitionGroups = newPartitionGroupMetadataList.size();
+
+ InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
+ int numReplicas = getNumReplicas(tableConfig, instancePartitions);
- // create these new segments metadata
- persistSegmentMetadata(segmentMetadata);
+ SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
- // setup ideal state for the new segments
- setupIdealStateForConsuming(segmentMetadata, numReplicas);
+ long currentTimeMs = getCurrentTimeMs();
+ Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+ for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+ String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata.getPartitionGroupId(),
+ currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
+ instancePartitionsMap);
+ }
+ setIdealState(realtimeTableName, idealState);
}
public boolean getIsSplitCommitEnabled() {
@@ -532,13 +536,50 @@ public class PinotLLCRealtimeSegmentManager {
_helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
// Step-2
+
+ // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+ // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+ StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+ .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
+ // find new partition groups [A],[B],[C],[D]
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
+
+ // create new segment metadata, only if it is not IN_PROGRESS in the current state
+ Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
+ Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
+
+ List<String> newConsumingSegmentNames = new ArrayList<>();
+ String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
- LLCSegmentName newLLCSegmentName =
- getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig,
- new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)),
- newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata,
- instancePartitions, numPartitions, numReplicas);
+ for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+ int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+ PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
+ if (currentPartitionGroupMetadata == null) { // not present in current state
+ // make new segment
+ LLCSegmentName newLLCSegmentName =
+ new LLCSegmentName(rawTableName, newPartitionGroupId, STARTING_SEQUENCE_NUMBER, newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+ } else {
+ String currentStatus = currentPartitionGroupMetadata.getStatus();
+ if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { // not IN_PROGRESS anymore in current state
+ // make new segment
+ LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
+ currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+ }
+ }
+ }
+
// Step-3
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -554,7 +595,7 @@ public class PinotLLCRealtimeSegmentManager {
Lock lock = _idealStateUpdateLocks[lockIndex];
try {
lock.lock();
- updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newLLCSegmentName.getSegmentName(),
+ updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentNames,
segmentAssignment, instancePartitionsMap);
} finally {
lock.unlock();
@@ -845,7 +886,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
- String newSegmentName, SegmentAssignment segmentAssignment,
+ List<String> newSegmentNames, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
@@ -862,14 +903,18 @@ public class PinotLLCRealtimeSegmentManager {
"Exceeded max segment completion time for segment " + committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
- newSegmentName, segmentAssignment, instancePartitionsMap);
+ null, segmentAssignment, instancePartitionsMap);
+ for (String newSegmentName : newSegmentNames) {
+ updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), null,
+ newSegmentName, segmentAssignment, instancePartitionsMap);
+ }
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
}
@VisibleForTesting
void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
- @Nullable String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment,
+ @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
@@ -880,11 +925,11 @@ public class PinotLLCRealtimeSegmentManager {
}
// Assign instances to the new segment and add instances as state CONSUMING
- List<String> instancesAssigned =
- segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
- instanceStatesMap.put(newSegmentName,
- SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
- LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+ if (newSegmentName != null) {
+ List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+ instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+ LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
+ }
}
/*
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
deleted file mode 100644
index 78ee12c..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.pinot.core.realtime.impl.fakestream;
-
-import org.apache.pinot.spi.stream.Checkpoint;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-
-
-public class FakePartitionGroupMetadata implements PartitionGroupMetadata {
-
- private final int _groupId;
- public FakePartitionGroupMetadata(int groupId) {
- _groupId = groupId;
- }
-
- @Override
- public int getGroupId() {
- return getGroupId();
- }
-
- @Override
- public Checkpoint getStartCheckpoint() {
- return null;
- }
-
- @Override
- public Checkpoint getEndCheckpoint() {
- return null;
- }
-
- @Override
- public void setStartCheckpoint(Checkpoint startCheckpoint) {
-
- }
-
- @Override
- public void setEndCheckpoint(Checkpoint endCheckpoint) {
-
- }
-
- @Override
- public byte[] serialize() {
- return new byte[0];
- }
-
- @Override
- public PartitionGroupMetadata deserialize(byte[] blob) {
- return null;
- }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
deleted file mode 100644
index 1d792ac..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.pinot.plugin.stream.kafka09;
-
-import org.apache.pinot.spi.stream.Checkpoint;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-
-
-public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
-
- private final int _groupId;
- public KafkaPartitionGroupMetadata(int partitionId) {
- _groupId = partitionId;
- }
-
- @Override
- public int getGroupId() {
- return _groupId;
- }
-
- @Override
- public Checkpoint getStartCheckpoint() {
- return null;
- }
-
- @Override
- public Checkpoint getEndCheckpoint() {
- return null;
- }
-
- @Override
- public void setStartCheckpoint(Checkpoint startCheckpoint) {
-
- }
-
- @Override
- public void setEndCheckpoint(Checkpoint endCheckpoint) {
-
- }
-
- @Override
- public byte[] serialize() {
- return new byte[0];
- }
-
- @Override
- public PartitionGroupMetadata deserialize(byte[] blob) {
- return null;
- }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
deleted file mode 100644
index 31ae75a..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.pinot.plugin.stream.kafka20;
-
-import org.apache.pinot.spi.stream.Checkpoint;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-
-
-public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
-
- private final int _groupId;
- public KafkaPartitionGroupMetadata(int partitionId) {
- _groupId = partitionId;
- }
-
- @Override
- public int getGroupId() {
- return _groupId;
- }
-
- @Override
- public Checkpoint getStartCheckpoint() {
- return null;
- }
-
- @Override
- public Checkpoint getEndCheckpoint() {
- return null;
- }
-
- @Override
- public void setStartCheckpoint(Checkpoint startCheckpoint) {
-
- }
-
- @Override
- public void setEndCheckpoint(Checkpoint endCheckpoint) {
-
- }
-
- @Override
- public byte[] serialize() {
- return new byte[0];
- }
-
- @Override
- public PartitionGroupMetadata deserialize(byte[] blob) {
- return null;
- }
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index 0f44173..f662d99 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -18,22 +18,56 @@
*/
package org.apache.pinot.spi.stream;
-import java.util.List;
+public class PartitionGroupMetadata {
+ // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
+ private final int _partitionGroupId;
+ private int _sequenceNumber;
+ private String _startCheckpoint;
+ private String _endCheckpoint;
+ private String _status;
-public interface PartitionGroupMetadata {
+ public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, String startCheckpoint,
+ String endCheckpoint, String status) {
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _startCheckpoint = startCheckpoint;
+ _endCheckpoint = endCheckpoint;
+ }
- int getGroupId();
+ public void setSequenceNumber(int sequenceNumber) {
+ _sequenceNumber = sequenceNumber;
+ }
- Checkpoint getStartCheckpoint(); // similar to getStartOffset
+ public void setStartCheckpoint(String startCheckpoint) {
+ _startCheckpoint = startCheckpoint;
+ }
- Checkpoint getEndCheckpoint(); // similar to getEndOffset
+ public void setEndCheckpoint(String endCheckpoint) {
+ _endCheckpoint = endCheckpoint;
+ }
- void setStartCheckpoint(Checkpoint startCheckpoint);
+ public int getPartitionGroupId() {
+ return _partitionGroupId;
+ }
- void setEndCheckpoint(Checkpoint endCheckpoint);
+ public int getSequenceNumber() {
+ return _sequenceNumber;
+ }
- byte[] serialize();
+ public String getStartCheckpoint() {
+ return _startCheckpoint;
+ }
- PartitionGroupMetadata deserialize(byte[] blob);
+ public String getEndCheckpoint() {
+ return _endCheckpoint;
+ }
+
+ public String getStatus() {
+ return _status;
+ }
+
+ public void setStatus(String status) {
+ _status = status;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org