You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/12/24 19:57:26 UTC
[pinot] branch master updated: Reduce the number of segments to wait for convergence when rebalancing (#10028)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3fb211845d Reduce the number of segments to wait for convergence when rebalancing (#10028)
3fb211845d is described below
commit 3fb211845d75e9e5657fab79b5766a20ee232024
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Sun Dec 25 01:27:21 2022 +0530
Reduce the number of segments to wait for convergence when rebalancing (#10028)
---
.../assignment/segment/SegmentAssignmentUtils.java | 12 ++++++++
.../helix/core/rebalance/TableRebalancer.java | 14 ++++++---
.../TableRebalancerClusterStatelessTest.java | 10 ------
.../helix/core/rebalance/TableRebalancerTest.java | 36 +++++++++++-----------
4 files changed, 40 insertions(+), 32 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index bbe3f2fe4d..228a4fb264 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -292,6 +292,18 @@ public class SegmentAssignmentUtils {
return numSegmentsToBeMovedPerInstance;
}
+ public static Set<String> getSegmentsToMove(Map<String, Map<String, String>> oldAssignment,
+ Map<String, Map<String, String>> newAssignment) {
+ Set<String> result = new HashSet<>();
+ for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ if (!entry.getValue().equals(oldAssignment.get(segmentName))) {
+ result.add(segmentName);
+ }
+ }
+ return result;
+ }
+
/**
* Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments.
*/
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 26d0329dd2..f187404b3d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -306,9 +306,10 @@ public class TableRebalancer {
int expectedVersion = currentIdealState.getRecord().getVersion();
while (true) {
// Wait for ExternalView to converge before updating the next IdealState
+ Set<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
IdealState idealState;
try {
- idealState = waitForExternalViewToConverge(tableNameWithType, bestEfforts);
+ idealState = waitForExternalViewToConverge(tableNameWithType, bestEfforts, segmentsToMove);
} catch (Exception e) {
LOGGER.warn("Caught exception while waiting for ExternalView to converge for table: {}, aborting the rebalance",
tableNameWithType, e);
@@ -515,7 +516,8 @@ public class TableRebalancer {
tier.getName(), storage.getServerTag());
}
- private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts)
+ private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts,
+ Set<String> segmentsToMonitor)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_STABILIZATION_MAX_WAIT_MS;
@@ -530,7 +532,7 @@ public class TableRebalancer {
// ExternalView might be null when table is just created, skipping check for this iteration
if (externalView != null) {
if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(),
- idealState.getRecord().getMapFields(), bestEfforts)) {
+ idealState.getRecord().getMapFields(), bestEfforts, segmentsToMonitor)) {
LOGGER.info("ExternalView converged for table: {}", tableNameWithType);
return idealState;
}
@@ -557,9 +559,13 @@ public class TableRebalancer {
@VisibleForTesting
static boolean isExternalViewConverged(String tableNameWithType,
Map<String, Map<String, String>> externalViewSegmentStates,
- Map<String, Map<String, String>> idealStateSegmentStates, boolean bestEfforts) {
+ Map<String, Map<String, String>> idealStateSegmentStates, boolean bestEfforts,
+ @Nullable Set<String> segmentsToMonitor) {
for (Map.Entry<String, Map<String, String>> entry : idealStateSegmentStates.entrySet()) {
String segmentName = entry.getKey();
+ if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+ continue;
+ }
Map<String, String> externalViewInstanceStateMap = externalViewSegmentStates.get(segmentName);
Map<String, String> idealStateInstanceStateMap = entry.getValue();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 100a8dcf1a..4c006967d1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -191,11 +191,6 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
instancePartitions.getPartitionToInstancesMap());
assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
- // ExternalView should match the new segment assignment
- assertTrue(TableRebalancer.isExternalViewConverged(OFFLINE_TABLE_NAME,
- _helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME).getRecord().getMapFields(), newSegmentAssignment,
- false));
-
// Update the table config to use replica-group based assignment
InstanceTagPoolConfig tagPoolConfig =
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null), false, 0, null);
@@ -247,11 +242,6 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
}
assertEquals(numSegmentsOnServer0, numSegments / 2);
- // ExternalView should match the segment assignment
- assertTrue(TableRebalancer.isExternalViewConverged(OFFLINE_TABLE_NAME,
- _helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME).getRecord().getMapFields(), newSegmentAssignment,
- false));
-
// Update the table config to use non-replica-group based assignment
tableConfig.setInstanceAssignmentConfigMap(null);
_helixResourceManager.updateTableConfig(tableConfig);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
index 549aabc8c1..84a660cc5e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java
@@ -524,10 +524,10 @@ public class TableRebalancerTest {
// Empty segment states should match
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Do not check segment that does not exist in IdealState
Map<String, String> instanceStateMap = new TreeMap<>();
@@ -535,10 +535,10 @@ public class TableRebalancerTest {
externalViewSegmentStates.put("segment1", instanceStateMap);
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Do not check segment that is OFFLINE in IdealState
instanceStateMap = new TreeMap<>();
@@ -546,62 +546,62 @@ public class TableRebalancerTest {
idealStateSegmentStates.put("segment2", instanceStateMap);
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Should fail when a segment has CONSUMING instance in IdealState but does not exist in ExternalView
instanceStateMap.put("instance2", CONSUMING);
assertFalse(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertFalse(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Should fail when instance state does not exist
instanceStateMap = new TreeMap<>();
externalViewSegmentStates.put("segment2", instanceStateMap);
assertFalse(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertFalse(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Should fail when instance state does not match
instanceStateMap.put("instance2", OFFLINE);
assertFalse(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertFalse(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Should pass when instance state matches
instanceStateMap.put("instance2", CONSUMING);
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Should pass when there are extra instances in ExternalView
instanceStateMap.put("instance3", CONSUMING);
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false));
+ false, null));
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
// Should throw exception when instance state is ERROR in ExternalView and best-efforts is disabled
instanceStateMap.put("instance2", ERROR);
try {
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- false);
+ false, null);
fail();
} catch (Exception e) {
// Expected
@@ -610,6 +610,6 @@ public class TableRebalancerTest {
// Should pass when instance state is ERROR in ExternalView and best-efforts is enabled
assertTrue(
TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates,
- true));
+ true, null));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org