You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 18:00:03 UTC
[28/38] helix git commit: New DelayedAutoRebalancer featuring
delayed partition movement during rebalancing.
New DelayedAutoRebalancer featuring delayed partition movement during rebalancing.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b0d11228
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b0d11228
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b0d11228
Branch: refs/heads/helix-0.6.x
Commit: b0d1122841be3bd09276a546cfa3c5433ffefea9
Parents: 8c58cf3
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Sep 12 16:42:17 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:53:43 2017 -0800
----------------------------------------------------------------------
.../rebalancer/AbstractRebalancer.java | 80 ++---
.../controller/rebalancer/AutoRebalancer.java | 33 +-
.../rebalancer/DelayedAutoRebalancer.java | 336 ++++++++++++++++++
.../strategy/AutoRebalanceStrategy.java | 6 +-
.../strategy/CrushRebalanceStrategy.java | 9 +
.../rebalancer/util/RebalanceScheduler.java | 146 ++++++++
.../stages/BestPossibleStateCalcStage.java | 35 +-
.../controller/stages/ClusterDataCache.java | 123 ++++---
.../stages/PersistAssignmentStage.java | 2 +-
.../java/org/apache/helix/model/IdealState.java | 36 ++
.../helix/model/StateModelDefinition.java | 46 +++
.../helix/model/builder/IdealStateBuilder.java | 37 +-
.../helix/task/DeprecatedTaskRebalancer.java | 3 +-
.../org/apache/helix/task/JobRebalancer.java | 4 +-
.../java/org/apache/helix/task/TaskDriver.java | 7 +-
.../org/apache/helix/task/TaskRebalancer.java | 111 +-----
.../java/org/apache/helix/task/TaskUtil.java | 20 --
.../BestPossibleExternalViewVerifier.java | 52 ++-
.../rebalancer/TestAutoRebalanceStrategy.java | 8 +-
.../integration/TestDelayedAutoRebalance.java | 348 +++++++++++++++++++
.../TestDelayedAutoRebalanceWithRackaware.java | 73 ++++
.../TestRebalancerPersistAssignments.java | 19 +-
.../integration/ZkIntegrationTestBase.java | 14 +
23 files changed, 1236 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 2338ce7..7bf2153 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -19,8 +19,10 @@ package org.apache.helix.controller.rebalancer;
* under the License.
*/
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
@@ -30,12 +32,11 @@ import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -100,50 +101,6 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
return partitionMapping;
}
- /**
- * @return state count map: state->count
- */
- protected static LinkedHashMap<String, Integer> stateCount(StateModelDefinition stateModelDef,
- int liveNodesNb, int totalReplicas) {
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-
- int replicas = totalReplicas;
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
- if ("N".equals(num)) {
- stateCountMap.put(state, liveNodesNb);
- } else if ("R".equals(num)) {
- // wait until we get the counts for all other states
- continue;
- } else {
- int stateCount = -1;
- try {
- stateCount = Integer.parseInt(num);
- } catch (Exception e) {
- // LOG.error("Invalid count for state: " + state + ", count: " + num +
- // ", use -1 instead");
- }
-
- if (stateCount > 0) {
- stateCountMap.put(state, stateCount);
- replicas -= stateCount;
- }
- }
- }
-
- // get state count for R
- for (String state : statesPriorityList) {
- String num = stateModelDef.getNumInstancesPerState(state);
- if ("R".equals(num)) {
- stateCountMap.put(state, replicas);
- // should have at most one state using R
- break;
- }
- }
- return stateCountMap;
- }
-
protected Map<String, Map<String, String>> currentMapping(CurrentStateOutput currentStateOutput,
String resourceName, List<String> partitions, Map<String, Integer> stateCountMap) {
@@ -167,4 +124,35 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
}
return map;
}
+
+  /**
+   * Create and initialize the {@link RebalanceStrategy} configured for a resource.
+   *
+   * @param rebalanceStrategyName class name of a custom strategy, or {@code null} /
+   *          {@link RebalanceStrategy#DEFAULT_REBALANCE_STRATEGY} (case-insensitive) to use
+   *          {@link AutoRebalanceStrategy}
+   * @param partitions partitions of the resource
+   * @param resourceName name of the resource
+   * @param stateCountMap state -> count map handed to the strategy
+   * @param maxPartition maximum number of partitions per instance handed to the strategy
+   * @return an initialized strategy instance
+   * @throws HelixException if a custom strategy class cannot be loaded or instantiated, or does
+   *           not implement {@link RebalanceStrategy}
+   */
+  protected RebalanceStrategy getRebalanceStrategy(String rebalanceStrategyName,
+      List<String> partitions, String resourceName, LinkedHashMap<String, Integer> stateCountMap,
+      int maxPartition) {
+    if (rebalanceStrategyName == null || rebalanceStrategyName
+        .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
+      return new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
+    }
+
+    String errorMsg =
+        "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName;
+    RebalanceStrategy rebalanceStrategy;
+    try {
+      rebalanceStrategy = RebalanceStrategy.class
+          .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
+    } catch (ClassNotFoundException ex) {
+      throw new HelixException(errorMsg, ex);
+    } catch (InstantiationException ex) {
+      throw new HelixException(errorMsg, ex);
+    } catch (IllegalAccessException ex) {
+      throw new HelixException(errorMsg, ex);
+    } catch (ClassCastException ex) {
+      // The configured class was loaded but does not implement RebalanceStrategy; report it the
+      // same way as the other configuration errors instead of leaking a raw ClassCastException.
+      throw new HelixException(errorMsg, ex);
+    }
+    rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
+    return rebalanceStrategy;
+  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 74f8b6e..8096d5a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -62,8 +62,8 @@ public class AutoRebalancer extends AbstractRebalancer {
Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
String replicas = currentIdealState.getReplicas();
- LinkedHashMap<String, Integer> stateCountMap =
- stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
+ LinkedHashMap<String, Integer> stateCountMap = StateModelDefinition
+ .getStateCountMap(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
allNodes.removeAll(clusterData.getDisabledInstances());
@@ -110,32 +110,9 @@ public class AutoRebalancer extends AbstractRebalancer {
Collections.sort(liveNodes);
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
-
- String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
- if (rebalanceStrategyName == null || rebalanceStrategyName
- .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
- _rebalanceStrategy =
- new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
- } else {
- try {
- _rebalanceStrategy = RebalanceStrategy.class
- .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
- _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
- } catch (ClassNotFoundException ex) {
- throw new HelixException(
- "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
- ex);
- } catch (InstantiationException ex) {
- throw new HelixException(
- "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
- ex);
- } catch (IllegalAccessException ex) {
- throw new HelixException(
- "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
- ex);
- }
- }
-
+ _rebalanceStrategy =
+ getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
+ stateCountMap, maxPartition);
ZNRecord newMapping = _rebalanceStrategy
.computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
new file mode 100644
index 0000000..1e127bc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -0,0 +1,336 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Full-Auto rebalancer with delayed partition movement.
+ *
+ * <p>An instance that goes offline still counts as "active" until
+ * {@link IdealState#getRebalanceDelay()} milliseconds have passed since its recorded offline
+ * time, so its replicas are not moved away immediately. While an instance is inside that window,
+ * replacement replicas are added to a partition only when fewer than
+ * {@link IdealState#getMinActiveReplicas()} of its assigned instances are live. A controller
+ * rebalance is scheduled for the earliest moment one of those windows closes.
+ */
+public class DelayedAutoRebalancer extends AbstractRebalancer {
+  private static final Logger LOG = Logger.getLogger(DelayedAutoRebalancer.class);
+  // Static, hence shared by every DelayedAutoRebalancer instance; tasks are keyed by resource name.
+  private static final RebalanceScheduler _scheduledRebalancer = new RebalanceScheduler();
+
+  /**
+   * Compute the preference lists (list fields) of a new IdealState for the resource.
+   *
+   * <p>The configured rebalance strategy is run twice: once over the live instances (the "ideal"
+   * mapping) and once over the active instances, i.e. live instances plus offline instances still
+   * within the rebalance delay. The two results are merged by {@link #getFinalDelayedMapping}.
+   * An IdealState with empty preference lists is returned when the resource has no partitions,
+   * no (active) instances, or no usable replica count.
+   *
+   * @param resourceName name of the resource
+   * @param currentIdealState current IdealState; its simple fields are copied to the result
+   * @param currentStateOutput current states, used to build the strategy's current mapping
+   * @param clusterData cluster data snapshot
+   * @return a new IdealState carrying the computed preference lists
+   */
+  @Override
+  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+    List<String> partitions = new ArrayList<String>(currentIdealState.getPartitionSet());
+    if (partitions.isEmpty()) {
+      LOG.info("Partition count is 0 for resource " + resourceName
+          + ", stop calculate ideal mapping for the resource.");
+      return generateNewIdealState(resourceName, currentIdealState,
+          emptyMapping(currentIdealState));
+    }
+
+    Set<String> liveNodes;
+    Set<String> allNodes;
+
+    String instanceTag = currentIdealState.getInstanceGroupTag();
+    if (instanceTag != null) {
+      liveNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
+      allNodes = clusterData.getAllInstancesWithTag(instanceTag);
+
+      if (!liveNodes.isEmpty() && LOG.isInfoEnabled()) {
+        // live nodes exist that have this tag
+        LOG.info(String.format("Found the following participants with tag %s for %s: %s",
+            currentIdealState.getInstanceGroupTag(), resourceName,
+            Arrays.toString(liveNodes.toArray())));
+      }
+    } else {
+      liveNodes = clusterData.getEnabledLiveInstances();
+      allNodes = clusterData.getEnabledInstances();
+    }
+
+    Map<String, Long> instanceOfflineTimeMap = clusterData.getInstanceOfflineTimeMap();
+    Set<String> activeNodes =
+        getActiveInstances(currentIdealState, allNodes, liveNodes, instanceOfflineTimeMap);
+
+    setRebalanceScheduler(currentIdealState, activeNodes, instanceOfflineTimeMap);
+
+    if (allNodes.isEmpty() || activeNodes.isEmpty()) {
+      LOG.error(String.format(
+          "No instances or active instances available for resource %s, allNodes: %s, liveNodes: %s, activeInstances: %s",
+          resourceName, Arrays.toString(allNodes.toArray()), Arrays.toString(liveNodes.toArray()),
+          Arrays.toString(activeNodes.toArray())));
+      return generateNewIdealState(resourceName, currentIdealState,
+          emptyMapping(currentIdealState));
+    }
+
+    StateModelDefinition stateModelDef =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());
+
+    int replicaCount = getReplicaCount(currentIdealState, activeNodes);
+    // A negative count (e.g. a misconfigured "-1") is as unusable as 0.
+    if (replicaCount <= 0) {
+      LOG.error("Replica count is " + replicaCount + " for resource " + resourceName
+          + ", stop calculate ideal mapping for the resource.");
+      return generateNewIdealState(resourceName, currentIdealState,
+          emptyMapping(currentIdealState));
+    }
+
+    int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
+
+    LinkedHashMap<String, Integer> stateCountMap =
+        StateModelDefinition.getStateCountMap(stateModelDef, activeNodes.size(), replicaCount);
+    Map<String, Map<String, String>> currentMapping =
+        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
+
+    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
+    _rebalanceStrategy =
+        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
+            stateCountMap, maxPartition);
+
+    // sort node lists to ensure consistent preferred assignments
+    List<String> allNodeList = new ArrayList<String>(allNodes);
+    List<String> liveNodeList = new ArrayList<String>(liveNodes);
+    List<String> activeNodeList = new ArrayList<String>(activeNodes);
+    Collections.sort(allNodeList);
+    Collections.sort(liveNodeList);
+    Collections.sort(activeNodeList);
+
+    ZNRecord newIdealMapping = _rebalanceStrategy
+        .computePartitionAssignment(allNodeList, liveNodeList, currentMapping, clusterData);
+    ZNRecord newActiveMapping = _rebalanceStrategy
+        .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
+    ZNRecord finalMapping =
+        getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveNodes,
+            replicaCount, minActiveReplicas);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("currentMapping: " + currentMapping);
+      LOG.debug("stateCountMap: " + stateCountMap);
+      LOG.debug("liveNodes: " + liveNodes);
+      LOG.debug("activeNodes: " + activeNodes);
+      LOG.debug("allNodes: " + allNodes);
+      LOG.debug("maxPartition: " + maxPartition);
+      LOG.debug("newIdealMapping: " + newIdealMapping);
+      LOG.debug("newActiveMapping: " + newActiveMapping);
+      LOG.debug("finalMapping: " + finalMapping);
+    }
+
+    return generateNewIdealState(resourceName, currentIdealState, finalMapping);
+  }
+
+  /**
+   * Build a new IdealState that copies the simple fields and rebalance mode of
+   * {@code currentIdealState} and takes its list fields (preference lists) from
+   * {@code newMapping}. Map fields of the current IdealState are not copied.
+   */
+  private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
+      ZNRecord newMapping) {
+    IdealState newIdealState = new IdealState(resourceName);
+    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
+    newIdealState.setRebalanceMode(currentIdealState.getRebalanceMode());
+    newIdealState.getRecord().setListFields(newMapping.getListFields());
+
+    return newIdealState;
+  }
+
+  /**
+   * Get all active instances: the live instances plus every instance of {@code allNodes} that is
+   * not live and whose recorded offline time is less than the rebalance delay ago. When the delay
+   * is not positive, only the live instances are active.
+   */
+  private Set<String> getActiveInstances(IdealState idealState, Set<String> allNodes,
+      Set<String> liveNodes, Map<String, Long> instanceOfflineTimeMap) {
+    Set<String> activeInstances = new HashSet<String>(liveNodes);
+    long delayTime = idealState.getRebalanceDelay();
+    if (delayTime <= 0) {
+      return activeInstances;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    for (String ins : allNodes) {
+      if (liveNodes.contains(ins)) {
+        continue;
+      }
+      Long offlineTime = instanceOfflineTimeMap.get(ins);
+      if (offlineTime != null && offlineTime > 0 && offlineTime + delayTime > currentTime) {
+        activeInstances.add(ins);
+      }
+    }
+
+    return activeInstances;
+  }
+
+  /**
+   * Schedule a rebalance of the resource for the earliest time at which an offline-yet-active
+   * instance stops being active (its offline time plus the rebalance delay). Any previously
+   * scheduled rebalance for the resource is removed when there is no such time.
+   */
+  private void setRebalanceScheduler(IdealState idealState, Set<String> activeInstances,
+      Map<String, Long> instanceOfflineTimeMap) {
+    String resourceName = idealState.getResourceName();
+    long delayTime = idealState.getRebalanceDelay();
+
+    long nextRebalanceTime = Long.MAX_VALUE;
+    // With a non-positive delay getActiveInstances keeps no offline instance active, so there is
+    // no delay window whose expiry needs a future rebalance.
+    if (delayTime > 0) {
+      for (String ins : activeInstances) {
+        Long offlineTime = instanceOfflineTimeMap.get(ins);
+        if (offlineTime != null && offlineTime > 0) {
+          nextRebalanceTime = Math.min(nextRebalanceTime, offlineTime + delayTime);
+        }
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Next rebalance time for resource %s is %d", resourceName,
+          nextRebalanceTime));
+    }
+    if (nextRebalanceTime == Long.MAX_VALUE) {
+      _scheduledRebalancer.removeScheduledRebalance(resourceName);
+    } else {
+      _scheduledRebalancer.scheduleRebalance(_manager, resourceName, nextRebalanceTime);
+    }
+  }
+
+  /**
+   * Merge the ideal mapping (computed over live instances) and the active mapping (computed over
+   * live plus offline-yet-active instances) into the final preference lists.
+   *
+   * <p>If {@code minActiveReplica >= numReplica} the ideal mapping is returned as is, so no
+   * movement is delayed. Otherwise, for each partition the active preference list is kept when at
+   * least {@code minActiveReplica} of its instances are live. If not, the list is rebuilt from its
+   * live instances, topped up with instances of the ideal list that are not in the active list
+   * until it holds {@code minActiveReplica} entries or the candidates run out. A partition missing
+   * from either mapping is treated as having an empty list there.
+   */
+  private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
+      ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
+    if (minActiveReplica >= numReplica) {
+      return newIdealMapping;
+    }
+    ZNRecord finalMapping = new ZNRecord(idealState.getResourceName());
+    for (String partition : idealState.getPartitionSet()) {
+      List<String> idealList = listFieldOrEmpty(newIdealMapping, partition);
+      List<String> activeList = listFieldOrEmpty(newActiveMapping, partition);
+
+      List<String> liveList = new ArrayList<String>();
+      for (String ins : activeList) {
+        if (liveInstances.contains(ins)) {
+          liveList.add(ins);
+        }
+      }
+
+      if (liveList.size() >= minActiveReplica) {
+        finalMapping.setListField(partition, activeList);
+      } else {
+        // Too few live replicas: keep the live ones and add ideal-list instances that are not
+        // already in the active list until minActiveReplica is reached.
+        List<String> candidates = new ArrayList<String>(idealList);
+        candidates.removeAll(activeList);
+        for (String liveIns : candidates) {
+          liveList.add(liveIns);
+          if (liveList.size() >= minActiveReplica) {
+            break;
+          }
+        }
+        finalMapping.setListField(partition, liveList);
+      }
+    }
+    return finalMapping;
+  }
+
+  /** Return the list field of {@code record} for {@code key}, or a new empty list if absent. */
+  private static List<String> listFieldOrEmpty(ZNRecord record, String key) {
+    List<String> list = record.getListField(key);
+    return list != null ? list : new ArrayList<String>();
+  }
+
+  /** Build a mapping with an empty preference list for every partition of the IdealState. */
+  private ZNRecord emptyMapping(IdealState idealState) {
+    ZNRecord emptyMapping = new ZNRecord(idealState.getResourceName());
+    for (String partition : idealState.getPartitionSet()) {
+      emptyMapping.setListField(partition, new ArrayList<String>());
+    }
+    return emptyMapping;
+  }
+
+  /**
+   * Compute the best possible state of every partition of the resource.
+   *
+   * <p>Preference lists are obtained from {@link ConstraintBasedAssignment#getPreferenceList}
+   * with the currently active instances (live plus offline-yet-active). Partitions without a
+   * preference list are skipped. Offline instances that appear in a preference list but got no
+   * state from {@link ConstraintBasedAssignment#computeAutoBestStateForPartition} are given the
+   * state model's initial state.
+   *
+   * <p>NOTE(review): unlike {@link #computeNewIdealState}, the active instances here are not
+   * filtered by the resource's instance group tag.
+   *
+   * @param cache cluster data snapshot
+   * @param idealState IdealState of the resource
+   * @param resource the resource whose partitions are computed
+   * @param currentStateOutput current states and pending state transitions for all partitions
+   * @return the best possible instance -> state map of every partition that has a preference list
+   */
+  @Override
+  public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache cache,
+      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
+    String resourceName = resource.getResourceName();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing resource:" + resourceName);
+    }
+
+    Set<String> allNodes = cache.getEnabledInstances();
+    Set<String> liveNodes = cache.getEnabledLiveInstances();
+    // Work on a copy so that removeAll never mutates a set owned by the cache.
+    Set<String> offlineNodes = new HashSet<String>(cache.getAllInstances());
+    offlineNodes.removeAll(cache.getLiveInstances().keySet());
+
+    Set<String> activeNodes =
+        getActiveInstances(idealState, allNodes, liveNodes, cache.getInstanceOfflineTimeMap());
+
+    StateModelDefinition stateModelDef = cache.getStateModelDef(idealState.getStateModelDefRef());
+    ResourceAssignment partitionMapping = new ResourceAssignment(resourceName);
+    for (Partition partition : resource.getPartitions()) {
+      List<String> preferenceList =
+          ConstraintBasedAssignment.getPreferenceList(partition, idealState, activeNodes);
+      // Check before computing: the result for such a partition would be discarded anyway.
+      if (preferenceList == null) {
+        LOG.info(String.format(
+            "No preferenceList defined for partition %s, resource %s, skip computing best possible mapping!",
+            partition.getPartitionName(), idealState.getResourceName()));
+        continue;
+      }
+
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Set<String> disabledInstancesForPartition =
+          cache.getDisabledInstancesForPartition(partition.toString());
+      Map<String, String> bestStateForPartition = ConstraintBasedAssignment
+          .computeAutoBestStateForPartition(cache, stateModelDef, preferenceList, currentStateMap,
+              disabledInstancesForPartition, idealState.isEnabled());
+
+      for (String ins : preferenceList) {
+        if (offlineNodes.contains(ins) && !bestStateForPartition.containsKey(ins)) {
+          bestStateForPartition.put(ins, stateModelDef.getInitialState());
+        }
+      }
+      partitionMapping.addReplicaMap(partition, bestStateForPartition);
+    }
+    return partitionMapping;
+  }
+
+  /**
+   * Parse the replica count of the IdealState. {@code ANY_LIVEINSTANCE} (case-insensitive) maps
+   * to the size of {@code eligibleInstances}; a missing or otherwise unparsable value is logged and
+   * yields 0.
+   */
+  private int getReplicaCount(IdealState idealState, Set<String> eligibleInstances) {
+    String replicaStr = idealState.getReplicas();
+    try {
+      return Integer.parseInt(replicaStr);
+    } catch (NumberFormatException ex) {
+      // Constant first: parseInt(null) also lands here, and replicaStr must not be dereferenced.
+      if (IdealState.IdealStateConstants.ANY_LIVEINSTANCE.name().equalsIgnoreCase(replicaStr)) {
+        return eligibleInstances.size();
+      }
+      LOG.error("Can not determine the replica count for resource " + idealState.getResourceName()
+          + ", set to 0.");
+      return 0;
+    }
+  }
+
+  /** Minimum number of live replicas per partition; a negative configured value means replicaCount. */
+  private int getMinActiveReplica(IdealState idealState, int replicaCount) {
+    int minActiveReplicas = idealState.getMinActiveReplicas();
+    if (minActiveReplicas < 0) {
+      minActiveReplicas = replicaCount;
+    }
+    return minActiveReplicas;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
index 0385959..65149ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
public class AutoRebalanceStrategy implements RebalanceStrategy {
private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
- private final ReplicaPlacementScheme _placementScheme;
+ private final ReplicaPlacementScheme _placementScheme = new DefaultPlacementScheme();
private String _resourceName;
private List<String> _partitions;
@@ -59,7 +59,6 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states, int maximumPerNode) {
init(resourceName, partitions, states, maximumPerNode);
- _placementScheme = new DefaultPlacementScheme();
}
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
@@ -67,6 +66,9 @@ public class AutoRebalanceStrategy implements RebalanceStrategy {
this(resourceName, partitions, states, Integer.MAX_VALUE);
}
+ public AutoRebalanceStrategy() {
+ }
+
@Override
public void init(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states, int maximumPerNode) {
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index e9a39a4..fe493b1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -30,6 +30,7 @@ import org.apache.helix.controller.rebalancer.topology.Node;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
@@ -43,6 +44,8 @@ import java.util.Set;
* CRUSH-based partition mapping strategy.
*/
public class CrushRebalanceStrategy implements RebalanceStrategy {
+ private static final Logger Log = Logger.getLogger(CrushRebalanceStrategy.class.getName());
+
private String _resourceName;
private List<String> _partitions;
private Topology _clusterTopo;
@@ -83,6 +86,12 @@ public class CrushRebalanceStrategy implements RebalanceStrategy {
// apply the placement rules
List<Node> selected = select(topNode, data, _replicas);
+ if (selected.size() < _replicas) {
+ Log.warn(String
+ .format("Can not find enough node for resource %s partition %s, required %d, find %d",
+ _resourceName, partitionName, _replicas, selected.size()));
+ }
+
List<String> nodeList = new ArrayList<String>();
for (int j = 0; j < selected.size(); j++) {
nodeList.add(selected.get(j).getName());
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
new file mode 100644
index 0000000..bbc03d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -0,0 +1,146 @@
+package org.apache.helix.controller.rebalancer.util;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskUtil;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Triggers rebalancing of individual resources at a future point in time.
+ *
+ * <p>At most one pending rebalance is tracked per resource name; scheduling a different start
+ * time for a resource cancels the previously scheduled one. A rebalance is triggered by
+ * re-writing the resource's IdealState (see {@link #invokeRebalance}).
+ *
+ * <p>The public methods are synchronized because one instance may be shared (e.g. through a
+ * static field) by rebalancers running on different controller threads.
+ */
+public class RebalanceScheduler {
+  private static final Logger LOG = Logger.getLogger(RebalanceScheduler.class);
+
+  /** A pending rebalance of one resource: its requested start time and the scheduled future. */
+  private static class ScheduledTask {
+    private final long _startTime;
+    private final Future<?> _future;
+
+    ScheduledTask(long startTime, Future<?> future) {
+      _startTime = startTime;
+      _future = future;
+    }
+
+    long getStartTime() {
+      return _startTime;
+    }
+
+    Future<?> getFuture() {
+      return _future;
+    }
+  }
+
+  private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>();
+  // This executor is never shut down, so its worker must be a daemon thread; otherwise it would
+  // keep the JVM alive after everything else has stopped.
+  private final ScheduledExecutorService _rebalanceExecutor =
+      Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable runnable) {
+          Thread thread = new Thread(runnable, "RebalanceScheduler");
+          thread.setDaemon(true);
+          return thread;
+        }
+      });
+
+  /**
+   * Add a future rebalance task for the resource at the given start time, replacing (and
+   * cancelling) any previously scheduled task for it. Does nothing if a task with the same start
+   * time is already recorded.
+   *
+   * @param manager Helix manager whose data accessor is used to trigger the rebalance
+   * @param resource name of the resource to rebalance
+   * @param startTime time in milliseconds since the epoch; a time in the past runs immediately
+   */
+  public synchronized void scheduleRebalance(HelixManager manager, String resource,
+      long startTime) {
+    ScheduledTask existTask = _rebalanceTasks.get(resource);
+    if (existTask != null && existTask.getStartTime() == startTime) {
+      LOG.debug("Schedule timer for resource: " + resource + " is up to date.");
+      return;
+    }
+
+    long delay = startTime - System.currentTimeMillis();
+    LOG.info(
+        "Schedule rebalance for resource : " + resource + " at time: " + startTime + " delay: "
+            + delay);
+
+    // Schedule the new task and record it, replacing any previous one for this resource.
+    ScheduledFuture<?> future = _rebalanceExecutor
+        .schedule(new RebalanceInvoker(manager, resource), delay, TimeUnit.MILLISECONDS);
+    ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future));
+    if (prevTask != null && !prevTask.getFuture().isDone()) {
+      if (!prevTask.getFuture().cancel(false)) {
+        LOG.warn("Failed to cancel scheduled timer task for " + resource);
+      }
+      LOG.info("Remove previously scheduled timer task for " + resource);
+    }
+  }
+
+  /**
+   * Get the currently recorded start time for the given resource.
+   *
+   * @param resource name of the resource
+   * @return the recorded start time, or -1 if no task is recorded for this resource
+   */
+  public synchronized long getRebalanceTime(String resource) {
+    ScheduledTask task = _rebalanceTasks.get(resource);
+    if (task != null) {
+      return task.getStartTime();
+    }
+    return -1;
+  }
+
+  /**
+   * Remove the recorded task for the given resource and cancel it if it has not completed.
+   * A task that is already running is allowed to finish (it is not interrupted), matching
+   * how replaced tasks are cancelled in {@link #scheduleRebalance}.
+   *
+   * @param resource name of the resource
+   */
+  public synchronized void removeScheduledRebalance(String resource) {
+    ScheduledTask existTask = _rebalanceTasks.remove(resource);
+    if (existTask != null && !existTask.getFuture().isDone()) {
+      if (!existTask.getFuture().cancel(false)) {
+        LOG.warn("Failed to cancel scheduled timer task for " + resource);
+      }
+      LOG.info(
+          "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
+              + resource);
+    }
+  }
+
+  /**
+   * The simplest possible runnable that will trigger a run of the controller pipeline.
+   */
+  private static class RebalanceInvoker implements Runnable {
+    private final HelixManager _manager;
+    private final String _resource;
+
+    RebalanceInvoker(HelixManager manager, String resource) {
+      _manager = manager;
+      _resource = resource;
+    }
+
+    @Override
+    public void run() {
+      try {
+        invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+      } catch (Exception e) {
+        // An exception escaping a scheduled task is only captured in its Future, which nobody
+        // reads; log it so a failed trigger is visible.
+        LOG.error("Failed to invoke scheduled rebalance for resource " + _resource, e);
+      }
+    }
+  }
+
+  /**
+   * Trigger the controller to rebalance a resource by writing its current IdealState back
+   * unchanged.
+   *
+   * @param accessor Helix data accessor
+   * @param resource name of the resource whose IdealState is re-written
+   */
+  public static void invokeRebalance(HelixDataAccessor accessor, String resource) {
+    LOG.info("invoke rebalance for " + resource);
+    PropertyKey key = accessor.keyBuilder().idealStates(resource);
+    IdealState is = accessor.getProperty(key);
+    if (is == null) {
+      LOG.warn("Can't find ideal state for " + resource);
+      return;
+    }
+    if (!accessor.updateProperty(key, is)) {
+      LOG.warn("Failed to invoke rebalance on resource " + resource);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fe0f6e1..cba0659 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -147,30 +147,37 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
private Rebalancer getRebalancer(IdealState idealState, String resourceName) {
+
+ Rebalancer customizedRebalancer = null;
+ String rebalancerClassName = idealState.getRebalancerClassName();
+ if (rebalancerClassName != null) {
+ logger.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
+ try {
+ customizedRebalancer = Rebalancer.class
+ .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+ } catch (Exception e) {
+ logger.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+ }
+ }
+
Rebalancer rebalancer = null;
switch (idealState.getRebalanceMode()) {
case FULL_AUTO:
- AutoRebalancer autoRebalancer = new AutoRebalancer();
- rebalancer = autoRebalancer;
+ if (customizedRebalancer != null) {
+ rebalancer = customizedRebalancer;
+ } else {
+ rebalancer = new AutoRebalancer();
+ }
break;
case SEMI_AUTO:
- SemiAutoRebalancer semiAutoRebalancer = new SemiAutoRebalancer();
- rebalancer = semiAutoRebalancer;
+ rebalancer = new SemiAutoRebalancer();
break;
case CUSTOMIZED:
- CustomRebalancer customRebalancer = new CustomRebalancer();
- rebalancer = customRebalancer;
+ rebalancer = new CustomRebalancer();
break;
case USER_DEFINED:
case TASK:
- String rebalancerClassName = idealState.getRebalancerClassName();
- logger.info("resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
- try {
- rebalancer = Rebalancer.class
- .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
- } catch (Exception e) {
- logger.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
- }
+ rebalancer = customizedRebalancer;
break;
default:
break;
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6b72442..648fd22 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -244,17 +244,12 @@ public class ClusterDataCache {
}
/**
- * Return the last offline time for given instance.
- * Return NULL if the instance is ONLINE currently, or the record is not persisted somehow.
+ * Return the last offline time map for all offline instances.
*
- * @param instanceName
* @return
*/
- public Long getInstanceOfflineTime(String instanceName) {
- if (_instanceOfflineTimeMap != null) {
- return _instanceOfflineTimeMap.get(instanceName);
- }
- return null;
+ public Map<String, Long> getInstanceOfflineTimeMap() {
+ return _instanceOfflineTimeMap;
}
private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
@@ -307,6 +302,81 @@ public class ClusterDataCache {
return _liveInstanceMap;
}
+
+  /**
+   * Return the names of all instances that have an instance config, whether or not they are
+   * currently live or enabled.
+   *
+   * @return a new, mutable set of instance names
+   */
+  public Set<String> getAllInstances() {
+    Set<String> instanceNames = new HashSet<String>();
+    instanceNames.addAll(_instanceConfigMap.keySet());
+    return instanceNames;
+  }
+
+  /**
+   * Return the live instances whose instance config marks them as enabled.
+   *
+   * @return a new set of live, enabled instance names
+   */
+  public Set<String> getEnabledLiveInstances() {
+    return getAllEnabledInstances(null);
+  }
+
+  /**
+   * Return every instance that has an instance config and is not in the disabled-instance set,
+   * regardless of whether it is currently live.
+   *
+   * @return a new set of enabled instance names
+   */
+  public Set<String> getEnabledInstances() {
+    Set<String> enabledInstances = new HashSet<String>();
+    enabledInstances.addAll(getInstanceConfigMap().keySet());
+    enabledInstances.removeAll(getDisabledInstances());
+    return enabledInstances;
+  }
+
+  /**
+   * Return the live, enabled instances that carry the given instance tag.
+   *
+   * @param instanceTag the instance group tag to filter on; {@code null} disables tag filtering
+   * @return a new set of matching live instance names
+   */
+  public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
+    return getAllEnabledInstances(instanceTag);
+  }
+
+  /**
+   * Collect live instances whose config is present and enabled, optionally restricted to a tag.
+   *
+   * @param instanceTag required instance tag, or {@code null} to accept any tag
+   * @return a new set of matching live instance names
+   */
+  private Set<String> getAllEnabledInstances(String instanceTag) {
+    Set<String> result = new HashSet<String>();
+    for (String liveInstance : _liveInstanceMap.keySet()) {
+      InstanceConfig config = _instanceConfigMap.get(liveInstance);
+      // Skip instances without a config and those that are explicitly disabled.
+      if (config == null || !config.getInstanceEnabled()) {
+        continue;
+      }
+      // A null tag means "any tag"; otherwise the instance must carry the requested tag.
+      boolean tagMatches = instanceTag == null || config.containsTag(instanceTag);
+      if (tagMatches) {
+        result.add(liveInstance);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return every instance, live or not, whose instance config carries the given tag.
+   *
+   * @param instanceTag the instance group tag to look for
+   * @return a new set of matching instance names
+   */
+  public Set<String> getAllInstancesWithTag(String instanceTag) {
+    Set<String> taggedInstances = new HashSet<String>();
+    for (Map.Entry<String, InstanceConfig> entry : _instanceConfigMap.entrySet()) {
+      InstanceConfig config = entry.getValue();
+      if (config == null) {
+        continue;
+      }
+      if (config.containsTag(instanceTag)) {
+        taggedInstances.add(entry.getKey());
+      }
+    }
+    return taggedInstances;
+  }
+
+
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
for (LiveInstance liveInstance : liveInstances) {
@@ -438,7 +508,6 @@ public class ClusterDataCache {
return disabledInstancesSet;
}
-
/**
* This method allows one to fetch the set of nodes that are disabled
* @return
@@ -495,42 +564,6 @@ public class ClusterDataCache {
}
/**
- * Return all the live nodes that are enabled
- * @return A new set contains live instance name and that are marked enabled
- */
- public Set<String> getAllEnabledLiveInstances() {
- return getAllEnabledInstances(null);
- }
-
- /**
- * Return all the live nodes that are enabled and tagged same as the job.
- * @param instanceTag The instance group tag, could be null, when no instance group specified
- * @return A new set contains live instance name and that are marked enabled and have same
- * tag with job, only if instance tag input is not null.
- */
- public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) {
- return getAllEnabledInstances(instanceTag);
- }
-
- private Set<String> getAllEnabledInstances(String instanceTag) {
- Set<String> enabledTagInstances = new HashSet<String>();
- for (String instance : _liveInstanceMap.keySet()) {
- InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
-
- // Check instance is enabled
- if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
- // Check whether it has instance group or not
- // If it has instance group, check whether it belongs to that group or not
- if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
- enabledTagInstances.add(instance);
- }
- }
- }
-
- return enabledTagInstances;
- }
-
- /**
* Indicate that a full read should be done on the next refresh
*/
public synchronized void requireFullRefresh() {
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 3c6c1ce..9c297f8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -142,7 +142,7 @@ public class PersistAssignmentStage extends AbstractBaseStage {
}
// if no master, just pick the first node in the preference list as the master.
- if (!hasMaster) {
+ if (!hasMaster && preferenceList.size() > 0) {
instanceMap.put(preferenceList.get(0), MasterSlaveSMD.States.MASTER.name());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 86b3c7c..ab46f94 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -50,6 +50,8 @@ public class IdealState extends HelixProperty {
STATE_MODEL_DEF_REF,
STATE_MODEL_FACTORY_NAME,
REPLICAS,
+ MIN_ACTIVE_REPLICAS,
+ REBALANCE_DELAY,
@Deprecated
IDEAL_STATE_MODE,
REBALANCE_MODE,
@@ -198,6 +200,22 @@ public class IdealState extends HelixProperty {
}
/**
+ * Set the delay time (in ms) that Helix should move the partition after an instance goes offline.
+ * @param delayInMilliseconds
+ */
+ public void setRebalanceDelay(long delayInMilliseconds) {
+ _record.setLongField(IdealStateProperty.REBALANCE_DELAY.toString(), delayInMilliseconds);
+ }
+
+ /**
+ * Get rebalance delay time (in ms).
+ * @return
+ */
+ public long getRebalanceDelay() {
+ return _record.getLongField(IdealStateProperty.REBALANCE_DELAY.toString(), 0);
+ }
+
+ /**
* Get the resource group name
*
* @return
@@ -409,6 +427,24 @@ public class IdealState extends HelixProperty {
}
/**
+ * Set the number of minimal active partitions for this resource.
+ *
+ * @param minActiveReplicas
+ */
+ public void setMinActiveReplicas(int minActiveReplicas) {
+ _record.setIntField(IdealStateProperty.MIN_ACTIVE_REPLICAS.toString(), minActiveReplicas);
+ }
+
+ /**
+ * Get the number of minimal active partitions for this resource.
+ *
+ * @return
+ */
+ public int getMinActiveReplicas() {
+ return _record.getIntField(IdealStateProperty.MIN_ACTIVE_REPLICAS.toString(), -1);
+ }
+
+ /**
* Set the number of replicas for each partition of this resource. There are documented special
* values for the replica count, so this is a String.
* @param replicas replica count (as a string)
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index fc1d021..420f0e5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -382,4 +383,49 @@ public class StateModelDefinition extends HelixProperty {
_stateTransitionTable.equals(stateModelDefinition._stateTransitionTable);
}
+  /**
+   * Compute how many replicas of a partition should be placed in each state.
+   *
+   * <p>States are read from {@code stateModelDef}'s priority list:
+   * <ul>
+   * <li>a count of {@code "N"} maps to {@code candidateNodeNum} (it does not consume the replica
+   * budget);</li>
+   * <li>a positive numeric count is used as-is and subtracted from {@code totalReplicas};</li>
+   * <li>the first state whose count is {@code "R"} receives whatever remains of
+   * {@code totalReplicas}, never less than zero;</li>
+   * <li>states with a missing, unparseable or non-positive count are omitted.</li>
+   * </ul>
+   * The map iterates in priority order, except that the {@code "R"} state is always added last.
+   *
+   * @param stateModelDef state model whose per-state instance counts are evaluated
+   * @param candidateNodeNum number of candidate nodes; used as the count of {@code "N"} states
+   * @param totalReplicas total number of replicas per partition
+   * @return state -> count map
+   */
+  public static LinkedHashMap<String, Integer> getStateCountMap(
+      StateModelDefinition stateModelDef, int candidateNodeNum, int totalReplicas) {
+    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+
+    int replicas = totalReplicas;
+    String replicaBoundState = null;
+    for (String state : statesPriorityList) {
+      String num = stateModelDef.getNumInstancesPerState(state);
+      if ("N".equals(num)) {
+        stateCountMap.put(state, candidateNodeNum);
+      } else if ("R".equals(num)) {
+        // Its count depends on all other states, so resolve it after this loop. A state model
+        // should have at most one "R" state; only the first one (by priority) is used.
+        if (replicaBoundState == null) {
+          replicaBoundState = state;
+        }
+      } else {
+        int stateCount = -1;
+        try {
+          stateCount = Integer.parseInt(num);
+        } catch (NumberFormatException e) {
+          // Missing or non-numeric count: leave this state out of the map.
+        }
+        if (stateCount > 0) {
+          stateCountMap.put(state, stateCount);
+          replicas -= stateCount;
+        }
+      }
+    }
+
+    if (replicaBoundState != null) {
+      // Clamp at zero: fixed counts may already exceed totalReplicas (e.g. totalReplicas == 0).
+      stateCountMap.put(replicaBoundState, Math.max(0, replicas));
+    }
+    return stateCountMap;
+  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index 712546c..e3000c2 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -33,10 +33,23 @@ public abstract class IdealStateBuilder {
* Number of partitions/subresources
*/
private int numPartitions;
+
/**
* Number of replicas for each partition
*/
private int numReplica;
+
+
+ /**
+ * Number of minimal active replicas for each partition
+ */
+ private int minActiveReplica = -1;
+
+ /**
+ * The delay time (in ms) that Helix should move the partition after an instance goes offline.
+ */
+ private long rebalanceDelayInMs = -1;
+
/**
* State model that is applicable for this resource
*/
@@ -108,6 +121,20 @@ public abstract class IdealStateBuilder {
}
/**
+   * Set the minimum number of active replicas each partition should keep. The value is only
+   * written to the built ideal state when it is non-negative (the default is -1, i.e. unset).
+   *
+   * @param minActiveReplica minimal number of active replicas per partition
+   * @return this builder
+   */
+  public IdealStateBuilder setMinActiveReplica(int minActiveReplica) {
+    this.minActiveReplica = minActiveReplica;
+    return this;
+  }
+
+  /**
+   * Set how long (in ms) Helix should wait before moving partitions after an instance goes
+   * offline. The value is only written to the built ideal state when it is positive.
+   *
+   * <p>Takes a {@code long} to match the {@code rebalanceDelayInMs} field and
+   * {@code IdealState#setRebalanceDelay(long)}; existing {@code int} arguments still compile.
+   *
+   * @param delayInMilliseconds rebalance delay in milliseconds
+   * @return this builder
+   */
+  public IdealStateBuilder setRebalanceDelay(long delayInMilliseconds) {
+    this.rebalanceDelayInMs = delayInMilliseconds;
+    return this;
+  }
+
+  /**
* @param numPartitions
*/
public IdealStateBuilder setNumPartitions(int numPartitions) {
@@ -217,6 +244,10 @@ public abstract class IdealStateBuilder {
idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode);
}
+ if (minActiveReplica >= 0) {
+ idealstate.setMinActiveReplicas(minActiveReplica);
+ }
+
if (rebalancerClassName != null) {
idealstate.setRebalancerClassName(rebalancerClassName);
}
@@ -241,10 +272,14 @@ public abstract class IdealStateBuilder {
idealstate.enableGroupRouting(enableGroupRouting);
}
+ if (rebalanceDelayInMs > 0) {
+ idealstate.setRebalanceDelay(rebalanceDelayInMs);
+ }
+
if (!idealstate.isValid()) {
throw new HelixException("invalid ideal-state: " + idealstate);
}
+
return idealstate;
}
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
index 98b32e2..0a43c0b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/DeprecatedTaskRebalancer.java
@@ -48,6 +48,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
@@ -1134,7 +1135,7 @@ public abstract class DeprecatedTaskRebalancer implements Rebalancer, MappingCal
@Override
public void run() {
- TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
+ // Touch the resource's ideal state via the shared helper so the controller re-runs its pipeline.
+ RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index a36cb85..bd7e819 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -129,8 +129,8 @@ public class JobRebalancer extends TaskRebalancer {
// Fetch the previous resource assignment from the property store. This is required because of
// HELIX-230.
Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
- ? clusterData.getAllEnabledLiveInstances()
- : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+ ? clusterData.getEnabledLiveInstances()
+ : clusterData.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job!");
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 96cbbb8..5e39e17 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -51,6 +51,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -261,7 +262,7 @@ public class TaskDriver {
LOG.error("Failed to update workflow configuration for workflow " + workflow);
}
- TaskUtil.invokeRebalance(_accessor, workflow);
+ RebalanceScheduler.invokeRebalance(_accessor, workflow);
}
/**
@@ -601,7 +602,7 @@ public class TaskDriver {
addWorkflowResourceIfNecessary(queueName);
// Schedule the job
- TaskUtil.invokeRebalance(_accessor, queueName);
+ RebalanceScheduler.invokeRebalance(_accessor, queueName);
}
/**
@@ -752,7 +753,7 @@ public class TaskDriver {
paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
updaters.add(updater);
_accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
- TaskUtil.invokeRebalance(_accessor, workflowName);
+ RebalanceScheduler.invokeRebalance(_accessor, workflowName);
} else {
LOG.error("Configuration path " + cfgKey + " not found!");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 137a8fc..d4ac1b8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,14 +20,8 @@ package org.apache.helix.task;
*/
import java.util.Date;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
@@ -35,6 +29,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
@@ -55,7 +50,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
// For connection management
protected HelixManager _manager;
- protected static ScheduledRebalancer _scheduledRebalancer = new ScheduledRebalancer();
+ protected static RebalanceScheduler _scheduledRebalancer = new RebalanceScheduler();
protected ClusterStatusMonitor _clusterStatusMonitor;
@Override public void init(HelixManager manager) {
@@ -294,108 +289,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
return currentIdealState;
}
- // Management of already-scheduled rebalances across all task entities.
- protected static class ScheduledRebalancer {
- private class ScheduledTask {
- long _startTime;
- Future _future;
-
- public ScheduledTask(long _startTime, Future _future) {
- this._startTime = _startTime;
- this._future = _future;
- }
-
- public long getStartTime() {
- return _startTime;
- }
-
- public Future getFuture() {
- return _future;
- }
- }
-
- private final Map<String, ScheduledTask> _rebalanceTasks = new HashMap<String, ScheduledTask>();
- private final ScheduledExecutorService _rebalanceExecutor =
- Executors.newSingleThreadScheduledExecutor();
-
- /**
- * Add a future rebalance task for resource at given startTime
- *
- * @param resource
- * @param startTime time in milliseconds
- */
- public void scheduleRebalance(HelixManager manager, String resource, long startTime) {
- // Do nothing if there is already a timer set for the this workflow with the same start time.
- ScheduledTask existTask = _rebalanceTasks.get(resource);
- if (existTask != null && existTask.getStartTime() == startTime) {
- LOG.debug("Schedule timer for job: " + resource + " is up to date.");
- return;
- }
-
- long delay = startTime - System.currentTimeMillis();
- LOG.info("Schedule rebalance with job: " + resource + " at time: " + startTime + " delay: "
- + delay);
-
- // For workflow not yet scheduled, schedule them and record it
- RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(manager, resource);
- ScheduledFuture future =
- _rebalanceExecutor.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS);
- ScheduledTask prevTask = _rebalanceTasks.put(resource, new ScheduledTask(startTime, future));
- if (prevTask != null && !prevTask.getFuture().isDone()) {
- if (!prevTask.getFuture().cancel(false)) {
- LOG.warn("Failed to cancel scheduled timer task for " + resource);
- }
- }
- }
-
- /**
- * Get the current schedule time for given resource.
- *
- * @param resource
- * @return existing schedule time or NULL if there is no scheduled task for this resource
- */
- public long getRebalanceTime(String resource) {
- ScheduledTask task = _rebalanceTasks.get(resource);
- if (task != null) {
- return task.getStartTime();
- }
- return -1;
- }
-
- /**
- * Remove all existing future schedule tasks for the given resource
- *
- * @param resource
- */
- public void removeScheduledRebalance(String resource) {
- ScheduledTask existTask = _rebalanceTasks.remove(resource);
- if (existTask != null && !existTask.getFuture().isDone()) {
- if (!existTask.getFuture().cancel(true)) {
- LOG.warn("Failed to cancel scheduled timer task for " + resource);
- }
- LOG.info(
- "Remove scheduled rebalance task at time " + existTask.getStartTime() + " for resource: "
- + resource);
- }
- }
-
- /**
- * The simplest possible runnable that will trigger a run of the controller pipeline
- */
- private class RebalanceInvoker implements Runnable {
- private final HelixManager _manager;
- private final String _resource;
-
- public RebalanceInvoker(HelixManager manager, String resource) {
- _manager = manager;
- _resource = resource;
- }
-
- @Override public void run() {
- TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
- }
- }
- }
/**
* Set the ClusterStatusMonitor for metrics update
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d765cd5..9d69083 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -444,26 +444,6 @@ public class TaskUtil {
return Collections.emptyMap();
}
- /**
- * Trigger a controller pipeline execution for a given resource.
- *
- * @param accessor Helix data accessor
- * @param resource the name of the resource changed to triggering the execution
- */
- protected static void invokeRebalance(HelixDataAccessor accessor, String resource) {
- // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
- LOG.info("invoke rebalance for " + resource);
- PropertyKey key = accessor.keyBuilder().idealStates(resource);
- IdealState is = accessor.getProperty(key);
- if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
- if (!accessor.updateProperty(key, is)) {
- LOG.warn("Failed to invoke rebalance on resource " + resource);
- }
- } else {
- LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource);
- }
- }
-
private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.resourceConfig(resource));
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
index 8349243..970973c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
@@ -31,9 +31,11 @@ import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.log4j.Logger;
@@ -41,6 +43,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -252,8 +255,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
for (String resourceName : idealStates.keySet()) {
ExternalView extView = extViews.get(resourceName);
+ IdealState is = idealStates.get(resourceName);
if (extView == null) {
- IdealState is = idealStates.get(resourceName);
if (is.isExternalViewDisabled()) {
continue;
} else {
@@ -266,8 +269,10 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
Map<Partition, Map<String, String>> bpStateMap =
bestPossOutput.getResourceMap(resourceName);
- boolean result = verifyExternalView(extView, bpStateMap);
+ boolean result = verifyExternalView(is, extView, bpStateMap);
if (!result) {
+ LOG.debug("verifyExternalView fails! ExternalView: " + extView + " BestPossibleState: "
+ + bpStateMap);
return false;
}
}
@@ -278,35 +283,44 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
}
}
-  private boolean verifyExternalView(ExternalView externalView,
+  private boolean verifyExternalView(IdealState idealState, ExternalView externalView,
       Map<Partition, Map<String, String>> bestPossibleState) {
-    // TODO: Is this necessary?
-    // remove empty and dropped items.
-    Iterator<Map.Entry<Partition, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
-    while (iter.hasNext()) {
-      Map.Entry<Partition, Map<String, String>> entry = iter.next();
-      Map<String, String> instanceStateMap = entry.getValue();
-      if (instanceStateMap.isEmpty()) {
-        iter.remove();
-      } else {
-        // remove instances with DROPPED state
-        Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
-        while (insIter.hasNext()) {
-          Map.Entry<String, String> insEntry = insIter.next();
-          String state = insEntry.getValue();
-          if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
-            insIter.remove();
-          }
-        }
-      }
-    }
-
-    Map<String, Map<String, String>> bestPossibleStateMap =
-        convertBestPossibleState(bestPossibleState);
-    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
-
-    return externalViewMap.equals(bestPossibleStateMap);
+    Set<String> ignoredStates = getIgnoredStates(idealState);
+
+    // Compare filtered copies so that neither the ExternalView record nor the best-possible
+    // output passed in by the caller is modified.
+    Map<String, Map<String, String>> bestPossibleStateMap =
+        filterIgnoredStates(convertBestPossibleState(bestPossibleState), ignoredStates);
+    Map<String, Map<String, String>> externalViewMap =
+        filterIgnoredStates(externalView.getRecord().getMapFields(), ignoredStates);
+
+    return externalViewMap.equals(bestPossibleStateMap);
+  }
+
+  /**
+   * Return the states that are ignored when comparing the external view with the best possible
+   * state: always DROPPED, plus the initial state of the resource's state model when that model
+   * is one of {@link BuiltInStateModelDefinitions}. For other (custom) state models only DROPPED
+   * is ignored, instead of failing with an exception.
+   */
+  private static Set<String> getIgnoredStates(IdealState idealState) {
+    Set<String> ignoredStates = new HashSet<String>();
+    ignoredStates.add(HelixDefinedState.DROPPED.toString());
+    String stateModelDefRef = idealState.getStateModelDefRef();
+    // Non-throwing equivalent of BuiltInStateModelDefinitions.valueOf(stateModelDefRef).
+    for (BuiltInStateModelDefinitions builtIn : BuiltInStateModelDefinitions.values()) {
+      if (builtIn.name().equals(stateModelDefRef)) {
+        ignoredStates.add(builtIn.getStateModelDefinition().getInitialState());
+        break;
+      }
+    }
+    return ignoredStates;
+  }
+
+  /**
+   * Build a new partition -> (instance -> state) map from {@code partitionStateMap} without the
+   * instances whose state is in {@code ignoredStates}. Partitions left with no instances are
+   * omitted, so "all replicas ignored" compares equal to "partition absent". The input map is
+   * not modified.
+   */
+  private static Map<String, Map<String, String>> filterIgnoredStates(
+      Map<String, Map<String, String>> partitionStateMap, Set<String> ignoredStates) {
+    Map<String, Map<String, String>> filtered = new HashMap<String, Map<String, String>>();
+    for (Map.Entry<String, Map<String, String>> partitionEntry : partitionStateMap.entrySet()) {
+      Map<String, String> instanceStates = new HashMap<String, String>();
+      for (Map.Entry<String, String> instanceEntry : partitionEntry.getValue().entrySet()) {
+        if (!ignoredStates.contains(instanceEntry.getValue())) {
+          instanceStates.put(instanceEntry.getKey(), instanceEntry.getValue());
+        }
+      }
+      if (!instanceStates.isEmpty()) {
+        filtered.put(partitionEntry.getKey(), instanceStates);
+      }
+    }
+    return filtered;
}
private Map<String, Map<String, String>> convertBestPossibleState(
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 9f70f29..ff20fa5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -603,7 +603,7 @@ public class TestAutoRebalanceStrategy {
// make sure that when the first node joins, a single replica is assigned fairly
List<String> partitions = ImmutableList.copyOf(PARTITIONS);
LinkedHashMap<String, Integer> stateCount =
- AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
ZNRecord znRecord =
new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
.computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
@@ -618,7 +618,7 @@ public class TestAutoRebalanceStrategy {
// now assign a replica to the first node in the current mapping, and add a second node
allNodes.add(NODES[1]);
liveNodes.add(NODES[1]);
- stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ stateCount = StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
for (String partition : PARTITIONS) {
currentMapping.get(partition).put(NODES[0], "MASTER");
}
@@ -657,7 +657,7 @@ public class TestAutoRebalanceStrategy {
// new node is never the most preferred
allNodes.add(NODES[2]);
liveNodes.add(NODES[2]);
- stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ stateCount = StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
// recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
@@ -708,7 +708,7 @@ public class TestAutoRebalanceStrategy {
// remove a node now, but use the current mapping with everything balanced just prior
liveNodes.remove(0);
- stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ stateCount = StateModelDefinition.getStateCountMap(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
// remove all references of n0 from the mapping, keep everything else in a legal state
for (String partition : PARTITIONS) {
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
new file mode 100644
index 0000000..ba7f46e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalance.java
@@ -0,0 +1,348 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Integration tests for {@link DelayedAutoRebalancer}: partition movement after a participant goes
+ * offline should be delayed, while a minimum number of active replicas (and a top-state replica) is
+ * kept for every partition.
+ */
+public class TestDelayedAutoRebalance extends ZkIntegrationTestBase {
+  final int NUM_NODE = 5;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 5;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  int _replica = 3;
+  HelixClusterVerifier _clusterVerifier;
+  // Resources created by the currently running test method; dropped and cleared in afterTest().
+  List<String> _testDBs = new ArrayList<String>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  protected String[] TestStateModels = {
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * The partition movement should be delayed (not happen immediately) after one single node goes offline.
+   * @throws Exception
+   */
+  @Test
+  public void testDelayedPartitionMovement() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+    Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>();
+
+    int minActiveReplica = _replica - 1;
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, 100000);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViewsBefore.put(db, ev);
+    }
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName());
+    }
+  }
+
+  /**
+   * Test when two nodes go offline, the minimal active replica should be maintained.
+   * @throws Exception
+   */
+  @Test
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+    Map<String, ExternalView> externalViewsBefore = new HashMap<String, ExternalView>();
+
+    int minActiveReplica = _replica - 1;
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, 100000);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViewsBefore.put(db, ev);
+    }
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+      validateNoPartitionMove(idealStates.get(db), externalViewsBefore.get(db), ev,
+          _participants.get(0).getInstanceName());
+    }
+
+    // bring down another node, the minimal active replica for each partition should be maintained.
+    _participants.get(3).syncStop();
+    Thread.sleep(1000);
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+    }
+  }
+
+  /**
+   * The partition should be moved to other nodes after the delay time has elapsed, restoring the
+   * full replica count.
+   */
+  @Test
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    long delay = 4000;
+    int minActiveReplica = _replica - 1;
+
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      IdealState idealState =
+          createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+              minActiveReplica, delay);
+      _testDBs.add(db);
+      idealStates.put(db, idealState);
+    }
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // bring down one node, no partition should be moved.
+    _participants.get(0).syncStop();
+    Thread.sleep(1000);
+    Assert.assertTrue(_clusterVerifier.verify());
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, minActiveReplica);
+    }
+
+    Thread.sleep(delay + 1000);
+    // after delay time, it should maintain required number of replicas.
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateMinActiveAndTopStateReplica(idealStates.get(db), ev, _replica);
+    }
+  }
+
+  /**
+   * Drops every resource created by the test that just ran and forgets it, so the next test method
+   * only validates and drops its own resources.
+   */
+  @AfterMethod
+  public void afterTest() {
+    // delete all DBs created in the last test
+    for (String db : _testDBs) {
+      _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+    // Without clearing, the same DB names would accumulate across test methods and be re-validated
+    // and re-dropped by every later test.
+    _testDBs.clear();
+  }
+
+  @BeforeMethod
+  public void beforeTest() {
+    // restart any participant that has been disconnected from last test.
+    for (int i = 0; i < _participants.size(); i++) {
+      if (!_participants.get(i).isConnected()) {
+        _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
+            _participants.get(i).getInstanceName()));
+        _participants.get(i).syncStart();
+      }
+    }
+  }
+
+  /**
+   * Creates a FULL_AUTO resource driven by {@link DelayedAutoRebalancer} using
+   * {@link AutoRebalanceStrategy}.
+   */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, AutoRebalanceStrategy.class.getName());
+  }
+
+  /**
+   * Creates a FULL_AUTO resource with the given rebalance strategy, configures it with the minimum
+   * active replica count, the rebalance delay and {@link DelayedAutoRebalancer}, rebalances it to
+   * {@code replica} replicas, and returns the resulting ideal state read back from the cluster.
+   */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
+      String rebalanceStrategy) {
+    _setupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
+        RebalanceMode.FULL_AUTO + "", rebalanceStrategy);
+
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+    idealState.setMinActiveReplicas(minActiveReplica);
+    idealState.setRebalanceDelay(delay);
+    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    _setupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState);
+    _setupTool.rebalanceStorageCluster(clusterName, db, replica);
+    idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+
+    return idealState;
+  }
+
+  /**
+   * Validate that no partition was moved: for every partition, the set of instances hosting it
+   * after the node went offline equals the set before, minus the offline instance.
+   */
+  protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, ExternalView evAfter,
+      String offlineInstance) {
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentsBefore = evBefore.getRecord().getMapField(partition);
+      Map<String, String> assignmentsAfter = evAfter.getRecord().getMapField(partition);
+      // Fail with a readable message instead of an NPE if a partition is missing from either view.
+      Assert.assertNotNull(assignmentsBefore, partition + " missing from external view before");
+      Assert.assertNotNull(assignmentsAfter, partition + " missing from external view after");
+
+      Set<String> instancesBefore = new HashSet<String>(assignmentsBefore.keySet());
+      Set<String> instancesAfter = new HashSet<String>(assignmentsAfter.keySet());
+      instancesBefore.remove(offlineInstance);
+
+      Assert.assertEquals(instancesBefore, instancesAfter, String
+          .format("%s has been moved to new instances, before: %s, after: %s, offline instance: %s",
+              partition, assignmentsBefore.toString(), assignmentsAfter.toString(),
+              offlineInstance));
+    }
+  }
+
+  /**
+   * Validate there should be always minimal active replica and top state replica for each partition
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev,
+      int minActiveReplica) {
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.parseInt(is.getReplicas());
+
+    // Use the actual participant count rather than the NUM_NODE field: subclasses hide NUM_NODE
+    // with their own value, which this inherited method would otherwise never see.
+    Map<String, Integer> stateCount =
+        StateModelDefinition.getStateCountMap(stateModelDef, _participants.size(), replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap, partition + " missing from external view");
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica, String
+          .format("%s has less active replica %d than required %d", partition, activeReplica,
+              minActiveReplica));
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/b0d11228/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java
new file mode 100644
index 0000000..af11966
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDelayedAutoRebalanceWithRackaware.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.annotations.BeforeClass;
+
+import java.util.Date;
+
+/**
+ * Runs the {@link TestDelayedAutoRebalance} scenarios on a cluster whose participants are assigned
+ * to zones, placing resources with {@link CrushRebalanceStrategy}.
+ */
+public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance {
+  // NOTE: this field hides (does not override) TestDelayedAutoRebalance.NUM_NODE. Only code declared
+  // in this class sees 9; inherited methods that read NUM_NODE still see the parent's value.
+  final int NUM_NODE = 9;
+
+  /**
+   * Overrides the parent's setup: creates NUM_NODE participants, assigning them round-robin to
+   * "zone-0", "zone-1" and "zone-2", then starts the controller and builds the cluster verifier.
+   */
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      // spread participants evenly over three zones
+      String zone = "zone-" + i % 3;
+      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Same as the parent's resource creation, but uses {@link CrushRebalanceStrategy} instead of
+   * AutoRebalanceStrategy.
+   */
+  @Override
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, CrushRebalanceStrategy.class.getName());
+  }
+}