You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/28 02:13:06 UTC

(pinot) branch master updated: Fix rebalance on upsert table (#12054)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a37ced6ec9 Fix rebalance on upsert table (#12054)
a37ced6ec9 is described below

commit a37ced6ec998aa771a71e6eba7624942b66d34b4
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Nov 27 18:13:00 2023 -0800

    Fix rebalance on upsert table (#12054)
---
 .../segment/StrictRealtimeSegmentAssignment.java   | 174 ++++++++++++++-------
 .../helix/core/rebalance/TableRebalancer.java      |  58 ++++---
 2 files changed, 155 insertions(+), 77 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
index e8913563e3..2bc0ada5bd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java
@@ -19,15 +19,21 @@
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 
 
 /**
@@ -52,26 +58,53 @@ import org.apache.pinot.spi.utils.CommonConstants;
  */
 public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
 
+  // Cache segment partition id to avoid ZK reads.
+  // NOTE:
+  // 1. This cache is used for table rebalance only, not for segment assignment. During rebalance, rebalanceTable() can
+  //    be invoked multiple times when the ideal state changes during the rebalance process.
+  // 2. The cache won't be refreshed when an existing segment is replaced with a segment from a different partition.
+  //    Replacing a segment with a segment from a different partition should not be allowed for upsert table because it
+  //    will cause the segment to be served by the wrong servers. If this happens during the table rebalance, another
+  //    rebalance might be needed to fix the assignment.
+  private final Object2IntOpenHashMap<String> _segmentPartitionIdMap = new Object2IntOpenHashMap<>();
+
   @Override
   public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
-    Map.Entry<InstancePartitionsType, InstancePartitions> typeToInstancePartitions =
-        instancePartitionsMap.entrySet().iterator().next();
-    InstancePartitionsType instancePartitionsType = typeToInstancePartitions.getKey();
-    InstancePartitions instancePartitions = typeToInstancePartitions.getValue();
-    Preconditions.checkState(instancePartitionsType == InstancePartitionsType.CONSUMING,
-        "Only CONSUMING instance partition type is allowed for table using upsert but got: " + instancePartitionsType);
+    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
     _logger.info("Assigning segment: {} with instance partitions: {} for table: {}", segmentName, instancePartitions,
         _tableNameWithType);
-    int segmentPartitionId =
-        SegmentAssignmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager,
-            _partitionColumn);
-    List<String> instancesAssigned = assignConsumingSegment(segmentPartitionId, instancePartitions);
-    // Iterate the idealState to find the first segment that's in the same table partition with the new segment, and
-    // check if their assignments are same. We try to derive the partition id from segment name to avoid ZK reads.
-    Set<String> idealAssignment = null;
-    List<String> nonStandardSegments = new ArrayList<>();
+
+    int partitionId = getPartitionId(segmentName);
+    List<String> instancesAssigned = assignConsumingSegment(partitionId, instancePartitions);
+    Set<String> existingAssignment = getExistingAssignment(partitionId, currentAssignment);
+    // Check if the candidate assignment is consistent with existing assignment. Use existing assignment if not.
+    if (existingAssignment == null) {
+      _logger.info("No existing assignment from idealState, using the one decided by instancePartitions");
+    } else if (!isSameAssignment(existingAssignment, instancesAssigned)) {
+      _logger.warn("Assignment: {} is inconsistent with idealState: {}, using the one from idealState",
+          instancesAssigned, existingAssignment);
+      instancesAssigned = new ArrayList<>(existingAssignment);
+      if (_controllerMetrics != null) {
+        _controllerMetrics.addMeteredTableValue(_tableNameWithType,
+            ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+      }
+    }
+    _logger.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
+        _tableNameWithType);
+    return instancesAssigned;
+  }
+
+  /**
+   * Returns the existing assignment for the given partition id, or {@code null} if there is no existing segment for the
+   * partition. We try to derive the partition id from segment name to avoid ZK reads.
+   */
+  @Nullable
+  private Set<String> getExistingAssignment(int partitionId, Map<String, Map<String, String>> currentAssignment) {
+    List<String> uploadedSegments = new ArrayList<>();
     for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
       // Skip OFFLINE segments as they are not rebalanced, so their assignment in idealState can be stale.
       if (isOfflineSegment(entry.getValue())) {
@@ -79,59 +112,86 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment {
       }
       LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
       if (llcSegmentName == null) {
-        nonStandardSegments.add(entry.getKey());
+        uploadedSegments.add(entry.getKey());
         continue;
       }
-      if (llcSegmentName.getPartitionGroupId() == segmentPartitionId) {
-        idealAssignment = entry.getValue().keySet();
-        break;
-      }
-    }
-    if (idealAssignment == null && !nonStandardSegments.isEmpty()) {
-      if (_logger.isDebugEnabled()) {
-        int segmentCnt = nonStandardSegments.size();
-        if (segmentCnt <= 10) {
-          _logger.debug("Check ZK metadata of {} segments: {} for any one also from partition: {}", segmentCnt,
-              nonStandardSegments, segmentPartitionId);
-        } else {
-          _logger.debug("Check ZK metadata of {} segments: {}... for any one also from partition: {}", segmentCnt,
-              nonStandardSegments.subList(0, 10), segmentPartitionId);
-        }
-      }
-      // Check ZK metadata for segments with non-standard LLC segment names to look for a segment that's in the same
-      // table partition with the new segment.
-      for (String nonStandardSegment : nonStandardSegments) {
-        if (SegmentAssignmentUtils.getRealtimeSegmentPartitionId(nonStandardSegment, _tableNameWithType, _helixManager,
-            _partitionColumn) == segmentPartitionId) {
-          idealAssignment = currentAssignment.get(nonStandardSegment).keySet();
-          break;
-        }
+      if (llcSegmentName.getPartitionGroupId() == partitionId) {
+        return entry.getValue().keySet();
       }
     }
-    // Check if the candidate assignment is consistent with idealState. Use idealState if not.
-    if (idealAssignment == null) {
-      _logger.info("No existing assignment from idealState, using the one decided by instancePartitions");
-    } else if (!isSameAssignment(idealAssignment, instancesAssigned)) {
-      _logger.warn("Assignment: {} is inconsistent with idealState: {}, using the one from idealState",
-          instancesAssigned, idealAssignment);
-      instancesAssigned.clear();
-      instancesAssigned.addAll(idealAssignment);
-      if (_controllerMetrics != null) {
-        _controllerMetrics.addMeteredTableValue(_tableNameWithType,
-            ControllerMeter.CONTROLLER_REALTIME_TABLE_SEGMENT_ASSIGNMENT_MISMATCH, 1L);
+    // Check ZK metadata for uploaded segments to look for a segment that's in the same partition
+    for (String uploadedSegment : uploadedSegments) {
+      if (getPartitionId(uploadedSegment) == partitionId) {
+        return currentAssignment.get(uploadedSegment).keySet();
       }
     }
-    _logger.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned,
-        _tableNameWithType);
-    return instancesAssigned;
+    return null;
   }
 
+  /**
+   * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor CONSUMING), {@code false} otherwise.
+   */
+  private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
+    return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && !instanceStateMap.containsValue(
+        SegmentStateModel.CONSUMING);
+  }
+
+  /**
+   * Returns the partition id of the given segment.
+   */
+  private int getPartitionId(String segmentName) {
+    Integer partitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _partitionColumn);
+    Preconditions.checkState(partitionId != null, "Failed to find partition id for segment: %s of table: %s",
+        segmentName, _tableNameWithType);
+    return partitionId;
+  }
+
+  /**
+   * Returns {@code true} if the ideal assignment and the actual assignment are the same, {@code false} otherwise.
+   */
   private boolean isSameAssignment(Set<String> idealAssignment, List<String> instancesAssigned) {
     return idealAssignment.size() == instancesAssigned.size() && idealAssignment.containsAll(instancesAssigned);
   }
 
-  private boolean isOfflineSegment(Map<String, String> instanceStateMap) {
-    return !instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)
-        && !instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, RebalanceConfig config) {
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+    Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
+        _tableNameWithType);
+    Preconditions.checkArgument(config.isIncludeConsuming(),
+        "Consuming segment must be included when rebalancing upsert table: %s", _tableNameWithType);
+    Preconditions.checkState(sortedTiers == null, "Tiers must not be specified for upsert table: %s",
+        _tableNameWithType);
+    _logger.info("Rebalancing table: {} with instance partitions: {}", _tableNameWithType, instancePartitions);
+
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (isOfflineSegment(instanceStateMap)) {
+        // Do not move the OFFLINE segments; RealtimeSegmentValidationManager will periodically detect the
+        // OFFLINE segments and re-assign them
+        newAssignment.put(segmentName, instanceStateMap);
+      } else {
+        // Reassign CONSUMING and COMPLETED segments
+        List<String> instancesAssigned =
+            assignConsumingSegment(getPartitionIdUsingCache(segmentName), instancePartitions);
+        String state = instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? SegmentStateModel.CONSUMING
+            : SegmentStateModel.ONLINE;
+        newAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state));
+      }
+    }
+    return newAssignment;
+  }
+
+  /**
+   * Returns the partition id of the given segment, using cached partition id if exists.
+   */
+  private int getPartitionIdUsingCache(String segmentName) {
+    return _segmentPartitionIdMap.computeIntIfAbsent(segmentName, this::getPartitionId);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a052794ae2..cd4d70ee3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -58,6 +58,7 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -326,14 +327,18 @@ public class TableRebalancer {
         targetAssignment);
 
     // Calculate the min available replicas for no-downtime rebalance
-    // NOTE: The calculation is based on the number of replicas of the target assignment. In case of increasing the
-    //       number of replicas for the current assignment, the current instance state map might not have enough
-    //       replicas to reach the minimum available replicas requirement. In this scenario we don't want to fail the
-    //       check, but keep all the current instances as this is the best we can do, and can help the table get out of
-    //       this state.
+    // NOTE:
+    // 1. The calculation is based on the number of replicas of the target assignment. In case of increasing the number
+    //    of replicas for the current assignment, the current instance state map might not have enough replicas to reach
+    //    the minimum available replicas requirement. In this scenario we don't want to fail the check, but keep all the
+    //    current instances as this is the best we can do, and can help the table get out of this state.
+    // 2. Only check the segments to be moved because we don't need to maintain available replicas for segments not
+    //    being moved, including segments with all replicas OFFLINE (error segments during consumption).
+    Set<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+
     int numReplicas = Integer.MAX_VALUE;
-    for (Map<String, String> instanceStateMap : targetAssignment.values()) {
-      numReplicas = Math.min(instanceStateMap.size(), numReplicas);
+    for (String segment : segmentsToMove) {
+      numReplicas = Math.min(targetAssignment.get(segment).size(), numReplicas);
     }
     int minAvailableReplicas;
     if (minReplicasToKeepUpForNoDowntime >= 0) {
@@ -362,9 +367,17 @@ public class TableRebalancer {
         externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
     int expectedVersion = currentIdealState.getRecord().getVersion();
 
+    // We repeat the following steps until the target assignment is reached:
+    // 1. Wait for ExternalView to converge with the IdealState. Fail the rebalance if it doesn't converge within the
+    //    timeout.
+    // 2. When IdealState changes during step 1, re-calculate the target assignment based on the new IdealState (current
+    //    assignment).
+    // 3. Check if the target assignment is reached. Rebalance is done if it is reached.
+    // 4. Calculate the next assignment based on the current assignment, target assignment and min available replicas.
+    // 5. Update the IdealState to the next assignment. If the IdealState changes before the update, go back to step 1.
     while (true) {
       // Wait for ExternalView to converge before updating the next IdealState
-      Set<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+      segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
       IdealState idealState;
       try {
         idealState =
@@ -399,16 +412,22 @@ public class TableRebalancer {
         // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
         // same target instance state map for these segments to the new ideal state as the target assignment
         boolean segmentsToMoveChanged = false;
-        for (String segment : segmentsToMove) {
-          Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
-          Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
-          if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
-            LOGGER.info(
-                "For rebalanceId: {}, segment state changed in IdealState from: {} to: {} for table: {}, segment: {}, "
-                    + "re-calculating the target assignment based on the new IdealState", rebalanceJobId,
-                oldInstanceStateMap, currentInstanceStateMap, tableNameWithType, segment);
-            segmentsToMoveChanged = true;
-            break;
+        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+          // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
+          // newly added segments is based on the existing assignment
+          segmentsToMoveChanged = true;
+        } else {
+          for (String segment : segmentsToMove) {
+            Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
+            Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
+            // TODO: Consider allowing segment state change from CONSUMING to ONLINE
+            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+              LOGGER.info("For rebalanceId: {}, segment state changed in IdealState from: {} to: {} for table: {}, "
+                      + "segment: {}, re-calculating the target assignment based on the new IdealState", rebalanceJobId,
+                  oldInstanceStateMap, currentInstanceStateMap, tableNameWithType, segment);
+              segmentsToMoveChanged = true;
+              break;
+            }
           }
         }
         if (segmentsToMoveChanged) {
@@ -417,8 +436,7 @@ public class TableRebalancer {
             instancePartitionsMap =
                 getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false).getLeft();
             tierToInstancePartitionsMap =
-                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap,
-                    dryRun).getLeft();
+                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false).getLeft();
             targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
                 tierToInstancePartitionsMap, rebalanceConfig);
           } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org