You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/11/12 23:49:53 UTC
[incubator-pinot] 01/01: Fix relocation logic to allow relocation
from any servers
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch relocation_fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 854f566e4553bd67fe33531e9e1b20c58040d0cb
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Nov 12 15:49:30 2018 -0800
Fix relocation logic to allow relocation from any servers
---
.../core/relocation/RealtimeSegmentRelocator.java | 45 ++++++++++------------
.../relocation/RealtimeSegmentRelocatorTest.java | 5 ++-
2 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index f71a314..b4d5b23 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory;
/**
- * Manager to relocate completed segments from consuming servers to completed servers
+ * Manager to relocate completed segments to completed servers
* Segment relocation will be done by this manager, instead of directly moving segments to completed servers on completion,
* so that we don't get segment downtime when a segment is in transition
*
* We only relocate segments for realtime tables, and only if tenant config indicates that relocation is required
- * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and on consuming servers
+ * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and at least one of them is on a server other than the completed servers
*/
public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
@@ -107,7 +107,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
/**
* Given a realtime tag config and an ideal state, relocate the segments
- * which are completed but still hanging around on consuming servers, one replica at a time
+ * which are completed but not yet moved to completed servers, one replica at a time
* @param realtimeTagConfig
* @param idealState
*/
@@ -115,12 +115,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
final HelixManager helixManager = _pinotHelixResourceManager.getHelixZkManager();
- List<String> consumingServers = getInstancesWithTag(helixManager, realtimeTagConfig.getConsumingServerTag());
- if (consumingServers.isEmpty()) {
- throw new IllegalStateException(
- "Found no realtime consuming servers with tag " + realtimeTagConfig.getConsumingServerTag());
- }
- List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag());
+ final List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag());
if (completedServers.isEmpty()) {
throw new IllegalStateException(
"Found no realtime completed servers with tag " + realtimeTagConfig.getCompletedServerTag());
@@ -152,7 +147,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
completedServersQueue.addAll(completedServerToNumSegments.entrySet());
// get new mapping for segments that need relocation
- createNewIdealState(realtimeTagConfig, idealState, consumingServers, completedServersQueue);
+ createNewIdealState(realtimeTagConfig, idealState, completedServers, completedServersQueue);
}
@VisibleForTesting
@@ -161,26 +156,26 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
}
/**
- * Given an ideal state, list of consuming serves and completed servers,
- * create a mapping for those segments that should relocate a replica from consuming to completed server
+ * Given an ideal state, find the segments that need to relocate a replica to completed servers,
+ * and create a new instance state map for those segments
*
* @param realtimeTagConfig
* @param idealState
- * @param consumingServers
+ * @param completedServers
* @param completedServersQueue
* @return
*/
- private void createNewIdealState(RealtimeTagConfig realtimeTagConfig, IdealState idealState,
- List<String> consumingServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
+ private void createNewIdealState(final RealtimeTagConfig realtimeTagConfig, IdealState idealState,
+ final List<String> completedServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
// TODO: we are scanning the entire segments list every time. This is unnecessary because only the latest segments will need relocation
// Can we do something to avoid this?
// 1. Time boundary: scan only last day whereas runFrequency = hourly
// 2. For each partition, scan in descending order, and stop when the first segment not needing relocation is found
for (String segmentName : idealState.getPartitionSet()) {
- Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
+ final Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
Map<String, String> newInstanceStateMap =
- createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, consumingServers,
+ createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, completedServers,
completedServersQueue);
if (MapUtils.isNotEmpty(newInstanceStateMap)) {
idealState.setInstanceStateMap(segmentName, newInstanceStateMap);
@@ -189,30 +184,32 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
}
/**
- * Given the instanceStateMap and a list of consuming and completed servers for a realtime resource,
- * creates a new instanceStateMap, where one replica's instance is replaced from a consuming server to a completed server
+ * Given the instance state map of a segment, relocate one replica to a completed server if needed
+ * Relocation should be done only if all replicas are ONLINE and at least one replica is not on the completed servers
*
* @param realtimeTagConfig
* @param instanceStateMap
- * @param consumingServers
+ * @param completedServers
* @param completedServersQueue
* @return
*/
- private Map<String, String> createNewInstanceStateMap(RealtimeTagConfig realtimeTagConfig, String segmentName,
- Map<String, String> instanceStateMap, List<String> consumingServers,
+ private Map<String, String> createNewInstanceStateMap(final RealtimeTagConfig realtimeTagConfig,
+ final String segmentName, final Map<String, String> instanceStateMap, final List<String> completedServers,
MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
Map<String, String> newInstanceStateMap = null;
- // proceed only if all segments are ONLINE, and at least 1 server is from consuming list
+ // proceed only if all replicas of the segment are ONLINE
for (String state : instanceStateMap.values()) {
if (!state.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE)) {
return newInstanceStateMap;
}
}
+ // if an instance is found in the instance state map which is not a completed server,
+ // replace the instance with one from the completed servers queue
for (String instance : instanceStateMap.keySet()) {
- if (consumingServers.contains(instance)) {
+ if (!completedServers.contains(instance)) {
// Decide best strategy to pick completed server.
// 1. pick random from list of completed servers
// 2. pick completed server with minimum segments, based on ideal state of this resource
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 1b0ae09..45a1212 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -103,6 +103,7 @@ public class RealtimeSegmentRelocatorTest {
ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
// no consuming instances found
+ // TODO: remove this case; the relocator no longer looks up consuming servers, so an empty consuming list should not be an error
_realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, new ArrayList<>());
_realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, completedInstanceList);
boolean exception = false;
@@ -116,7 +117,7 @@ public class RealtimeSegmentRelocatorTest {
// no completed instances found
_realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, consumingInstanceList);
- _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<String>());
+ _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<>());
try {
_realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
} catch (Exception e) {
@@ -229,6 +230,8 @@ public class RealtimeSegmentRelocatorTest {
segmentNameToExpectedNumCompletedInstances.put("segment2", 1);
verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, consumingInstanceList,
nReplicas, segmentNameToExpectedNumCompletedInstances);
+
+ // TODO: add a test to check that segments on servers without the consuming tag are also relocated
}
private void verifySegmentAssignment(IdealState updatedIdealState,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org