You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/28 02:13:06 UTC
(pinot) branch master updated: Fix rebalance on upsert table (#12054)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a37ced6ec9 Fix rebalance on upsert table (#12054)
a37ced6ec9 is described below
commit a37ced6ec998aa771a71e6eba7624942b66d34b4
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Nov 27 18:13:00 2023 -0800
Fix rebalance on upsert table (#12054)
---
.../segment/StrictRealtimeSegmentAssignment.java | 174 ++++++++++++++-------
.../helix/core/rebalance/TableRebalancer.java | 58 ++++---
2 files changed, 155 insertions(+), 77 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index e8913563e3..2bc0ada5bd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -19,15 +19,21 @@
package org.apache.pinot.controller.helix.core.assignment.segment;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
/**
@@ -52,26 +58,53 @@ import org.apache.pinot.spi.utils.CommonConstants;
*/
public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
+ // Cache segment partition id to avoid ZK reads.
+ // NOTE:
+ // 1. This cache is used for table rebalance only, but not segment assignment. During rebalance, rebalanceTable() can
+ // be invoked multiple times when the ideal state changes during the rebalance process.
+ // 2. The cache won't be refreshed when an existing segment is replaced with a segment from a different partition.
+ // Replacing a segment with a segment from a different partition should not be allowed for upsert table because it
+ // will cause the segment to be served by the wrong servers. If this happens during the table rebalance, another
+ // rebalance might be needed to fix the assignment.
+ private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new Object2IntOpenHashMap<>();
+
@Override
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
- Map.Entry<InstancePartitionsType, InstancePartitions> typeToInstancePartitions =
- instancePartitionsMap.entrySet().iterator().next();
- InstancePartitionsType instancePartitionsType = typeToInstancePartitions.getKey();
- InstancePartitions instancePartitions = typeToInstancePartitions.getValue();
- Preconditions.checkState(instancePartitionsType == InstancePartitionsType.CONSUMING,
- "Only CONSUMING instance partition type is allowed for table using upsert but got: " + instancePartitionsType);
+ InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
_logger.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions,
_tableNameWithType);
- int segmentPartitionId =
- SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
- _partitionColumn);
- List<String> instancesAssigned = assignConsumingSegment(segmentPartitionId, instancePartitions);
- // Iterate the idealState to find the first segment that's in the same table partition with the new segment, and
- // check if their assignments are same. We try to derive the partition id from segment name to avoid ZK reads.
- Set<String> idealAssignment = null;
- List<String> nonStandardSegments = new ArrayList<>();
+
+ int partitionId = getPartitionId(segmentName);
+ List<String> instancesAssigned = assignConsumingSegment(partitionId, instancePartitions);
+ Set<String> existingAssignment = getExistingAssignment(partitionId, currentAssignment);
+ // Check if the candidate assignment is consistent with existing assignment. Use existing assignment if not.
+ if (existingAssignment == null) {
+ _logger.info("No existing assignment from idealState, using the one decided by instancePartitions");
+ } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+ _logger.warn("Assignment: {} is inconsistent with idealState: {}, using the one from idealState",
+ instancesAssigned, existingAssignment);
+ instancesAssigned = new ArrayList<>(existingAssignment);
+ if (_controllerMetrics != null) {
+ _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+ ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ }
+ }
+ _logger.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
+ _tableNameWithType);
+ return instancesAssigned;
+ }
+
+ /**
+ * Returns the existing assignment for the given partition id, or {@code null} if there is no existing segment for the
+ * partition. We try to derive the partition id from segment name to avoid ZK reads.
+ */
+ @Nullable
+ private Set<String> getExistingAssignment(int partitionId, Map<String, Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
// Skip OFFLINE segments as they are not rebalanced, so their assignment in idealState can be stale.
if (isOfflineSegment(entry.getValue())) {
@@ -79,59 +112,86 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
}
LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
if (llcSegmentName == null) {
- nonStandardSegments.add(entry.getKey());
+ uploadedSegments.add(entry.getKey());
continue;
}
- if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
- idealAssignment = entry.getValue().keySet();
- break;
- }
- }
- if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
- if (_logger.isDebugEnabled()) {
- int segmentCnt = nonStandardSegments.size();
- if (segmentCnt <= 10) {
- _logger.debug("Check ZK metadata of {} segments: {} for any one also from partition: {}", segmentCnt,
- nonStandardSegments, segmentPartitionId);
- } else {
- _logger.debug("Check ZK metadata of {} segments: {}... for any one also from partition: {}", segmentCnt,
- nonStandardSegments.subList(0, 10), segmentPartitionId);
- }
- }
- // Check ZK metadata for segments with non-standard LLC segment names to look for a segment that's in the same
- // table partition with the new segment.
- for (String nonStandardSegment : nonStandardSegments) {
- if (SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment, _tableNameWithType, _helixManager,
- _partitionColumn) == segmentPartitionId) {
- idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
- break;
- }
+ if (llcSegmentName.getPartitionGroupId() == partitionId) {
+ return entry.getValue().keySet();
}
}
- // Check if the candidate assignment is consistent with idealState. Use idealState if not.
- if (idealAssignment == null) {
- _logger.info("No existing assignment from idealState, using the one decided by instancePartitions");
- } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
- _logger.warn("Assignment: {} is inconsistent with idealState: {}, using the one from idealState",
- instancesAssigned, idealAssignment);
- instancesAssigned.clear();
- instancesAssigned.addAll(idealAssignment);
- if (_controllerMetrics != null) {
- _controllerMetrics.addMeteredTableValue(_tableNameWithType,
- ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+ // Check ZK metadata for uploaded segments to look for a segment that's in the same partition
+ for (String uploadedSegment : uploadedSegments) {
+ if (getPartitionId(uploadedSegment) == partitionId) {
+ return currentAssignment.get(uploadedSegment).keySet();
}
}
- _logger.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
- _tableNameWithType);
- return instancesAssigned;
+ return null;
}
+ /**
+ * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor CONSUMING), {@code false} otherwise.
+ */
+ private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+ return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && !instanceStateMap.containsValue(
+ SegmentStateModel.CONSUMING);
+ }
+
+ /**
+ * Returns the partition id of the given segment.
+ */
+ private int getPartitionId(String segmentName) {
+ Integer partitionId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _partitionColumn);
+ Preconditions.checkState(partitionId != null, "Failed to find partition id for segment: %s of table: %s",
+ segmentName, _tableNameWithType);
+ return partitionId;
+ }
+
+ /**
+ * Returns {@code true} if the ideal assignment and the actual assignment are the same, {@code false} otherwise.
+ */
private boolean isSameAssignment(Set<String> idealAssignment, List<String> instancesAssigned) {
return idealAssignment.size() == instancesAssigned.size() && idealAssignment.containsAll(instancesAssigned);
}
- private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
- return !instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)
- && !instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+ @Override
+ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, RebalanceConfig config) {
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+ InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
+ _tableNameWithType);
+ Preconditions.checkArgument(config.isIncludeConsuming(),
+ "Consuming segment must be included when rebalancing upsert table: %s", _tableNameWithType);
+ Preconditions.checkState(sortedTiers == null, "Tiers must not be specified for upsert table: %s",
+ _tableNameWithType);
+ _logger.info("Rebalancing table: {} with instance partitions: {}", _tableNameWithType, instancePartitions);
+
+ Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (isOfflineSegment(instanceStateMap)) {
+ // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the
+ // OFFLINE segments and re-assign them
+ newAssignment.put(segmentName, instanceStateMap);
+ } else {
+ // Reassign CONSUMING and COMPLETED segments
+ List<String> instancesAssigned =
+ assignConsumingSegment(getPartitionIdUsingCache(segmentName), instancePartitions);
+ String state = instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? SegmentStateModel.CONSUMING
+ : SegmentStateModel.ONLINE;
+ newAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+ }
+ }
+ return newAssignment;
+ }
+
+ /**
+ * Returns the partition id of the given segment, using cached partition id if exists.
+ */
+ private int getPartitionIdUsingCache(String segmentName) {
+ return _segmentPartitionIdMap.computeIntIfAbsent(segmentName, this::getPartitionId);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a052794ae2..cd4d70ee3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -58,6 +58,7 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -326,14 +327,18 @@ public class TableRebalancer {
targetAssignment);
// Calculate the min available replicas for no-downtime rebalance
- // NOTE: The calculation is based on the number of replicas of the target assignment. In case of increasing the
- // number of replicas for the current assignment, the current instance state map might not have enough
- // replicas to reach the minimum available replicas requirement. In this scenario we don't want to fail the
- // check, but keep all the current instances as this is the best we can do, and can help the table get out of
- // this state.
+ // NOTE:
+ // 1. The calculation is based on the number of replicas of the target assignment. In case of increasing the number
+ // of replicas for the current assignment, the current instance state map might not have enough replicas to reach
+ // the minimum available replicas requirement. In this scenario we don't want to fail the check, but keep all the
+ // current instances as this is the best we can do, and can help the table get out of this state.
+ // 2. Only check the segments to be moved because we don't need to maintain available replicas for segments not
+ // being moved, including segments with all replicas OFFLINE (error segments during consumption).
+ Set<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+
int numReplicas = Integer.MAX_VALUE;
- for (Map<String, String> instanceStateMap : targetAssignment.values()) {
- numReplicas = Math.min(instanceStateMap.size(), numReplicas);
+ for (String segment : segmentsToMove) {
+ numReplicas = Math.min(targetAssignment.get(segment).size(), numReplicas);
}
int minAvailableReplicas;
if (minReplicasToKeepUpForNoDowntime >= 0) {
@@ -362,9 +367,17 @@ public class TableRebalancer {
externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
int expectedVersion = currentIdealState.getRecord().getVersion();
+ // We repeat the following steps until the target assignment is reached:
+ // 1. Wait for ExternalView to converge with the IdealState. Fail the rebalance if it doesn't converge within the
+ // timeout.
+ // 2. When IdealState changes during step 1, re-calculate the target assignment based on the new IdealState (current
+ // assignment).
+ // 3. Check if the target assignment is reached. Rebalance is done if it is reached.
+ // 4. Calculate the next assignment based on the current assignment, target assignment and min available replicas.
+ // 5. Update the IdealState to the next assignment. If the IdealState changes before the update, go back to step 1.
while (true) {
// Wait for ExternalView to converge before updating the next IdealState
- Set<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+ segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
IdealState idealState;
try {
idealState =
@@ -399,16 +412,22 @@ public class TableRebalancer {
// If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
// same target instance state map for these segments to the new ideal state as the target assignment
boolean segmentsToMoveChanged = false;
- for (String segment : segmentsToMove) {
- Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
- Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
- if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
- LOGGER.info(
- "For rebalanceId: {}, segment state changed in IdealState from: {} to: {} for table: {}, segment: {}, "
- + "re-calculating the target assignment based on the new IdealState", rebalanceJobId,
- oldInstanceStateMap, currentInstanceStateMap, tableNameWithType, segment);
- segmentsToMoveChanged = true;
- break;
+ if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+ // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
+ // newly added segments is based on the existing assignment
+ segmentsToMoveChanged = true;
+ } else {
+ for (String segment : segmentsToMove) {
+ Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
+ Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
+ // TODO: Consider allowing segment state change from CONSUMING to ONLINE
+ if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+ LOGGER.info("For rebalanceId: {}, segment state changed in IdealState from: {} to: {} for table: {}, "
+ + "segment: {}, re-calculating the target assignment based on the new IdealState", rebalanceJobId,
+ oldInstanceStateMap, currentInstanceStateMap, tableNameWithType, segment);
+ segmentsToMoveChanged = true;
+ break;
+ }
}
}
if (segmentsToMoveChanged) {
@@ -417,8 +436,7 @@ public class TableRebalancer {
instancePartitionsMap =
getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false).getLeft();
tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap,
- dryRun).getLeft();
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false).getLeft();
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org