You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/11/12 23:49:53 UTC

[incubator-pinot] 01/01: Fix relocation logic to allow relocation from any servers

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch relocation_fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 854f566e4553bd67fe33531e9e1b20c58040d0cb
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Nov 12 15:49:30 2018 -0800

    Fix relocation logic to allow relocation from any servers
---
 .../core/relocation/RealtimeSegmentRelocator.java  | 45 ++++++++++------------
 .../relocation/RealtimeSegmentRelocatorTest.java   |  5 ++-
 2 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index f71a314..b4d5b23 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Manager to relocate completed segments from consuming servers to completed servers
+ * Manager to relocate completed segments to completed servers
  * Segment relocation will be done by this manager, instead of directly moving segments to completed servers on completion,
  * so that we don't get segment downtime when a segment is in transition
  *
  * We only relocate segments for realtime tables, and only if tenant config indicates that relocation is required
- * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and on consuming servers
+ * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and at least one of them is on a server other than the completed servers
  */
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
@@ -107,7 +107,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
 
   /**
    * Given a realtime tag config and an ideal state, relocate the segments
-   * which are completed but still hanging around on consuming servers, one replica at a time
+   * which are completed but not yet moved to completed servers, one replica at a time
    * @param  realtimeTagConfig
    * @param idealState
    */
@@ -115,12 +115,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
 
     final HelixManager helixManager = _pinotHelixResourceManager.getHelixZkManager();
 
-    List<String> consumingServers = getInstancesWithTag(helixManager, realtimeTagConfig.getConsumingServerTag());
-    if (consumingServers.isEmpty()) {
-      throw new IllegalStateException(
-          "Found no realtime consuming servers with tag " + realtimeTagConfig.getConsumingServerTag());
-    }
-    List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag());
+    final List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag());
     if (completedServers.isEmpty()) {
       throw new IllegalStateException(
           "Found no realtime completed servers with tag " + realtimeTagConfig.getCompletedServerTag());
@@ -152,7 +147,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
     completedServersQueue.addAll(completedServerToNumSegments.entrySet());
 
     // get new mapping for segments that need relocation
-    createNewIdealState(realtimeTagConfig, idealState, consumingServers, completedServersQueue);
+    createNewIdealState(realtimeTagConfig, idealState, completedServers, completedServersQueue);
   }
 
   @VisibleForTesting
@@ -161,26 +156,26 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }
 
   /**
-   * Given an ideal state, list of consuming serves and completed servers,
-   * create a mapping for those segments that should relocate a replica from consuming to completed server
+   * Given an ideal state, find the segments that need to relocate a replica to completed servers,
+   * and create a new instance state map for those segments
    *
    * @param realtimeTagConfig
    * @param idealState
-   * @param consumingServers
+   * @param completedServers
    * @param completedServersQueue
    * @return
    */
-  private void createNewIdealState(RealtimeTagConfig realtimeTagConfig, IdealState idealState,
-      List<String> consumingServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
+  private void createNewIdealState(final RealtimeTagConfig realtimeTagConfig, IdealState idealState,
+      final List<String> completedServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
 
     // TODO: we are scanning the entire segments list every time. This is unnecessary because only the latest segments will need relocation
     // Can we do something to avoid this?
     // 1. Time boundary: scan only last day whereas runFrequency = hourly
     // 2. For each partition, scan in descending order, and stop when the first segment not needing relocation is found
     for (String segmentName : idealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      final Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
       Map<String, String> newInstanceStateMap =
-          createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, consumingServers,
+          createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, completedServers,
               completedServersQueue);
       if (MapUtils.isNotEmpty(newInstanceStateMap)) {
         idealState.setInstanceStateMap(segmentName, newInstanceStateMap);
@@ -189,30 +184,32 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }
 
   /**
-   * Given the instanceStateMap and a list of consuming and completed servers for a realtime resource,
-   * creates a new instanceStateMap, where one replica's instance is replaced from a consuming server to a completed server
+   * Given the instance state map of a segment, relocate one replica to a completed server if needed
+   * Relocation should be done only if all replicas are ONLINE and at least one replica is not on the completed servers
    *
    * @param realtimeTagConfig
    * @param instanceStateMap
-   * @param consumingServers
+   * @param completedServers
    * @param completedServersQueue
    * @return
    */
-  private Map<String, String> createNewInstanceStateMap(RealtimeTagConfig realtimeTagConfig, String segmentName,
-      Map<String, String> instanceStateMap, List<String> consumingServers,
+  private Map<String, String> createNewInstanceStateMap(final RealtimeTagConfig realtimeTagConfig,
+      final String segmentName, final Map<String, String> instanceStateMap, final List<String> completedServers,
       MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
 
     Map<String, String> newInstanceStateMap = null;
 
-    // proceed only if all segments are ONLINE, and at least 1 server is from consuming list
+    // proceed only if all replicas of this segment are ONLINE
     for (String state : instanceStateMap.values()) {
       if (!state.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE)) {
         return newInstanceStateMap;
       }
     }
 
+    // if an instance is found in the instance state map which is not a completed server,
+    // replace the instance with one from the completed servers queue
     for (String instance : instanceStateMap.keySet()) {
-      if (consumingServers.contains(instance)) {
+      if (!completedServers.contains(instance)) {
         // Decide best strategy to pick completed server.
         // 1. pick random from list of completed servers
         // 2. pick completed server with minimum segments, based on ideal state of this resource
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 1b0ae09..45a1212 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -103,6 +103,7 @@ public class RealtimeSegmentRelocatorTest {
     ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
 
     // no consuming instances found
+    // TODO: relocation no longer requires consuming servers, so this case should not throw; update or remove this check
     _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, new ArrayList<>());
     _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, completedInstanceList);
     boolean exception = false;
@@ -116,7 +117,7 @@ public class RealtimeSegmentRelocatorTest {
 
     // no completed instances found
     _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, consumingInstanceList);
-    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<String>());
+    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<>());
     try {
       _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
     } catch (Exception e) {
@@ -229,6 +230,8 @@ public class RealtimeSegmentRelocatorTest {
     segmentNameToExpectedNumCompletedInstances.put("segment2", 1);
     verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, consumingInstanceList,
         nReplicas, segmentNameToExpectedNumCompletedInstances);
+
+    // TODO: add a test checking that segments on servers without the consuming tag are also relocated
   }
 
   private void verifySegmentAssignment(IdealState updatedIdealState,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org