You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/04/28 01:06:36 UTC
[pinot] branch master updated: Fix bug in segment rebalance with replica group segment assignment (#8598)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5d54386ba4 Fix bug in segment rebalance with replica group segment assignment (#8598)
5d54386ba4 is described below
commit 5d54386ba46a566cd7085f5dcda33c940fd5fc84
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Apr 27 18:06:30 2022 -0700
Fix bug in segment rebalance with replica group segment assignment (#8598)
Segment rebalance with replica group segment assignment is currently done partition by partition, which leads to an imbalanced assignment. This PR fixes the issue by grouping the segments of all partitions that map to the same instance partition and then reassigning each group of segments separately.
---
.../segment/OfflineSegmentAssignment.java | 10 ++++---
.../segment/RealtimeSegmentAssignment.java | 11 +++++---
.../assignment/segment/SegmentAssignmentUtils.java | 15 +++++------
.../OfflineReplicaGroupSegmentAssignmentTest.java | 31 ++++++++++++++++++----
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 2 +-
5 files changed, 47 insertions(+), 22 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 0b3c9fdc6b..2ebf5e4b78 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -281,22 +281,24 @@ public class OfflineSegmentAssignment implements SegmentAssignment {
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
}
- Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
+ int numPartitions = instancePartitions.getNumPartitions();
+ Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
for (String segmentName : currentAssignment.keySet()) {
int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
- partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
+ int instancePartitionId = partitionId % numPartitions;
+ instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>()).add(segmentName);
}
// NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
// servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
// name hash as the random seed for the shuffle so that the result is deterministic.
Random random = new Random(_offlineTableName.hashCode());
- for (List<String> segments : partitionIdToSegmentsMap.values()) {
+ for (List<String> segments : instancePartitionIdToSegmentsMap.values()) {
Collections.shuffle(segments, random);
}
return SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap);
+ .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap);
}
private int getPartitionId(SegmentZKMetadata segmentZKMetadata) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index ac31ebca21..c97b53e75c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -328,23 +328,26 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
} else {
// Replica-group based assignment
- Map<Integer, List<String>> partitionGroupIdToSegmentsMap = new HashMap<>();
+ int numPartitions = instancePartitions.getNumPartitions();
+ Map<Integer, List<String>> instancePartitionIdToSegmentsMap = new HashMap<>();
for (String segmentName : currentAssignment.keySet()) {
int partitionGroupId = SegmentUtils
.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
- partitionGroupIdToSegmentsMap.computeIfAbsent(partitionGroupId, k -> new ArrayList<>()).add(segmentName);
+ int instancePartitionId = partitionGroupId % numPartitions;
+ instancePartitionIdToSegmentsMap.computeIfAbsent(instancePartitionId, k -> new ArrayList<>())
+ .add(segmentName);
}
// NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
// servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
// name hash as the random seed for the shuffle so that the result is deterministic.
Random random = new Random(_realtimeTableName.hashCode());
- for (List<String> segments : partitionGroupIdToSegmentsMap.values()) {
+ for (List<String> segments : instancePartitionIdToSegmentsMap.values()) {
Collections.shuffle(segments, random);
}
newAssignment = SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap);
+ .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap);
}
}
return newAssignment;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index e27983e8b3..8db3416b93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -157,20 +157,19 @@ public class SegmentAssignmentUtils {
}
/**
- * Rebalances the table for the replica-group based segment assignment strategy.
- * <p>The number of partitions for the segments can be different from the number of partitions in the instance
- * partitions. Uniformly spray the segment partitions over the instance partitions.
+ * Rebalances the table for the replica-group based segment assignment strategy by uniformly spraying each group of
+ * segments belonging to an instancePartitionId over the instances of that instance partition.
*/
public static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions,
- Map<Integer, List<String>> partitionIdToSegmentsMap) {
+ Map<Integer, List<String>> instancePartitionIdToSegmentsMap) {
Map<String, Map<String, String>> newAssignment = new TreeMap<>();
- int numPartitions = instancePartitions.getNumPartitions();
- for (Map.Entry<Integer, List<String>> entry : partitionIdToSegmentsMap.entrySet()) {
+ for (Map.Entry<Integer, List<String>> entry : instancePartitionIdToSegmentsMap.entrySet()) {
// Uniformly spray the segment partitions over the instance partitions
- int partitionId = entry.getKey() % numPartitions;
+ int instancePartitionId = entry.getKey();
+ List<String> segments = entry.getValue();
SegmentAssignmentUtils
- .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, partitionId, entry.getValue(),
+ .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, instancePartitionId, segments,
newAssignment);
}
return newAssignment;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index aa4c8c7f66..62bcf70b11 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.assignment.segment;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -58,7 +59,7 @@ import static org.testng.Assert.assertTrue;
public class OfflineReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final String SEGMENT_NAME_PREFIX = "segment_";
- private static final int NUM_SEGMENTS = 90;
+ private static final int NUM_SEGMENTS = 12;
private static final List<String> SEGMENTS =
SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS);
private static final String INSTANCE_NAME_PREFIX = "instance_";
@@ -248,13 +249,11 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
- // There should be 90 segments assigned
assertEquals(currentAssignment.size(), NUM_SEGMENTS);
// Each segment should have 3 replicas
for (Map<String, String> instanceStateMap : currentAssignment.values()) {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
- // Each instance should have 15 segments assigned
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
@@ -278,13 +277,11 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE));
}
- // There should be 90 segments assigned
assertEquals(currentAssignment.size(), NUM_SEGMENTS);
// Each segment should have 3 replicas
for (Map<String, String> instanceStateMap : currentAssignment.values()) {
assertEquals(instanceStateMap.size(), NUM_REPLICAS);
}
- // Each instance should have 15 segments assigned
int[] numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
@@ -356,4 +353,28 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
}
}
}
+
+ @Test
+ public void testRebalanceTableWithPartitionColumnAndInstancePartitionsMapWithOnePartition() {
+ // make an unbalanced assignment by assigning all segments to the first three instances
+ String instance0 = INSTANCE_NAME_PREFIX + "0";
+ String instance1 = INSTANCE_NAME_PREFIX + "1";
+ String instance2 = INSTANCE_NAME_PREFIX + "2";
+ Map<String, Map<String, String>> unbalancedAssignment = new TreeMap<>();
+ SEGMENTS.forEach(segName ->
+ unbalancedAssignment.put(segName, ImmutableMap.of(
+ instance0, SegmentStateModel.ONLINE,
+ instance1, SegmentStateModel.ONLINE,
+ instance2, SegmentStateModel.ONLINE
+ ))
+ );
+ Map<String, Map<String, String>> balancedAssignment = _segmentAssignmentWithPartition
+ .rebalanceTable(unbalancedAssignment, _instancePartitionsMapWithoutPartition, null, null,
+ new BaseConfiguration());
+ int[] actualNumSegmentsAssignedPerInstance =
+ SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment, INSTANCES);
+ int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
+ Arrays.fill(expectedNumSegmentsAssignedPerInstance, NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES);
+ assertEquals(actualNumSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance);
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index a82d10db04..09cd07bc9e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -43,7 +43,7 @@ import static org.testng.Assert.assertTrue;
public class RealtimeReplicaGroupSegmentAssignmentTest {
private static final int NUM_REPLICAS = 3;
private static final int NUM_PARTITIONS = 4;
- private static final int NUM_SEGMENTS = 100;
+ private static final int NUM_SEGMENTS = 24;
private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_";
private static final int NUM_CONSUMING_INSTANCES = 9;
private static final List<String> CONSUMING_INSTANCES =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org