You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/11/10 20:07:31 UTC
[pinot] 01/01: Fixing the rebalance issue for real-time table with tier
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch fix-tier-rebalance
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 2d0effd70f55831e2f27db308c525e9af5d27d6f
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Nov 10 12:03:53 2022 -0800
Fixing the rebalance issue for real-time table with tier
The current rebalancer throws an NPE when a rebalance is
invoked for a real-time table that has tiers configured
but no COMPLETED tenant configured. This PR fixes the
issue and adds a test to cover this case.
---
.../assignment/segment/BaseSegmentAssignment.java | 8 ++++++-
.../segment/OfflineSegmentAssignment.java | 2 +-
.../segment/RealtimeSegmentAssignment.java | 19 +++++-----------
...NonReplicaGroupTieredSegmentAssignmentTest.java | 26 ++++++++++++++++++++++
4 files changed, 40 insertions(+), 15 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
index da8d50329a..c91efb904a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
@@ -29,6 +29,7 @@ import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
+import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -99,7 +100,7 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
protected Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> rebalanceTiers(
Map<String, Map<String, String>> currentAssignment, @Nullable List<Tier> sortedTiers,
@Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, boolean bootstrap,
- SegmentAssignmentStrategy segmentAssignmentStrategy, InstancePartitionsType instancePartitionsType) {
+ InstancePartitionsType instancePartitionsType) {
if (sortedTiers == null) {
return Pair.of(null, currentAssignment);
}
@@ -124,6 +125,11 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
Preconditions.checkNotNull(tierInstancePartitions, "Failed to find instance partitions for tier: %s of table: %s",
tierName, _tableNameWithType);
+ // Initialize segment assignment strategy based on the tier instance partitions
+ SegmentAssignmentStrategy segmentAssignmentStrategy =
+ SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig, tierName,
+ tierInstancePartitions);
+
_logger.info("Rebalancing tier: {} for table: {} with bootstrap: {}, instance partitions: {}", tierName,
_tableNameWithType, bootstrap, tierInstancePartitions);
newTierAssignments.add(reassignSegments(tierName, tierCurrentAssignment, tierInstancePartitions, bootstrap,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 48cb028dd4..ec04b728fb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -89,7 +89,7 @@ public class OfflineSegmentAssignment extends BaseSegmentAssignment {
// Rebalance tiers first
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
- rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, segmentAssignmentStrategy,
+ rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
InstancePartitionsType.OFFLINE);
List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 44bec2739b..4f44aa1e44 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -186,19 +186,9 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
boolean bootstrap =
config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
- // TODO: remove this check after we also refactor consuming segments assignment strategy
- // See https://github.com/apache/pinot/issues/9047
- SegmentAssignmentStrategy segmentAssignmentStrategy = null;
- if (completedInstancePartitions != null) {
- // Gets Segment assignment strategy for instance partitions
- segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED.toString(),
- completedInstancePartitions);
- }
-
// Rebalance tiers first
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
- rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, segmentAssignmentStrategy,
+ rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
InstancePartitionsType.COMPLETED);
List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
@@ -218,8 +208,11 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
if (completedInstancePartitions != null) {
// When COMPLETED instance partitions are provided, reassign COMPLETED segments in a balanced way (relocate
// COMPLETED segments to offload them from CONSUMING instances to COMPLETED instances)
- _logger
- .info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}", _tableNameWithType);
+ SegmentAssignmentStrategy segmentAssignmentStrategy =
+ SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig,
+ InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
+ _logger.info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}",
+ _tableNameWithType);
newAssignment = reassignSegments(InstancePartitionsType.COMPLETED.toString(), completedSegmentAssignment,
completedInstancePartitions, bootstrap, segmentAssignmentStrategy, InstancePartitionsType.COMPLETED);
} else {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index f1629ae2ce..c3f1ae30fe 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -269,6 +269,32 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null,
new BaseConfiguration()), currentAssignment);
+ // Rebalance without COMPLETED instance partitions and with tierInstancePartitions should move ONLINE segments to
+ // Tiers and CONSUMING segments to CONSUMING tenant.
+ newAssignment =
+ _segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, _sortedTiers,
+ _tierInstancePartitionsMap, new BaseConfiguration());
+
+ numSegmentsAssignedPerInstance =
+ SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_A);
+ assertEquals(numSegmentsAssignedPerInstance.length, NUM_INSTANCES_TIER_A);
+ expectedMinNumSegmentsPerInstance = expectedOnTierA / NUM_INSTANCES_TIER_A;
+ for (int i = 0; i < NUM_INSTANCES_TIER_A; i++) {
+ assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance);
+ }
+
+ numSegmentsAssignedPerInstance =
+ SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_B);
+ assertEquals(numSegmentsAssignedPerInstance.length, NUM_INSTANCES_TIER_B);
+ expectedMinNumSegmentsPerInstance = expectedOnTierB / NUM_INSTANCES_TIER_B;
+ for (int i = 0; i < NUM_INSTANCES_TIER_B; i++) {
+ assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance);
+ }
+
+ numSegmentsAssignedPerInstance =
+ SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, CONSUMING_INSTANCES);
+ assertEquals(numSegmentsAssignedPerInstance.length, NUM_CONSUMING_INSTANCES);
+
// Bootstrap
rebalanceConfig = new BaseConfiguration();
rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org