You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/11/12 23:49:52 UTC

[incubator-pinot] branch relocation_fix created (now 854f566)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a change to branch relocation_fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 854f566  Fix relocation logic to allow relocation from any servers

This branch includes the following new commits:

     new 854f566  Fix relocation logic to allow relocation from any servers

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Fix relocation logic to allow relocation from any servers

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch relocation_fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 854f566e4553bd67fe33531e9e1b20c58040d0cb
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Nov 12 15:49:30 2018 -0800

    Fix relocation logic to allow relocation from any servers
---
 .../core/relocation/RealtimeSegmentRelocator.java  | 45 ++++++++++------------
 .../relocation/RealtimeSegmentRelocatorTest.java   |  5 ++-
 2 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index f71a314..b4d5b23 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Manager to relocate completed segments from consuming servers to completed servers
+ * Manager to relocate completed segments to completed servers
  * Segment relocation will be done by this manager, instead of directly moving segments to completed servers on completion,
  * so that we don't get segment downtime when a segment is in transition
  *
  * We only relocate segments for realtime tables, and only if tenant config indicates that relocation is required
- * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and on consuming servers
+ * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and all/some are on servers other than completed servers
  */
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
@@ -107,7 +107,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
 
   /**
    * Given a realtime tag config and an ideal state, relocate the segments
-   * which are completed but still hanging around on consuming servers, one replica at a time
+   * which are completed but not yet moved to completed servers, one replica at a time
    * @param  realtimeTagConfig
    * @param idealState
    */
@@ -115,12 +115,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
 
     final HelixManager helixManager = _pinotHelixResourceManager.getHelixZkManager();
 
-    List<String> consumingServers = getInstancesWithTag(helixManager, realtimeTagConfig.getConsumingServerTag());
-    if (consumingServers.isEmpty()) {
-      throw new IllegalStateException(
-          "Found no realtime consuming servers with tag " + realtimeTagConfig.getConsumingServerTag());
-    }
-    List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag());
+    final List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag());
     if (completedServers.isEmpty()) {
       throw new IllegalStateException(
           "Found no realtime completed servers with tag " + realtimeTagConfig.getCompletedServerTag());
@@ -152,7 +147,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
     completedServersQueue.addAll(completedServerToNumSegments.entrySet());
 
     // get new mapping for segments that need relocation
-    createNewIdealState(realtimeTagConfig, idealState, consumingServers, completedServersQueue);
+    createNewIdealState(realtimeTagConfig, idealState, completedServers, completedServersQueue);
   }
 
   @VisibleForTesting
@@ -161,26 +156,26 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }
 
   /**
-   * Given an ideal state, list of consuming serves and completed servers,
-   * create a mapping for those segments that should relocate a replica from consuming to completed server
+   * Given an ideal state find the segments that need to relocate a replica to completed servers,
+   * and create a new instance state map for those segments
    *
    * @param realtimeTagConfig
    * @param idealState
-   * @param consumingServers
+   * @param completedServers
    * @param completedServersQueue
    * @return
    */
-  private void createNewIdealState(RealtimeTagConfig realtimeTagConfig, IdealState idealState,
-      List<String> consumingServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
+  private void createNewIdealState(final RealtimeTagConfig realtimeTagConfig, IdealState idealState,
+      final List<String> completedServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
 
     // TODO: we are scanning the entire segments list every time. This is unnecessary because only the latest segments will need relocation
     // Can we do something to avoid this?
     // 1. Time boundary: scan only last day whereas runFrequency = hourly
     // 2. For each partition, scan in descending order, and stop when the first segment not needing relocation is found
     for (String segmentName : idealState.getPartitionSet()) {
-      Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+      final Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
       Map<String, String> newInstanceStateMap =
-          createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, consumingServers,
+          createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, completedServers,
               completedServersQueue);
       if (MapUtils.isNotEmpty(newInstanceStateMap)) {
         idealState.setInstanceStateMap(segmentName, newInstanceStateMap);
@@ -189,30 +184,32 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }
 
   /**
-   * Given the instanceStateMap and a list of consuming and completed servers for a realtime resource,
-   * creates a new instanceStateMap, where one replica's instance is replaced from a consuming server to a completed server
+   * Given the instance state map of a segment, relocate one replica to a completed server if needed
+   * Relocation should be done only if all replicas are ONLINE and at least one replica is not on the completed servers
    *
    * @param realtimeTagConfig
    * @param instanceStateMap
-   * @param consumingServers
+   * @param completedServers
    * @param completedServersQueue
    * @return
    */
-  private Map<String, String> createNewInstanceStateMap(RealtimeTagConfig realtimeTagConfig, String segmentName,
-      Map<String, String> instanceStateMap, List<String> consumingServers,
+  private Map<String, String> createNewInstanceStateMap(final RealtimeTagConfig realtimeTagConfig,
+      final String segmentName, final Map<String, String> instanceStateMap, final List<String> completedServers,
       MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
 
     Map<String, String> newInstanceStateMap = null;
 
-    // proceed only if all segments are ONLINE, and at least 1 server is from consuming list
+    // proceed only if all segments are ONLINE
     for (String state : instanceStateMap.values()) {
       if (!state.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE)) {
         return newInstanceStateMap;
       }
     }
 
+    // if an instance is found in the instance state map which is not a completed server,
+    // replace the instance with one from the completed servers queue
     for (String instance : instanceStateMap.keySet()) {
-      if (consumingServers.contains(instance)) {
+      if (!completedServers.contains(instance)) {
         // Decide best strategy to pick completed server.
         // 1. pick random from list of completed servers
         // 2. pick completed server with minimum segments, based on ideal state of this resource
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 1b0ae09..45a1212 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -103,6 +103,7 @@ public class RealtimeSegmentRelocatorTest {
     ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
 
     // no consuming instances found
+    // TODO: this should not be tested anymore
     _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, new ArrayList<>());
     _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, completedInstanceList);
     boolean exception = false;
@@ -116,7 +117,7 @@ public class RealtimeSegmentRelocatorTest {
 
     // no completed instances found
     _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, consumingInstanceList);
-    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<String>());
+    _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<>());
     try {
       _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
     } catch (Exception e) {
@@ -229,6 +230,8 @@ public class RealtimeSegmentRelocatorTest {
     segmentNameToExpectedNumCompletedInstances.put("segment2", 1);
     verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, consumingInstanceList,
         nReplicas, segmentNameToExpectedNumCompletedInstances);
+
+    // add test to check that segments not on consuming tag also relocate
   }
 
   private void verifySegmentAssignment(IdealState updatedIdealState,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org