You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/11/10 20:07:31 UTC

[pinot] 01/01: Fixing the rebalance issue for real-time table with tier

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch fix-tier-rebalance
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 2d0effd70f55831e2f27db308c525e9af5d27d6f
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Nov 10 12:03:53 2022 -0800

    Fixing the rebalance issue for real-time table with tier
    
    The current rebalancer throws an NPE when a rebalance is
    invoked for a real-time table that has tiers configured but
    no COMPLETED tenant configured. This PR addresses the issue
    and adds a test to cover this case.
---
 .../assignment/segment/BaseSegmentAssignment.java  |  8 ++++++-
 .../segment/OfflineSegmentAssignment.java          |  2 +-
 .../segment/RealtimeSegmentAssignment.java         | 19 +++++-----------
 ...NonReplicaGroupTieredSegmentAssignmentTest.java | 26 ++++++++++++++++++++++
 4 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
index da8d50329a..c91efb904a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
@@ -29,6 +29,7 @@ import org.apache.helix.HelixManager;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
+import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -99,7 +100,7 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
   protected Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> rebalanceTiers(
       Map<String, Map<String, String>> currentAssignment, @Nullable List<Tier> sortedTiers,
       @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, boolean bootstrap,
-      SegmentAssignmentStrategy segmentAssignmentStrategy, InstancePartitionsType instancePartitionsType) {
+      InstancePartitionsType instancePartitionsType) {
     if (sortedTiers == null) {
       return Pair.of(null, currentAssignment);
     }
@@ -124,6 +125,11 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
       Preconditions.checkNotNull(tierInstancePartitions, "Failed to find instance partitions for tier: %s of table: %s",
           tierName, _tableNameWithType);
 
+      // Initialize segment assignment strategy based on the tier instance partitions
+      SegmentAssignmentStrategy segmentAssignmentStrategy =
+          SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig, tierName,
+              tierInstancePartitions);
+
       _logger.info("Rebalancing tier: {} for table: {} with bootstrap: {}, instance partitions: {}", tierName,
           _tableNameWithType, bootstrap, tierInstancePartitions);
       newTierAssignments.add(reassignSegments(tierName, tierCurrentAssignment, tierInstancePartitions, bootstrap,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 48cb028dd4..ec04b728fb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -89,7 +89,7 @@ public class OfflineSegmentAssignment extends BaseSegmentAssignment {
 
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
-        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, segmentAssignmentStrategy,
+        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
             InstancePartitionsType.OFFLINE);
     List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
     Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 44bec2739b..4f44aa1e44 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -186,19 +186,9 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
 
-    // TODO: remove this check after we also refactor consuming segments assignment strategy
-    // See https://github.com/apache/pinot/issues/9047
-    SegmentAssignmentStrategy segmentAssignmentStrategy = null;
-    if (completedInstancePartitions != null) {
-      // Gets Segment assignment strategy for instance partitions
-      segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
-          .getSegmentAssignmentStrategy(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED.toString(),
-              completedInstancePartitions);
-    }
-
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
-        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, segmentAssignmentStrategy,
+        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
             InstancePartitionsType.COMPLETED);
 
     List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
@@ -218,8 +208,11 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
     if (completedInstancePartitions != null) {
       // When COMPLETED instance partitions are provided, reassign COMPLETED segments in a balanced way (relocate
       // COMPLETED segments to offload them from CONSUMING instances to COMPLETED instances)
-      _logger
-          .info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}", _tableNameWithType);
+      SegmentAssignmentStrategy segmentAssignmentStrategy =
+          SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig,
+              InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
+      _logger.info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}",
+          _tableNameWithType);
       newAssignment = reassignSegments(InstancePartitionsType.COMPLETED.toString(), completedSegmentAssignment,
           completedInstancePartitions, bootstrap, segmentAssignmentStrategy, InstancePartitionsType.COMPLETED);
     } else {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index f1629ae2ce..c3f1ae30fe 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -269,6 +269,32 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null,
         new BaseConfiguration()), currentAssignment);
 
+    // Rebalance without COMPLETED instance partitions and with tierInstancePartitions should move ONLINE segments to
+    // Tiers and CONSUMING segments to CONSUMING tenant.
+    newAssignment =
+        _segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, _sortedTiers,
+            _tierInstancePartitionsMap, new BaseConfiguration());
+
+    numSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_A);
+    assertEquals(numSegmentsAssignedPerInstance.length, NUM_INSTANCES_TIER_A);
+    expectedMinNumSegmentsPerInstance = expectedOnTierA / NUM_INSTANCES_TIER_A;
+    for (int i = 0; i < NUM_INSTANCES_TIER_A; i++) {
+      assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance);
+    }
+
+    numSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_B);
+    assertEquals(numSegmentsAssignedPerInstance.length, NUM_INSTANCES_TIER_B);
+    expectedMinNumSegmentsPerInstance = expectedOnTierB / NUM_INSTANCES_TIER_B;
+    for (int i = 0; i < NUM_INSTANCES_TIER_B; i++) {
+      assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance);
+    }
+
+    numSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, CONSUMING_INSTANCES);
+    assertEquals(numSegmentsAssignedPerInstance.length, NUM_CONSUMING_INSTANCES);
+
     // Bootstrap
     rebalanceConfig = new BaseConfiguration();
     rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org