You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/25 01:17:10 UTC
[pinot] branch master updated: For table rebalance, check if instances are equal for NO_OP (#11073)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 06d8540378 For table rebalance, check if instances are equal for NO_OP (#11073)
06d8540378 is described below
commit 06d8540378d9df6220c0e0e47d42841b3376bd96
Author: summerhasama-stripe <13...@users.noreply.github.com>
AuthorDate: Mon Jul 24 21:17:03 2023 -0400
For table rebalance, check if instances are equal for NO_OP (#11073)
---
.../common/assignment/InstancePartitions.java | 19 +++
.../helix/core/rebalance/TableRebalancer.java | 174 +++++++++++++--------
.../TableRebalancerClusterStatelessTest.java | 16 +-
3 files changed, 134 insertions(+), 75 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index a296527e84..a67bb93ce0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.TreeMap;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -146,4 +147,22 @@ public class InstancePartitions {
public String toString() {
return toJsonString();
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof InstancePartitions)) {
+ return false;
+ }
+ InstancePartitions other = (InstancePartitions) obj;
+ return Objects.equals(_instancePartitionsName, other._instancePartitionsName)
+ && Objects.equals(_partitionToInstancesMap, other._partitionToInstancesMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_instancePartitionsName, _partitionToInstancesMap);
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 22c5b87635..95df26ff96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -226,8 +226,12 @@ public class TableRebalancer {
// Calculate instance partitions map
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
+ boolean instancePartitionsUnchanged;
try {
- instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun);
+ Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> instancePartitionsMapAndUnchanged =
+ getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun);
+ instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
+ instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
LOGGER.warn("For rebalanceId: {}, caught exception while fetching/calculating instance partitions for table: {}, "
+ "aborting the rebalance", rebalanceJobId, tableNameWithType, e);
@@ -237,9 +241,13 @@ public class TableRebalancer {
// Calculate instance partitions for tiers if configured
List<Tier> sortedTiers = getSortedTiers(tableConfig);
- Map<String, InstancePartitions> tierToInstancePartitionsMap =
+
+ Pair<Map<String, InstancePartitions>, Boolean> tierToInstancePartitionsMapAndUnchanged =
getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
+ Map<String, InstancePartitions> tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft();
+ boolean tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight();
+
LOGGER.info("For rebalanceId: {}, calculating the target assignment for table: {}", rebalanceJobId,
tableNameWithType);
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -256,21 +264,26 @@ public class TableRebalancer {
tierToInstancePartitionsMap, null);
}
- if (currentAssignment.equals(targetAssignment)) {
+ boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment);
+ LOGGER.info("For rebalanceId: {}, segmentAssignmentUnchanged: {}, "
+ + "tierInstancePartitionsUnchanged: {}, instancePartitionsUnchanged: {} for table: {}",
+ rebalanceJobId, segmentAssignmentUnchanged, tierInstancePartitionsUnchanged,
+ instancePartitionsUnchanged, tableNameWithType);
+
+ if (segmentAssignmentUnchanged) {
LOGGER.info("Table: {} is already balanced", tableNameWithType);
- if (reassignInstances) {
+ if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced",
+ instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
+ } else {
if (dryRun) {
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment);
- } else {
- return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
- "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment);
}
- } else {
- return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced",
- instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment);
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
+ "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap,
+ targetAssignment);
}
}
@@ -399,9 +412,11 @@ public class TableRebalancer {
if (segmentsToMoveChanged) {
try {
// Re-calculate the instance partitions in case the instance configs changed during the rebalance
- instancePartitionsMap = getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false);
+ instancePartitionsMap =
+ getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false).getLeft();
tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances,
+ bootstrap, dryRun).getLeft();
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
@@ -480,22 +495,32 @@ public class TableRebalancer {
}
}
- private Map<InstancePartitionsType, InstancePartitions> getInstancePartitionsMap(TableConfig tableConfig,
- boolean reassignInstances, boolean bootstrap, boolean dryRun) {
+ /**
+ * Gets the instance partitions for instance partition types and also returns a boolean for whether they are unchanged
+ */
+ private Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> getInstancePartitionsMap(
+ TableConfig tableConfig, boolean reassignInstances, boolean bootstrap, boolean dryRun) {
+ boolean instancePartitionsUnchanged = true;
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
if (tableConfig.getTableType() == TableType.OFFLINE) {
- instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
- getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun));
+ Pair<InstancePartitions, Boolean> partitionAndUnchangedForOffline =
+ getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE, reassignInstances, bootstrap, dryRun);
+ instancePartitionsMap.put(InstancePartitionsType.OFFLINE, partitionAndUnchangedForOffline.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged && partitionAndUnchangedForOffline.getRight();
} else {
- instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
- getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun));
+ Pair<InstancePartitions, Boolean> partitionAndUnchangedForConsuming =
+ getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING, reassignInstances, bootstrap, dryRun);
+ instancePartitionsMap.put(InstancePartitionsType.CONSUMING, partitionAndUnchangedForConsuming.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged && partitionAndUnchangedForConsuming.getRight();
String tableNameWithType = tableConfig.getTableName();
if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
+ Pair<InstancePartitions, Boolean> partitionAndUnchangedForCompleted =
+ getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun);
LOGGER.info(
"COMPLETED segments should be relocated, fetching/computing COMPLETED instance partitions for table: {}",
tableNameWithType);
- instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
- getInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun));
+ instancePartitionsMap.put(InstancePartitionsType.COMPLETED, partitionAndUnchangedForCompleted.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged && partitionAndUnchangedForCompleted.getRight();
} else {
LOGGER.info(
"COMPLETED segments should not be relocated, skipping fetching/computing COMPLETED instance partitions "
@@ -509,12 +534,21 @@ public class TableRebalancer {
}
}
}
- return instancePartitionsMap;
+ return Pair.of(instancePartitionsMap, instancePartitionsUnchanged);
}
- private InstancePartitions getInstancePartitions(TableConfig tableConfig,
+ /**
+ * Fetches/computes the instance partitions and also returns a boolean for whether they are unchanged
+ */
+ private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType, boolean reassignInstances, boolean bootstrap, boolean dryRun) {
String tableNameWithType = tableConfig.getTableName();
+
+ InstancePartitions existingInstancePartitions =
+ InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+ InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
+ instancePartitionsType.toString()));
+
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
if (reassignInstances) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -525,35 +559,33 @@ public class TableRebalancer {
InstancePartitions instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
- if (!dryRun) {
+ boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, instancePartitionsUnchanged);
}
- // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition
- // map can be fully recalculated.
- InstancePartitions existingInstancePartitions = bootstrap ? null
- : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
- InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
- instancePartitionsType.toString()));
LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType);
+ // Pass null as the existing instance partitions if bootstrap mode is enabled,
+ // so that the instance partition map can be fully recalculated.
InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType,
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
- existingInstancePartitions);
- if (!dryRun) {
+ bootstrap ? null : existingInstancePartitions);
+ boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, instancePartitionsUnchanged);
} else {
LOGGER.info("Fetching/computing {} instance partitions for table: {}", instancePartitionsType,
tableNameWithType);
- return InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig,
- instancePartitionsType);
+ return Pair.of(InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig,
+ instancePartitionsType), true);
}
} else {
LOGGER.info("{} instance assignment is not allowed, using default instance partitions for table: {}",
@@ -564,12 +596,15 @@ public class TableRebalancer {
}
InstancePartitions instancePartitions =
InstancePartitionsUtils.computeDefaultInstancePartitions(_helixManager, tableConfig, instancePartitionsType);
- if (!dryRun) {
+
+ Boolean noExistingInstancePartitions = existingInstancePartitions == null;
+
+ if (!dryRun && !noExistingInstancePartitions) {
String instancePartitionsName = instancePartitions.getInstancePartitionsName();
LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName);
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, noExistingInstancePartitions);
}
}
@@ -586,74 +621,79 @@ public class TableRebalancer {
}
}
- @Nullable
- private Map<String, InstancePartitions> getTierToInstancePartitionsMap(TableConfig tableConfig,
+ /**
+ * Fetches/computes the instance partitions for sorted tiers and also returns a boolean for whether the
+ * instance partitions are unchanged.
+ */
+ private Pair<Map<String, InstancePartitions>, Boolean> getTierToInstancePartitionsMap(TableConfig tableConfig,
@Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean bootstrap, boolean dryRun) {
if (sortedTiers == null) {
- return null;
+ return Pair.of(null, true);
}
+ boolean instancePartitionsUnchanged = true;
Map<String, InstancePartitions> tierToInstancePartitionsMap = new HashMap<>();
for (Tier tier : sortedTiers) {
LOGGER.info("Fetching/computing instance partitions for tier: {} of table: {}", tier.getName(),
tableConfig.getTableName());
- tierToInstancePartitionsMap.put(tier.getName(),
- getInstancePartitionsForTier(tableConfig, tier, reassignInstances, bootstrap, dryRun));
+ Pair<InstancePartitions, Boolean> partitionsAndUnchanged = getInstancePartitionsForTier(
+ tableConfig, tier, reassignInstances, bootstrap, dryRun);
+ tierToInstancePartitionsMap.put(tier.getName(), partitionsAndUnchanged.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged && partitionsAndUnchanged.getRight();
}
- return tierToInstancePartitionsMap;
+ return Pair.of(tierToInstancePartitionsMap, instancePartitionsUnchanged);
}
/**
* Computes the instance partitions for the given tier. If table's instanceAssignmentConfigMap has an entry for the
- * tier, it's used to calculate the instance partitions. Else default instance partitions are returned
+ * tier, it's used to calculate the instance partitions. Else default instance partitions are returned. Also returns
+ * a boolean for whether the instance partition is unchanged.
*/
- private InstancePartitions getInstancePartitionsForTier(TableConfig tableConfig, Tier tier, boolean reassignInstances,
- boolean bootstrap, boolean dryRun) {
+ private Pair<InstancePartitions, Boolean> getInstancePartitionsForTier(TableConfig tableConfig, Tier tier,
+ boolean reassignInstances, boolean bootstrap, boolean dryRun) {
PinotServerTierStorage storage = (PinotServerTierStorage) tier.getStorage();
InstancePartitions defaultInstancePartitions =
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager, tableConfig.getTableName(),
tier.getName(), storage.getServerTag());
+ String instancePartitionsName =
+ InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
+ InstancePartitions existingInstancePartitions = InstancePartitionsUtils.
+ fetchInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
if (tableConfig.getInstanceAssignmentConfigMap() == null || !tableConfig.getInstanceAssignmentConfigMap()
.containsKey(tier.getName())) {
LOGGER.info("Skipping fetching/computing instance partitions for tier {} for table: {}", tier.getName(),
tableConfig.getTableName());
- if (!dryRun) {
- String instancePartitionsName =
- InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
+ Boolean noExistingInstancePartitions = existingInstancePartitions == null;
+
+ if (!dryRun && !noExistingInstancePartitions) {
LOGGER.info("Removing instance partitions: {} from ZK if it exists", instancePartitionsName);
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitionsName);
}
- return defaultInstancePartitions;
+ return Pair.of(defaultInstancePartitions, noExistingInstancePartitions);
}
String tableNameWithType = tableConfig.getTableName();
- String instancePartitionsName =
- InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(), tier.getName());
if (reassignInstances) {
- // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition
- // map can be fully recalculated.
- InstancePartitions existingInstancePartitions = bootstrap ? null
- : InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
- instancePartitionsName);
+ // Pass null as the existing instance partitions if bootstrap mode is enabled,
+ // so that the instance partition map can be fully recalculated.
InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(tier.getName(),
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true),
- existingInstancePartitions, tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
- if (!dryRun) {
+ bootstrap ? null : existingInstancePartitions,
+ tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
+ boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, instancePartitionsUnchanged);
}
- InstancePartitions instancePartitions =
- InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
- InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType, tier.getName()));
- if (instancePartitions != null) {
- return instancePartitions;
+ if (existingInstancePartitions != null) {
+ return Pair.of(existingInstancePartitions, true);
}
- return defaultInstancePartitions;
+ return Pair.of(defaultInstancePartitions, true);
}
private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 91f27160c0..f2eed4ccab 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -246,19 +246,19 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
tableConfig.setInstanceAssignmentConfigMap(null);
_helixResourceManager.updateTableConfig(tableConfig);
- // Without instances reassignment, the rebalance should return status NO_OP as instance partitions are already
- // generated
+ // Without instances reassignment, the rebalance should return status DONE,
+ // and the instance partitions should be removed
rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+ InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
- // With instances reassignment, the instance partitions should be removed, and the default instance partitions
- // should be used for segment assignment
+ // With instances reassignment, the default instance partitions should be used
+ // for segment assignment, and the rebalance should return status NO_OP
rebalanceConfig = new BaseConfiguration();
rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
- assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
- InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// All servers should be assigned to the table
instanceAssignment = rebalanceResult.getInstanceAssignment();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org