You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/06/19 00:02:12 UTC

[incubator-pinot] branch rebalance_llc created (now 82cbaca)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch rebalance_llc
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 82cbaca  Take OFFLINE segment into account for real-time rebalancer

This branch includes the following new commits:

     new 82cbaca  Take OFFLINE segment into account for real-time rebalancer

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Take OFFLINE segment into account for real-time rebalancer

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch rebalance_llc
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 82cbaca278eea6571f4cd44a2eff694b3f8dc4f3
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Tue Jun 18 16:53:58 2019 -0700

    Take OFFLINE segment into account for real-time rebalancer
    
    When a consuming segment encounters an error, its state in the ideal state can be changed to OFFLINE.
    If all instance states for a segment are OFFLINE, the segment is treated as OFFLINE and is not rebalanced.
---
 ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 13 +++++++++----
 ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 13 +++++++++----
 .../assignment/segment/SegmentAssignmentUtils.java | 22 +++++++++++++++++-----
 ...imeBalanceNumSegmentAssignmentStrategyTest.java |  9 +++++++++
 ...eReplicaGroupSegmentAssignmentStrategyTest.java |  9 +++++++++
 5 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
index 72bdf29..1395294 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.InstancePartitionsType;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,10 +94,12 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
       Configuration config) {
-    CompletedConsumingSegmentAssignmentPair pair = new CompletedConsumingSegmentAssignmentPair(currentAssignment);
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+        new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
 
     // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment();
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
     List<String> instancesForCompletedSegments = SegmentAssignmentUtils
         .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.COMPLETED);
     Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
@@ -106,7 +107,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
             _replication);
 
     // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment();
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
         RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
       List<String> instancesForConsumingSegments = SegmentAssignmentUtils
@@ -132,6 +134,9 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
       newAssignment.putAll(consumingSegmentAssignment);
     }
 
+    // Keep the OFFLINE segments not moved
+    newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
     return newAssignment;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
index afd74be..3e80b44 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
@@ -90,11 +90,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
       Configuration config) {
-    SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair =
-        new SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment);
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+        new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
 
     // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment();
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
     InstancePartitions instancePartitionsForCompletedSegments = InstancePartitionsUtils
         .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED);
     Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
@@ -107,7 +108,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
             partitionIdToSegmentsMap);
 
     // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment();
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
         RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
       InstancePartitions instancePartitionsForConsumingSegments = InstancePartitionsUtils
@@ -134,6 +136,9 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
       newAssignment.putAll(consumingSegmentAssignment);
     }
 
+    // Keep the OFFLINE segments not moved
+    newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
     return newAssignment;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4333f91..0f3fe6f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -248,19 +248,27 @@ class SegmentAssignmentUtils {
   }
 
   /**
-   * Class that splits segment assignment into CONSUMING segments and COMPLETED segments.
+   * Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments.
    */
-  static class CompletedConsumingSegmentAssignmentPair {
+  static class CompletedConsumingOfflineSegmentAssignment {
     private final Map<String, Map<String, String>> _completedSegmentAssignment = new TreeMap<>();
     private final Map<String, Map<String, String>> _consumingSegmentAssignment = new TreeMap<>();
+    private final Map<String, Map<String, String>> _offlineSegmentAssignment = new TreeMap<>();
 
-    CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>> segmentAssignment) {
+    // NOTE: split the segments based on the following criteria:
+    //       1. At least one instance ONLINE -> COMPLETED segment
+    //       2. At least one instance CONSUMING -> CONSUMING segment
+    //       3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment
+    CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) {
       for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+        String segmentName = entry.getKey();
         Map<String, String> instanceStateMap = entry.getValue();
         if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
-          _completedSegmentAssignment.put(entry.getKey(), instanceStateMap);
+          _completedSegmentAssignment.put(segmentName, instanceStateMap);
+        } else if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+          _consumingSegmentAssignment.put(segmentName, instanceStateMap);
         } else {
-          _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap);
+          _offlineSegmentAssignment.put(segmentName, instanceStateMap);
         }
       }
     }
@@ -272,5 +280,9 @@ class SegmentAssignmentUtils {
     Map<String, Map<String, String>> getConsumingSegmentAssignment() {
       return _consumingSegmentAssignment;
     }
+
+    Map<String, Map<String, String>> getOfflineSegmentAssignment() {
+      return _offlineSegmentAssignment;
+    }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
index 87ef462..0aa454c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
@@ -188,6 +188,15 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
     assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
+
+    // Rebalance should not change the assignment for the OFFLINE segments
+    String offlineSegmentName = "offlineSegment";
+    Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+        .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
+            RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+    currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
index c4c9a82..6d83cc6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
@@ -210,6 +210,15 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest {
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
     assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
+
+    // Rebalance should not change the assignment for the OFFLINE segments
+    String offlineSegmentName = "offlineSegment";
+    Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+        .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
+            RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+    currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org