You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/04/28 01:06:36 UTC

[pinot] branch master updated: Fix bug in segment rebalance with replica group segment assignment (#8598)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d54386ba4 Fix bug in segment rebalance with replica group segment assignment (#8598)
5d54386ba4 is described below

commit 5d54386ba46a566cd7085f5dcda33c940fd5fc84
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Apr 27 18:06:30 2022 -0700

    Fix bug in segment rebalance with replica group segment assignment (#8598)
    
    Segment rebalance with replica-group segment assignment is currently done partition by partition, which leads to an imbalanced assignment. This PR fixes the issue by grouping all partitions that belong to the same instance partition and then reassigning each group of partitions separately.
---
 .../segment/OfflineSegmentAssignment.java          | 10 ++++---
 .../segment/RealtimeSegmentAssignment.java         | 11 +++++---
 .../assignment/segment/SegmentAssignmentUtils.java | 15 +++++------
 .../OfflineReplicaGroupSegmentAssignmentTest.java  | 31 ++++++++++++++++++----
 .../RealtimeReplicaGroupSegmentAssignmentTest.java |  2 +-
 5 files changed, 47 insertions(+), 22 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 0b3c9fdc6b..2ebf5e4b78 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -281,22 +281,24 @@ public class OfflineSegmentAssignment implements SegmentAssignment {
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
       segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
     }
-    Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
     for (String segmentName : currentAssignment.keySet()) {
       int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
-      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
+      int instancePartitionId = partitionId % numPartitions;
+      instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName);
     }
 
     // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
     //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
     //       name hash as the random seed for the shuffle so that the result is deterministic.
     Random random = new Random(_offlineTableName.hashCode());
-    for (List<String> segments : partitionIdToSegmentsMap.values()) {
+    for (List<String> segments : instancePartitionIdToSegmentsMap.values()) {
       Collections.shuffle(segments, random);
     }
 
     return SegmentAssignmentUtils
-        .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap);
+        .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap);
   }
 
   private int getPartitionId(SegmentZKMetadata segmentZKMetadata) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index ac31ebca21..c97b53e75c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -328,23 +328,26 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
       } else {
         // Replica-group based assignment
 
-        Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new HashMap<>();
+        int numPartitions = instancePartitions.getNumPartitions();
+        Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
         for (String segmentName : currentAssignment.keySet()) {
           int partitionGroupId = SegmentUtils
               .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
-          partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k -> new ArrayList<>()).add(segmentName);
+          int instancePartitionId = partitionGroupId % numPartitions;
+          instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>())
+              .add(segmentName);
         }
 
         // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
         //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
         //       name hash as the random seed for the shuffle so that the result is deterministic.
         Random random = new Random(_realtimeTableName.hashCode());
-        for (List<String> segments : partitionGroupIdToSegmentsMap.values()) {
+        for (List<String> segments : instancePartitionIdToSegmentsMap.values()) {
           Collections.shuffle(segments, random);
         }
 
         newAssignment = SegmentAssignmentUtils
-            .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap);
+            .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap);
       }
     }
     return newAssignment;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index e27983e8b3..8db3416b93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -157,20 +157,19 @@ public class SegmentAssignmentUtils {
   }
 
   /**
-   * Rebalances the table for the replica-group based segment assignment strategy.
-   * <p>The number of partitions for the segments can be different from the number of partitions in the instance
-   * partitions. Uniformly spray the segment partitions over the instance partitions.
+   * Rebalances the table for the replica-group based segment assignment strategy by uniformly spraying the group of
+   * segments belonging to each instancePartitionId over the instances of that instance partition.
    */
   public static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
       Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions,
-      Map<Integer, List<String>> partitionIdToSegmentsMap) {
+      Map<Integer, List<String>> instancePartitionIdToSegmentsMap) {
     Map<String, Map<String, String>> newAssignment = new TreeMap<>();
-    int numPartitions = instancePartitions.getNumPartitions();
-    for (Map.Entry<Integer, List<String>> entry : partitionIdToSegmentsMap.entrySet()) {
+    for (Map.Entry<Integer, List<String>> entry : instancePartitionIdToSegmentsMap.entrySet()) {
       // Uniformly spray the segment partitions over the instance partitions
-      int partitionId = entry.getKey() % numPartitions;
+      int instancePartitionId = entry.getKey();
+      List<String> segments = entry.getValue();
       SegmentAssignmentUtils
-          .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, partitionId, entry.getValue(),
+          .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, instancePartitionId, segments,
               newAssignment);
     }
     return newAssignment;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index aa4c8c7f66..62bcf70b11 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.assignment.segment;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,7 +59,7 @@ import static org.testng.Assert.assertTrue;
 public class OfflineReplicaGroupSegmentAssignmentTest {
   private static final int NUM_REPLICAS = 3;
   private static final String SEGMENT_NAME_PREFIX = "segment_";
-  private static final int NUM_SEGMENTS = 90;
+  private static final int NUM_SEGMENTS = 12;
   private static final List<String> SEGMENTS =
       SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS);
   private static final String INSTANCE_NAME_PREFIX = "instance_";
@@ -248,13 +249,11 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
           .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
     }
 
-    // There should be 90 segments assigned
     assertEquals(currentAssignment.size(), NUM_SEGMENTS);
     // Each segment should have 3 replicas
     for (Map<String, String> instanceStateMap : currentAssignment.values()) {
       assertEquals(instanceStateMap.size(), NUM_REPLICAS);
     }
-    // Each instance should have 15 segments assigned
     int[] numSegmentsAssignedPerInstance =
         SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
     int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
@@ -278,13 +277,11 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
           .put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
     }
 
-    // There should be 90 segments assigned
     assertEquals(currentAssignment.size(), NUM_SEGMENTS);
     // Each segment should have 3 replicas
     for (Map<String, String> instanceStateMap : currentAssignment.values()) {
       assertEquals(instanceStateMap.size(), NUM_REPLICAS);
     }
-    // Each instance should have 15 segments assigned
     int[] numSegmentsAssignedPerInstance =
         SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
     int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
@@ -356,4 +353,28 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
       }
     }
   }
+
+  @Test
+  public void testRebalanceTableWithPartitionColumnAndInstancePartitionsMapWithOnePartition() {
+    // make an unbalanced assignment by assigning all segments to the first three instances
+    String instance0 = INSTANCE_NAME_PREFIX + "0";
+    String instance1 = INSTANCE_NAME_PREFIX + "1";
+    String instance2 = INSTANCE_NAME_PREFIX + "2";
+    Map<String, Map<String, String>> unbalancedAssignment = new TreeMap<>();
+    SEGMENTS.forEach(segName ->
+        unbalancedAssignment.put(segName, ImmutableMap.of(
+            instance0, SegmentStateModel.ONLINE,
+            instance1, SegmentStateModel.ONLINE,
+            instance2, SegmentStateModel.ONLINE
+        ))
+    );
+    Map<String, Map<String, String>> balancedAssignment = _segmentAssignmentWithPartition
+        .rebalanceTable(unbalancedAssignment, _instancePartitionsMapWithoutPartition, null, null,
+            new BaseConfiguration());
+    int[] actualNumSegmentsAssignedPerInstance =
+        SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment, INSTANCES);
+    int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
+    Arrays.fill(expectedNumSegmentsAssignedPerInstance, NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES);
+    assertEquals(actualNumSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance);
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index a82d10db04..09cd07bc9e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -43,7 +43,7 @@ import static org.testng.Assert.assertTrue;
 public class RealtimeReplicaGroupSegmentAssignmentTest {
   private static final int NUM_REPLICAS = 3;
   private static final int NUM_PARTITIONS = 4;
-  private static final int NUM_SEGMENTS = 100;
+  private static final int NUM_SEGMENTS = 24;
   private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_";
   private static final int NUM_CONSUMING_INSTANCES = 9;
   private static final List<String> CONSUMING_INSTANCES =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org