You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/31 19:11:48 UTC

[incubator-pinot] branch master updated: Shuffle the segments when rebalancing the table to avoid creating hotspot servers (#5197)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e0f1d  Shuffle the segments when rebalancing the table to avoid creating hotspot servers (#5197)
95e0f1d is described below

commit 95e0f1d14ebaf6d3f6473812ac353cb984b469d4
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Mar 31 12:11:38 2020 -0700

    Shuffle the segments when rebalancing the table to avoid creating hotspot servers (#5197)
    
    When new servers are added to an existing replica-group-based table
    and a rebalance is triggered, the current behavior assigns segments
    in alphabetical order, which might move only the newest segments to
    the newly added servers. Because queries tend to hit the most recent
    segments, this behavior might cause the newly added servers to become
    hotspot servers.
    To address this issue, we shuffle the segments so that old and new
    segments are assigned to servers in a balanced way.
    We use the hash of the table name as the random seed to shuffle the
    segments so that the result is deterministic.
    
    It is a little tricky to write a test case for this. Since the
    change is straightforward and the existing tests already have
    pretty good coverage, no new test is added; the expected behavior
    was verified manually.
---
 .../segment/OfflineSegmentAssignment.java          | 25 +++++++++++++++++-----
 .../segment/RealtimeSegmentAssignment.java         | 17 +++++++++++----
 .../assignment/segment/SegmentAssignmentUtils.java | 19 ++++++----------
 .../segment/SegmentAssignmentUtilsTest.java        |  4 +---
 4 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index fb0187b..77bf046 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -20,11 +20,12 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.commons.configuration.Configuration;
@@ -214,10 +215,16 @@ public class OfflineSegmentAssignment implements SegmentAssignment {
         Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
             "Instance partitions: %s should contain 1 partition without partition column",
             instancePartitions.getInstancePartitionsName());
+
+        // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
+        //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
+        //       name hash as the random seed for the shuffle so that the result is deterministic.
+        List<String> segments = new ArrayList<>(currentAssignment.keySet());
+        Collections.shuffle(segments, new Random(_offlineTableName.hashCode()));
+
         newAssignment = new TreeMap<>();
         SegmentAssignmentUtils
-            .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, currentAssignment.keySet(),
-                newAssignment);
+            .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, segments, newAssignment);
       } else {
         LOGGER.info("Rebalancing table: {} with partition column: {}", _offlineTableName, _partitionColumn);
         newAssignment = rebalanceTableWithPartition(currentAssignment, instancePartitions);
@@ -238,10 +245,18 @@ public class OfflineSegmentAssignment implements SegmentAssignment {
     for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
       segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
     }
-    Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
+    Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
     for (String segmentName : currentAssignment.keySet()) {
       int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
-      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
+      partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
+    }
+
+    // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
+    //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
+    //       name hash as the random seed for the shuffle so that the result is deterministic.
+    Random random = new Random(_offlineTableName.hashCode());
+    for (List<String> segments : partitionIdToSegmentsMap.values()) {
+      Collections.shuffle(segments, random);
     }
 
     return SegmentAssignmentUtils
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 9447b9b..8423769 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -20,11 +20,11 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Random;
 import java.util.TreeMap;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
@@ -202,11 +202,20 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
               _realtimeTableName, numReplicaGroups);
         }
 
-        Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
+        Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
         for (String segmentName : completedSegmentAssignment.keySet()) {
           int partitionId = new LLCSegmentName(segmentName).getPartitionId();
-          partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
+          partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
         }
+
+        // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
+        //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
+        //       name hash as the random seed for the shuffle so that the result is deterministic.
+        Random random = new Random(_realtimeTableName.hashCode());
+        for (List<String> segments : partitionIdToSegmentsMap.values()) {
+          Collections.shuffle(segments, random);
+        }
+
         newAssignment = SegmentAssignmentUtils
             .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, completedInstancePartitions,
                 partitionIdToSegmentsMap);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index e950f15..2b95557 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -114,10 +114,10 @@ public class SegmentAssignmentUtils {
    */
   static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
       Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions,
-      Map<Integer, Set<String>> partitionIdToSegmentsMap) {
+      Map<Integer, List<String>> partitionIdToSegmentsMap) {
     Map<String, Map<String, String>> newAssignment = new TreeMap<>();
     int numPartitions = instancePartitions.getNumPartitions();
-    for (Map.Entry<Integer, Set<String>> entry : partitionIdToSegmentsMap.entrySet()) {
+    for (Map.Entry<Integer, List<String>> entry : partitionIdToSegmentsMap.entrySet()) {
       // Uniformly spray the segment partitions over the instance partitions
       int partitionId = entry.getKey() % numPartitions;
       SegmentAssignmentUtils
@@ -147,7 +147,7 @@ public class SegmentAssignmentUtils {
    * </ul>
    */
   static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, String>> currentAssignment,
-      InstancePartitions instancePartitions, int partitionId, Set<String> segments,
+      InstancePartitions instancePartitions, int partitionId, List<String> segments,
       Map<String, Map<String, String>> newAssignment) {
     // Fetch instances in replica-group 0
     List<String> instances = instancePartitions.getInstances(partitionId, 0);
@@ -162,14 +162,9 @@ public class SegmentAssignmentUtils {
     // Do not move segment if target number of segments is not reached, track the segments need to be moved
     int[] numSegmentsAssignedPerInstance = new int[numInstances];
     List<String> segmentsNotAssigned = new ArrayList<>();
-    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      // Skip segments not in the partition
-      if (!segments.contains(segmentName)) {
-        continue;
-      }
+    for (String segmentName : segments) {
       boolean segmentAssigned = false;
-      for (String instanceName : entry.getValue().keySet()) {
+      for (String instanceName : currentAssignment.get(segmentName).keySet()) {
         Integer instanceId = instanceNameToIdMap.get(instanceName);
         if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < targetNumSegmentsPerInstance) {
           newAssignment
@@ -261,9 +256,9 @@ public class SegmentAssignmentUtils {
       for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
         String segmentName = entry.getKey();
         Map<String, String> instanceStateMap = entry.getValue();
-        if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
+        if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
           _completedSegmentAssignment.put(segmentName, instanceStateMap);
-        } else if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+        } else if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
           _consumingSegmentAssignment.put(segmentName, instanceStateMap);
         } else {
           _offlineSegmentAssignment.put(segmentName, instanceStateMap);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index c1e27bd..ef8364e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -21,10 +21,8 @@ package org.apache.pinot.controller.helix.core.assignment.segment;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
@@ -207,7 +205,7 @@ public class SegmentAssignmentUtilsTest {
 
     int numSegments = 90;
     List<String> segments = SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, numSegments);
-    Map<Integer, Set<String>> partitionIdToSegmentsMap = Collections.singletonMap(0, new HashSet<>(segments));
+    Map<Integer, List<String>> partitionIdToSegmentsMap = Collections.singletonMap(0, segments);
     int numInstances = 9;
     List<String> instances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, numInstances);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org