You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/30 20:01:35 UTC
[incubator-pinot] branch master updated: Take OFFLINE segment into
account for real-time rebalancer (#4337)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1c81dfb Take OFFLINE segment into account for real-time rebalancer (#4337)
1c81dfb is described below
commit 1c81dfbfdfb8f5afd37993025c0caaec765383f0
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jul 30 13:01:29 2019 -0700
Take OFFLINE segment into account for real-time rebalancer (#4337)
When the consuming segment encounters error, the ideal state can be turned into OFFLINE
If the instance states for a segment is all OFFLINE, the segment is counted OFFLINE and won't be rebalanced
RealtimeSegmentValidationManager will periodically detect the OFFLINE segments and re-assign them
---
...ealtimeBalanceNumSegmentAssignmentStrategy.java | 14 ++++++++++----
...ltimeReplicaGroupSegmentAssignmentStrategy.java | 14 ++++++++++----
.../assignment/segment/SegmentAssignmentUtils.java | 22 +++++++++++++++++-----
...imeBalanceNumSegmentAssignmentStrategyTest.java | 9 +++++++++
...eReplicaGroupSegmentAssignmentStrategyTest.java | 9 +++++++++
5 files changed, 55 insertions(+), 13 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
index 72bdf29..3319ad7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.InstancePartitionsType;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,10 +94,12 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
Configuration config) {
- CompletedConsumingSegmentAssignmentPair pair = new CompletedConsumingSegmentAssignmentPair(currentAssignment);
+ SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+ new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
// Rebalance COMPLETED segments first
- Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment();
+ Map<String, Map<String, String>> completedSegmentAssignment =
+ completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
List<String> instancesForCompletedSegments = SegmentAssignmentUtils
.getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.COMPLETED);
Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
@@ -106,7 +107,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
_replication);
// Rebalance CONSUMING segments if needed
- Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment();
+ Map<String, Map<String, String>> consumingSegmentAssignment =
+ completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
List<String> instancesForConsumingSegments = SegmentAssignmentUtils
@@ -132,6 +134,10 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig
newAssignment.putAll(consumingSegmentAssignment);
}
+ // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the OFFLINE
+ // segments and re-assign them
+ newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
return newAssignment;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
index afd74be..bd5126e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
@@ -90,11 +90,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
Configuration config) {
- SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair =
- new SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment);
+ SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment =
+ new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
// Rebalance COMPLETED segments first
- Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment();
+ Map<String, Map<String, String>> completedSegmentAssignment =
+ completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
InstancePartitions instancePartitionsForCompletedSegments = InstancePartitionsUtils
.fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED);
Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
@@ -107,7 +108,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
partitionIdToSegmentsMap);
// Rebalance CONSUMING segments if needed
- Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment();
+ Map<String, Map<String, String>> consumingSegmentAssignment =
+ completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
InstancePartitions instancePartitionsForConsumingSegments = InstancePartitionsUtils
@@ -134,6 +136,10 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss
newAssignment.putAll(consumingSegmentAssignment);
}
+ // Keep the OFFLINE segments not moved, and RealtimeSegmentValidationManager will periodically detect the OFFLINE
+ // segments and re-assign them
+ newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
return newAssignment;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4333f91..0f3fe6f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -248,19 +248,27 @@ class SegmentAssignmentUtils {
}
/**
- * Class that splits segment assignment into CONSUMING segments and COMPLETED segments.
+ * Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments.
*/
- static class CompletedConsumingSegmentAssignmentPair {
+ static class CompletedConsumingOfflineSegmentAssignment {
private final Map<String, Map<String, String>> _completedSegmentAssignment = new TreeMap<>();
private final Map<String, Map<String, String>> _consumingSegmentAssignment = new TreeMap<>();
+ private final Map<String, Map<String, String>> _offlineSegmentAssignment = new TreeMap<>();
- CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>> segmentAssignment) {
+ // NOTE: split the segments based on the following criteria:
+ // 1. At least one instance ONLINE -> COMPLETED segment
+ // 2. At least one instance CONSUMING -> CONSUMING segment
+ // 3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment
+ CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) {
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
- _completedSegmentAssignment.put(entry.getKey(), instanceStateMap);
+ _completedSegmentAssignment.put(segmentName, instanceStateMap);
+ } else if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+ _consumingSegmentAssignment.put(segmentName, instanceStateMap);
} else {
- _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap);
+ _offlineSegmentAssignment.put(segmentName, instanceStateMap);
}
}
}
@@ -272,5 +280,9 @@ class SegmentAssignmentUtils {
Map<String, Map<String, String>> getConsumingSegmentAssignment() {
return _consumingSegmentAssignment;
}
+
+ Map<String, Map<String, String>> getOfflineSegmentAssignment() {
+ return _offlineSegmentAssignment;
+ }
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
index 87ef462..0aa454c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
@@ -188,6 +188,15 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest {
BaseConfiguration config = new BaseConfiguration();
config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
+
+ // Rebalance should not change the assignment for the OFFLINE segments
+ String offlineSegmentName = "offlineSegment";
+ Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+ .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
+ RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
}
private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
index c4c9a82..6d83cc6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
@@ -210,6 +210,15 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest {
BaseConfiguration config = new BaseConfiguration();
config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
+
+ // Rebalance should not change the assignment for the OFFLINE segments
+ String offlineSegmentName = "offlineSegment";
+ Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+ .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS),
+ RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment);
}
private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org