You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/11/10 20:05:51 UTC

[pinot] branch fix-tier-rebalance created (now 0f68829694)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a change to branch fix-tier-rebalance
in repository https://gitbox.apache.org/repos/asf/pinot.git


      at 0f68829694 Fixing the rebalance issue when the tier is configured for real-time table

This branch includes the following new commits:

     new 0f68829694 Fixing the rebalance issue when the tier is configured for real-time table

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Fixing the rebalance issue when the tier is configured for real-time table

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch fix-tier-rebalance
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 0f6882969489e948089684e8557577c8dc72c78f
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Nov 10 12:03:53 2022 -0800

    Fixing the rebalance issue when the tier is configured for real-time table
    
    The current rebalancer has the bug with the NPE when the
    reblance is invoked for real-time table with Tier configure
    but not with COMPLETED tenant is configured. This PR addresses
    the issue. Also added the test to cover this.
---
 .../assignment/segment/BaseSegmentAssignment.java  |  8 ++++++-
 .../segment/OfflineSegmentAssignment.java          |  2 +-
 .../segment/RealtimeSegmentAssignment.java         | 19 +++++-----------
 ...NonReplicaGroupTieredSegmentAssignmentTest.java | 26 ++++++++++++++++++++++
 4 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
index da8d50329a..c91efb904a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/BaseSegmentAssignment.java
@@ -29,6 +29,7 @@ import org.apache.helix.HelixManager;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
+import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -99,7 +100,7 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
   protected Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> rebalanceTiers(
       Map<String, Map<String, String>> currentAssignment, @Nullable List<Tier> sortedTiers,
       @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, boolean bootstrap,
-      SegmentAssignmentStrategy segmentAssignmentStrategy, InstancePartitionsType instancePartitionsType) {
+      InstancePartitionsType instancePartitionsType) {
     if (sortedTiers == null) {
       return Pair.of(null, currentAssignment);
     }
@@ -124,6 +125,11 @@ public abstract class BaseSegmentAssignment implements SegmentAssignment {
       Preconditions.checkNotNull(tierInstancePartitions, "Failed to find instance partitions for tier: %s of table: %s",
           tierName, _tableNameWithType);
 
+      // Initialize segment assignment strategy based on the tier instance partitions
+      SegmentAssignmentStrategy segmentAssignmentStrategy =
+          SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig, tierName,
+              tierInstancePartitions);
+
       _logger.info("Rebalancing tier: {} for table: {} with bootstrap: {}, instance partitions: {}", tierName,
           _tableNameWithType, bootstrap, tierInstancePartitions);
       newTierAssignments.add(reassignSegments(tierName, tierCurrentAssignment, tierInstancePartitions, bootstrap,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 48cb028dd4..ec04b728fb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -89,7 +89,7 @@ public class OfflineSegmentAssignment extends BaseSegmentAssignment {
 
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
-        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, segmentAssignmentStrategy,
+        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
             InstancePartitionsType.OFFLINE);
     List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
     Map<String, Map<String, String>> nonTierAssignment = pair.getRight();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 44bec2739b..4f44aa1e44 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -186,19 +186,9 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
     boolean bootstrap =
         config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
 
-    // TODO: remove this check after we also refactor consuming segments assignment strategy
-    // See https://github.com/apache/pinot/issues/9047
-    SegmentAssignmentStrategy segmentAssignmentStrategy = null;
-    if (completedInstancePartitions != null) {
-      // Gets Segment assignment strategy for instance partitions
-      segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
-          .getSegmentAssignmentStrategy(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED.toString(),
-              completedInstancePartitions);
-    }
-
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, String>>> pair =
-        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap, segmentAssignmentStrategy,
+        rebalanceTiers(currentAssignment, sortedTiers, tierInstancePartitionsMap, bootstrap,
             InstancePartitionsType.COMPLETED);
 
     List<Map<String, Map<String, String>>> newTierAssignments = pair.getLeft();
@@ -218,8 +208,11 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment {
     if (completedInstancePartitions != null) {
       // When COMPLETED instance partitions are provided, reassign COMPLETED segments in a balanced way (relocate
       // COMPLETED segments to offload them from CONSUMING instances to COMPLETED instances)
-      _logger
-          .info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}", _tableNameWithType);
+      SegmentAssignmentStrategy segmentAssignmentStrategy =
+          SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(_helixManager, _tableConfig,
+              InstancePartitionsType.COMPLETED.toString(), completedInstancePartitions);
+      _logger.info("Reassigning COMPLETED segments with COMPLETED instance partitions for table: {}",
+          _tableNameWithType);
       newAssignment = reassignSegments(InstancePartitionsType.COMPLETED.toString(), completedSegmentAssignment,
           completedInstancePartitions, bootstrap, segmentAssignmentStrategy, InstancePartitionsType.COMPLETED);
     } else {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index f1629ae2ce..c3f1ae30fe 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -269,6 +269,32 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, noRelocationInstancePartitionsMap, null, null,
         new BaseConfiguration()), currentAssignment);
 
+    // Rebalance without COMPLETED instance partitions and with tierInstancePartitions should move ONLINE segments to
+    // Tiers and CONSUMING segments to CONSUMING tenant.
+    newAssignment =
+        _segmentAssignment.rebalanceTable(currentAssignment, noRelocationInstancePartitionsMap, _sortedTiers,
+            _tierInstancePartitionsMap, new BaseConfiguration());
+
+    numSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_A);
+    assertEquals(numSegmentsAssignedPerInstance.length, NUM_INSTANCES_TIER_A);
+    expectedMinNumSegmentsPerInstance = expectedOnTierA / NUM_INSTANCES_TIER_A;
+    for (int i = 0; i < NUM_INSTANCES_TIER_A; i++) {
+      assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance);
+    }
+
+    numSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, INSTANCES_TIER_B);
+    assertEquals(numSegmentsAssignedPerInstance.length, NUM_INSTANCES_TIER_B);
+    expectedMinNumSegmentsPerInstance = expectedOnTierB / NUM_INSTANCES_TIER_B;
+    for (int i = 0; i < NUM_INSTANCES_TIER_B; i++) {
+      assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance);
+    }
+
+    numSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, CONSUMING_INSTANCES);
+    assertEquals(numSegmentsAssignedPerInstance.length, NUM_CONSUMING_INSTANCES);
+
     // Bootstrap
     rebalanceConfig = new BaseConfiguration();
     rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org