You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/30 20:01:35 UTC

[incubator-pinot] branch master updated: Take OFFLINE segment into account for real-time rebalancer (#4337)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c81dfb  Take OFFLINE segment into account for real-time rebalancer (#4337)
1c81dfb is described below

commit 1c81dfbfdfb8f5afd37993025c0caaec765383f0
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jul 30 13:01:29 2019 -0700

    Take OFFLINE segment into account for real-time rebalancer (#4337)
    
    When a consuming segment encounters an error, its ideal state can be turned into OFFLINE.
    If the instance states for a segment are all OFFLINE, the segment is counted as OFFLINE and won't be rebalanced.
    RealtimeSegmentValidationManager will periodically detect the OFFLINE segments and re-assign them.
---
 ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 14 ++++++++++----
 ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 14 ++++++++++----
 .../assignment/segment/SegmentAssignmentUtils.java | 22 +++++++++++++++++-----
 ...imeBalanceNumSegmentAssignmentStrategyTest.java |  9 +++++++++
 ...eReplicaGroupSegmentAssignmentStrategyTest.java |  9 +++++++++
 5 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
index 72bdf29..3319ad7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.InstancePartitionsType;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,10 +94,12 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
       Configuration config) {
-    CompletedConsumingSegmentAssignmentPair pair = new CompletedConsumingSegmentAssignmentPair(currentAssignment);
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+        new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
 
     // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment();
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
     List<String> instancesForCompletedSegments = SegmentAssignmentUtils
         .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.COMPLETED);
     Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
@@ -106,7 +107,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
             _replication);
 
     // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment();
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
         RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
       List<String> instancesForConsumingSegments = SegmentAssignmentUtils
@@ -132,6 +134,10 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
       newAssignment.putAll(consumingSegmentAssignment);
     }
 
+    // Do not move the OFFLINE segments; RealtimeSegmentValidationManager will periodically detect the OFFLINE
+    // segments and re-assign them
+    newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
     return newAssignment;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
index afd74be..bd5126e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
@@ -90,11 +90,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
       Configuration config) {
-    SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair =
-        new SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment);
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+        new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
 
     // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment();
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
     InstancePartitions instancePartitionsForCompletedSegments = InstancePartitionsUtils
         .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED);
     Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
@@ -107,7 +108,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
             partitionIdToSegmentsMap);
 
     // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment();
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
         RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
       InstancePartitions instancePartitionsForConsumingSegments = InstancePartitionsUtils
@@ -134,6 +136,10 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
       newAssignment.putAll(consumingSegmentAssignment);
     }
 
+    // Do not move the OFFLINE segments; RealtimeSegmentValidationManager will periodically detect the OFFLINE
+    // segments and re-assign them
+    newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
     return newAssignment;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4333f91..0f3fe6f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -248,19 +248,27 @@ class SegmentAssignmentUtils {
   }
 
   /**
-   * Class that splits segment assignment into CONSUMING segments and COMPLETED segments.
+   * Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments.
    */
-  static class CompletedConsumingSegmentAssignmentPair {
+  static class CompletedConsumingOfflineSegmentAssignment {
     private final Map<String, Map<String, String>> _completedSegmentAssignment = new TreeMap<>();
     private final Map<String, Map<String, String>> _consumingSegmentAssignment = new TreeMap<>();
+    private final Map<String, Map<String, String>> _offlineSegmentAssignment = new TreeMap<>();
 
-    CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>> segmentAssignment) {
+    // NOTE: split the segments based on the following criteria:
+    //       1. At least one instance ONLINE -> COMPLETED segment
+    //       2. At least one instance CONSUMING -> CONSUMING segment
+    //       3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment
+    CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) {
       for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+        String segmentName = entry.getKey();
         Map<String, String> instanceStateMap = entry.getValue();
         if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
-          _completedSegmentAssignment.put(entry.getKey(), instanceStateMap);
+          _completedSegmentAssignment.put(segmentName, instanceStateMap);
+        } else if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+          _consumingSegmentAssignment.put(segmentName, instanceStateMap);
         } else {
-          _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap);
+          _offlineSegmentAssignment.put(segmentName, instanceStateMap);
         }
       }
     }
@@ -272,5 +280,9 @@ class SegmentAssignmentUtils {
     Map<String, Map<String, String>> getConsumingSegmentAssignment() {
       return _consumingSegmentAssignment;
     }
+
+    Map<String, Map<String, String>> getOfflineSegmentAssignment() {
+      return _offlineSegmentAssignment;
+    }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
index 87ef462..0aa454c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
@@ -188,6 +188,15 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
     assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
+
+    // Rebalance should not change the assignment for the OFFLINE segments
+    String offlineSegmentName = "offlineSegment";
+    Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+        .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
+            RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+    currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
index c4c9a82..6d83cc6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
@@ -210,6 +210,15 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest {
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
     assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
+
+    // Rebalance should not change the assignment for the OFFLINE segments
+    String offlineSegmentName = "offlineSegment";
+    Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+        .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
+            RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+    currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org