You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/12/24 19:57:26 UTC

[pinot] branch master updated: Reduce the number of segments to wait for convergence when rebalancing (#10028)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fb211845d Reduce the number of segments to wait for convergence when rebalancing (#10028)
3fb211845d is described below

commit 3fb211845d75e9e5657fab79b5766a20ee232024
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Sun Dec 25 01:27:21 2022 +0530

    Reduce the number of segments to wait for convergence when rebalancing (#10028)
---
 .../assignment/segment/SegmentAssignmentUtils.java | 12 ++++++++
 .../helix/core/rebalance/TableRebalancer.java      | 14 ++++++---
 .../TableRebalancerClusterStatelessTest.java       | 10 ------
 .../helix/core/rebalance/TableRebalancerTest.java  | 36 +++++++++++-----------
 4 files changed, 40 insertions(+), 32 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index bbe3f2fe4d..228a4fb264 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -292,6 +292,18 @@ public class SegmentAssignmentUtils {
     return numSegmentsToBeMovedPerInstance;
   }
 
+  public static Set<String> getSegmentsToMove(Map<String, Map<String, String>> oldAssignment,
+      Map<String, Map<String, String>> newAssignment) {
+    Set<String> result = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      if (!entry.getValue().equals(oldAssignment.get(segmentName))) {
+        result.add(segmentName);
+      }
+    }
+    return result;
+  }
+
   /**
    * Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments.
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 26d0329dd2..f187404b3d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -306,9 +306,10 @@ public class TableRebalancer {
     int expectedVersion = currentIdealState.getRecord().getVersion();
     while (true) {
       // Wait for ExternalView to converge before updating the next IdealState
+      Set<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
       IdealState idealState;
       try {
-        idealState = waitForExternalViewToConverge(tableNameWithType, bestEfforts);
+        idealState = waitForExternalViewToConverge(tableNameWithType, bestEfforts, segmentsToMove);
       } catch (Exception e) {
         LOGGER.warn("Caught exception while waiting for ExternalView to converge for table: {}, aborting the rebalance",
             tableNameWithType, e);
@@ -515,7 +516,8 @@ public class TableRebalancer {
         tier.getName(), storage.getServerTag());
   }
 
-  private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts)
+  private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts,
+      Set<String> segmentsToMonitor)
       throws InterruptedException, TimeoutException {
     long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_STABILIZATION_MAX_WAIT_MS;
 
@@ -530,7 +532,7 @@ public class TableRebalancer {
       // ExternalView might be null when table is just created, skipping check for this iteration
       if (externalView != null) {
         if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), bestEfforts)) {
+            idealState.getRecord().getMapFields(), bestEfforts, segmentsToMonitor)) {
           LOGGER.info("ExternalView converged for table: {}", tableNameWithType);
           return idealState;
         }
@@ -557,9 +559,13 @@ public class TableRebalancer {
   @VisibleForTesting
   static boolean isExternalViewConverged(String tableNameWithType,
       Map<String, Map<String, String>> externalViewSegmentStates,
-      Map<String, Map<String, String>> idealStateSegmentStates, boolean bestEfforts) {
+      Map<String, Map<String, String>> idealStateSegmentStates, boolean bestEfforts,
+      @Nullable Set<String> segmentsToMonitor) {
     for (Map.Entry<String, Map<String, String>> entry : idealStateSegmentStates.entrySet()) {
       String segmentName = entry.getKey();
+      if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+        continue;
+      }
       Map<String, String> externalViewInstanceStateMap = externalViewSegmentStates.get(segmentName);
       Map<String, String> idealStateInstanceStateMap = entry.getValue();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 100a8dcf1a..4c006967d1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -191,11 +191,6 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
         instancePartitions.getPartitionToInstancesMap());
     assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
 
-    // ExternalView should match the new segment assignment
-    assertTrue(TableRebalancer.isExternalViewConverged(OFFLINE_TABLE_NAME,
-        _helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME).getRecord().getMapFields(), newSegmentAssignment,
-        false));
-
     // Update the table config to use replica-group based assignment
     InstanceTagPoolConfig tagPoolConfig =
         new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null);
@@ -247,11 +242,6 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
     }
     assertEquals(numSegmentsOnServer0, numSegments / 2);
 
-    // ExternalView should match the segment assignment
-    assertTrue(TableRebalancer.isExternalViewConverged(OFFLINE_TABLE_NAME,
-        _helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME).getRecord().getMapFields(), newSegmentAssignment,
-        false));
-
     // Update the table config to use non-replica-group based assignment
     tableConfig.setInstanceAssignmentConfigMap(null);
     _helixResourceManager.updateTableConfig(tableConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 549aabc8c1..84a660cc5e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -524,10 +524,10 @@ public class TableRebalancerTest {
     // Empty segment states should match
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Do not check segment that does not exist in IdealState
     Map<String, String> instanceStateMap = new TreeMap<>();
@@ -535,10 +535,10 @@ public class TableRebalancerTest {
     externalViewSegmentStates.put("segment1", instanceStateMap);
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Do not check segment that is OFFLINE in IdealState
     instanceStateMap = new TreeMap<>();
@@ -546,62 +546,62 @@ public class TableRebalancerTest {
     idealStateSegmentStates.put("segment2", instanceStateMap);
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Should fail when a segment has CONSUMING instance in IdealState but does not exist in ExternalView
     instanceStateMap.put("instance2", CONSUMING);
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Should fail when instance state does not exist
     instanceStateMap = new TreeMap<>();
     externalViewSegmentStates.put("segment2", instanceStateMap);
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Should fail when instance state does not match
     instanceStateMap.put("instance2", OFFLINE);
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertFalse(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Should pass when instance state matches
     instanceStateMap.put("instance2", CONSUMING);
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Should pass when there are extra instances in ExternalView
     instanceStateMap.put("instance3", CONSUMING);
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            false));
+            false, null));
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
 
     // Should throw exception when instance state is ERROR in ExternalView and best-efforts is disabled
     instanceStateMap.put("instance2", ERROR);
     try {
       TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-          false);
+          false, null);
       fail();
     } catch (Exception e) {
       // Expected
@@ -610,6 +610,6 @@ public class TableRebalancerTest {
     // Should pass when instance state is ERROR in ExternalView and best-efforts is enabled
     assertTrue(
         TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
-            true));
+            true, null));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org