You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/20 19:41:59 UTC
[1/2] [HELIX-345] Speed up the controller pipeline
Repository: helix
Updated Branches:
refs/heads/master c8a644f4b -> 51329f6f0
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index ef5a5fd..a49feae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -56,7 +56,7 @@ public class MessageGenerationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
@@ -67,7 +67,7 @@ public class MessageGenerationStage extends AbstractBaseStage {
if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
|| bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+ + ". Requires HelixManager|Cluster|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
}
MessageOutput output = new MessageOutput();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 966160c..b5ed39e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -89,7 +89,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
Map<StateModelDefId, StateModelDefinition> stateModelDefMap = cluster.getStateModelMap();
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
@@ -101,7 +101,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
if (cluster == null || resourceMap == null || currentStateOutput == null
|| messageGenOutput == null || bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE|MESSAGES_ALL");
+ + ". Requires Cluster|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE|MESSAGES_ALL");
}
MessageOutput output = new MessageOutput();
@@ -112,7 +112,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
stateModelDefMap.get(resource.getRebalancerConfig().getStateModelDefId());
if (stateModelDef == null) {
- LOG.info("resource: " + resourceId
+ LOG.info("resource: "
+ + resourceId
+ " doesn't have state-model-def; e.g. we add a resource config but not add the resource in ideal-states");
continue;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index 764b422..39bb228 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -116,7 +116,7 @@ public class MessageThrottleStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
MessageOutput msgSelectionOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
Map<ResourceId, ResourceConfig> resourceMap =
@@ -127,7 +127,7 @@ public class MessageThrottleStage extends AbstractBaseStage {
if (cluster == null || resourceMap == null || msgSelectionOutput == null
|| bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event: " + event
- + ". Requires ClusterDataCache|RESOURCES|BEST_POSSIBLE_STATE|MESSAGES_SELECTED");
+ + ". Requires Cluster|RESOURCES|BEST_POSSIBLE_STATE|MESSAGES_SELECTED");
}
MessageOutput output = new MessageOutput();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index b3252a8..a9b46e7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -19,29 +19,66 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.List;
+
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.Resource;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
/**
* Persist the ResourceAssignment of each resource that went through rebalancing
*/
public class PersistAssignmentStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(PersistAssignmentStage.class);
+
@Override
public void process(ClusterEvent event) throws Exception {
- HelixManager helixManager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
- HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
- ResourceAccessor resourceAccessor = new ResourceAccessor(cluster.getId(), accessor);
- BestPossibleStateOutput assignments =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- for (ResourceId resourceId : assignments.getAssignedResources()) {
- ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
- resourceAccessor.setResourceAssignment(resourceId, assignment);
+ LOG.info("START PersistAssignmentStage.process()");
+ long startTime = System.currentTimeMillis();
+
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ if (cache.assignmentWriteEnabled()) {
+ Cluster cluster = event.getAttribute("Cluster");
+ HelixManager helixManager = event.getAttribute("helixmanager");
+ HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ BestPossibleStateOutput assignments =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ List<ResourceAssignment> changedAssignments = Lists.newLinkedList();
+ List<PropertyKey> changedKeys = Lists.newLinkedList();
+ for (ResourceId resourceId : assignments.getAssignedResources()) {
+ ResourceAssignment assignment = assignments.getResourceAssignment(resourceId);
+ Resource resource = cluster.getResource(resourceId);
+ boolean toAdd = false;
+ if (resource != null) {
+ ResourceAssignment existAssignment = resource.getResourceAssignment();
+ if (existAssignment == null || !existAssignment.equals(assignment)) {
+ toAdd = true;
+ }
+ } else {
+ toAdd = true;
+ }
+ if (toAdd) {
+ changedAssignments.add(assignment);
+ changedKeys.add(keyBuilder.resourceAssignment(resourceId.toString()));
+ }
+ }
+
+ // update as a batch operation
+ if (changedAssignments.size() > 0) {
+ accessor.setChildren(changedKeys, changedAssignments);
+ }
}
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("END PersistAssignmentStage.process(), took " + (endTime - startTime) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
index 7c2cc0f..1a4aff3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistContextStage.java
@@ -48,8 +48,12 @@ public class PersistContextStage extends AbstractBaseStage {
// remove marked contexts
Set<ContextId> removedContexts = contextProvider.getRemovedContexts();
+ List<String> removedPaths = Lists.newLinkedList();
for (ContextId contextId : removedContexts) {
- accessor.removeProperty(keyBuilder.controllerContext(contextId.stringify()));
+ removedPaths.add(keyBuilder.controllerContext(contextId.stringify()).getPath());
+ }
+ if (removedPaths.size() > 0) {
+ accessor.getBaseDataAccessor().remove(removedPaths, 0);
}
// persist pending contexts
@@ -63,6 +67,9 @@ public class PersistContextStage extends AbstractBaseStage {
properties.add(holder);
}
}
- accessor.setChildren(keys, properties);
+
+ if (keys.size() > 0) {
+ accessor.setChildren(keys, properties);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 85252a0..91505a0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -44,6 +44,8 @@ import com.google.common.collect.Sets;
public class ReadClusterDataStage extends AbstractBaseStage {
private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
+ private ClusterDataCache _cache = null;
+
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
@@ -54,8 +56,15 @@ public class ReadClusterDataStage extends AbstractBaseStage {
throw new StageException("HelixManager attribute value is null");
}
HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+ if (cache == null && _cache == null) {
+ cache = new ClusterDataCache();
+ }
+ _cache = cache;
+
ClusterId clusterId = ClusterId.from(manager.getClusterName());
- ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor);
+ ClusterAccessor clusterAccessor = new ClusterAccessor(clusterId, accessor, _cache);
Cluster cluster = clusterAccessor.readCluster();
@@ -87,7 +96,8 @@ public class ReadClusterDataStage extends AbstractBaseStage {
disabledInstanceSet, disabledPartitions, tags);
}
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
+ event.addAttribute("ClusterDataCache", _cache);
// read contexts (if any)
Map<ContextId, ControllerContext> persistedContexts = null;
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 5b75535..16f15e1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -48,7 +48,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws StageException {
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (cluster == null) {
throw new StageException("Missing attributes in event: " + event + ". Requires Cluster");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index 470de2c..575a163 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -37,9 +37,9 @@ public class ResourceValidationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (cluster == null) {
- throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
+ throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
}
Map<ResourceId, ResourceConfig> resourceConfigMap =
event.getAttribute(AttributeName.RESOURCES.toString());
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index bc2ee50..aa47b4b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -55,15 +55,16 @@ public class TaskAssignmentStage extends AbstractBaseStage {
MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
+ ClusterDataCache cache = event.getAttribute("ClusterDataCache");
Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
if (manager == null || resourceMap == null || messageOutput == null || cluster == null
- || liveParticipantMap == null) {
+ || cache == null || liveParticipantMap == null) {
throw new StageException(
"Missing attributes in event:"
+ event
- + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|DataCache|liveInstanceMap");
+ + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|Cluster|DataCache|liveInstanceMap");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@@ -81,6 +82,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
manager.getProperties());
sendMessages(dataAccessor, outputMessages);
+ long cacheStart = System.currentTimeMillis();
+ cache.cacheMessages(outputMessages);
+ long cacheEnd = System.currentTimeMillis();
+ logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms");
+
long endTime = System.currentTimeMillis();
logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index bff7e46..d428d02 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.strategy;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -41,6 +42,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
+import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.Lists;
@@ -48,15 +50,15 @@ public class AutoRebalanceStrategy {
private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
- private final String _resourceName;
- private final List<String> _partitions;
- private final LinkedHashMap<String, Integer> _states;
+ private final ResourceId _resourceId;
+ private final List<PartitionId> _partitions;
+ private final LinkedHashMap<State, Integer> _states;
private final int _maximumPerNode;
private final ReplicaPlacementScheme _placementScheme;
- private Map<String, Node> _nodeMap;
+ private Map<ParticipantId, Node> _nodeMap;
private List<Node> _liveNodesList;
- private Map<Integer, String> _stateMap;
+ private Map<Integer, State> _stateMap;
private Map<Replica, Node> _preferredAssignment;
private Map<Replica, Node> _existingPreferredAssignment;
@@ -75,9 +77,18 @@ public class AutoRebalanceStrategy {
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states, int maximumPerNode,
ReplicaPlacementScheme placementScheme) {
- _resourceName = resourceName;
- _partitions = partitions;
- _states = states;
+ _resourceId = ResourceId.from(resourceName);
+ _partitions =
+ Lists.newArrayList(Lists.transform(partitions, new Function<String, PartitionId>() {
+ @Override
+ public PartitionId apply(String input) {
+ return PartitionId.from(input);
+ }
+ }));
+ _states = new LinkedHashMap<State, Integer>();
+ for (String state : states.keySet()) {
+ _states.put(State.from(state), states.get(state));
+ }
_maximumPerNode = maximumPerNode;
if (placementScheme != null) {
_placementScheme = placementScheme;
@@ -107,14 +118,9 @@ public class AutoRebalanceStrategy {
public AutoRebalanceStrategy(ResourceId resourceId, final List<PartitionId> partitions,
final LinkedHashMap<State, Integer> states, int maximumPerNode,
ReplicaPlacementScheme placementScheme) {
- LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
- for (State state : states.keySet()) {
- rawStateCountMap.put(state.toString(), states.get(state));
- }
- List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
- _resourceName = resourceId.stringify();
- _partitions = partitionNames;
- _states = rawStateCountMap;
+ _resourceId = resourceId;
+ _partitions = partitions;
+ _states = states;
_maximumPerNode = maximumPerNode;
if (placementScheme != null) {
_placementScheme = placementScheme;
@@ -134,37 +140,24 @@ public class AutoRebalanceStrategy {
public ZNRecord typedComputePartitionAssignment(final List<ParticipantId> liveNodes,
final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
final List<ParticipantId> allNodes) {
- final List<String> rawLiveNodes = Lists.transform(liveNodes, Functions.toStringFunction());
- final List<String> rawAllNodes = Lists.transform(allNodes, Functions.toStringFunction());
- final Map<String, Map<String, String>> rawCurrentMapping =
- ResourceAssignment.stringMapsFromReplicaMaps(currentMapping);
- return computePartitionAssignment(rawLiveNodes, rawCurrentMapping, rawAllNodes);
- }
-
- /**
- * Determine a preference list and mapping of partitions to nodes for all replicas
- * @param liveNodes the current list of live participants
- * @param currentMapping the current assignment of replicas to nodes
- * @param allNodes the full list of known nodes in the system
- * @return the preference list and replica mapping
- */
- public ZNRecord computePartitionAssignment(final List<String> liveNodes,
- final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
- List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
- Collections.sort(sortedLiveNodes);
- List<String> sortedAllNodes = new ArrayList<String>(allNodes);
- Collections.sort(sortedAllNodes);
+ Comparator<ParticipantId> nodeComparator = new NodeComparator();
+ List<ParticipantId> sortedLiveNodes = new ArrayList<ParticipantId>(liveNodes);
+ Collections.sort(sortedLiveNodes, nodeComparator);
+ List<ParticipantId> sortedAllNodes = new ArrayList<ParticipantId>(allNodes);
+ Collections.sort(sortedAllNodes, nodeComparator);
+ List<String> sortedNodeNames =
+ Lists.newArrayList(Lists.transform(sortedAllNodes, Functions.toStringFunction()));
int numReplicas = countStateReplicas();
- ZNRecord znRecord = new ZNRecord(_resourceName);
+ ZNRecord znRecord = new ZNRecord(_resourceId.stringify());
if (sortedLiveNodes.size() == 0) {
return znRecord;
}
int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
- _nodeMap = new HashMap<String, Node>();
+ _nodeMap = new HashMap<ParticipantId, Node>();
_liveNodesList = new ArrayList<Node>();
- for (String id : sortedAllNodes) {
+ for (ParticipantId id : sortedAllNodes) {
Node node = new Node(id);
node.capacity = 0;
node.hasCeilingCapacity = false;
@@ -189,7 +182,7 @@ public class AutoRebalanceStrategy {
_stateMap = generateStateMap();
// compute the preferred mapping if all nodes were up
- _preferredAssignment = computePreferredPlacement(sortedAllNodes);
+ _preferredAssignment = computePreferredPlacement(sortedNodeNames);
// logger.info("preferred mapping:"+ preferredAssignment);
// from current mapping derive the ones in preferred location
@@ -216,6 +209,31 @@ public class AutoRebalanceStrategy {
}
/**
+ * Determine a preference list and mapping of partitions to nodes for all replicas
+ * @param liveNodes the current list of live participants
+ * @param currentMapping the current assignment of replicas to nodes
+ * @param allNodes the full list of known nodes in the system
+ * @return the preference list and replica mapping
+ */
+ public ZNRecord computePartitionAssignment(final List<String> liveNodes,
+ final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
+
+ Function<String, ParticipantId> participantConverter = new Function<String, ParticipantId>() {
+ @Override
+ public ParticipantId apply(String participantId) {
+ return ParticipantId.from(participantId);
+ }
+ };
+ List<ParticipantId> typedLiveNodes =
+ Lists.newArrayList(Lists.transform(liveNodes, participantConverter));
+ List<ParticipantId> typedAllNodes =
+ Lists.newArrayList(Lists.transform(allNodes, participantConverter));
+ Map<PartitionId, Map<ParticipantId, State>> typedCurrentMapping =
+ ResourceAssignment.replicaMapsFromStringMaps(currentMapping);
+ return typedComputePartitionAssignment(typedLiveNodes, typedCurrentMapping, typedAllNodes);
+ }
+
+ /**
* Move replicas assigned to non-preferred nodes if their current node is at capacity
* and its preferred node is under capacity.
*/
@@ -355,10 +373,11 @@ public class AutoRebalanceStrategy {
// This is useful to verify that there is no node serving multiple replicas of the same
// partition.
Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
- for (String partition : _partitions) {
- znRecord.setMapField(partition, new TreeMap<String, String>());
- znRecord.setListField(partition, new ArrayList<String>());
- newPreferences.put(partition, new ArrayList<String>());
+ for (PartitionId partition : _partitions) {
+ String partitionName = partition.stringify();
+ znRecord.setMapField(partitionName, new TreeMap<String, String>());
+ znRecord.setListField(partitionName, new ArrayList<String>());
+ newPreferences.put(partitionName, new ArrayList<String>());
}
// for preference lists, the rough priority that we want is:
@@ -367,29 +386,29 @@ public class AutoRebalanceStrategy {
for (Node node : _liveNodesList) {
for (Replica replica : node.preferred) {
if (node.newReplicas.contains(replica)) {
- newPreferences.get(replica.partition).add(node.id);
+ newPreferences.get(replica.partition.toString()).add(node.id.toString());
} else {
- znRecord.getListField(replica.partition).add(node.id);
+ znRecord.getListField(replica.partition.toString()).add(node.id.toString());
}
}
}
for (Node node : _liveNodesList) {
for (Replica replica : node.nonPreferred) {
if (node.newReplicas.contains(replica)) {
- newPreferences.get(replica.partition).add(node.id);
+ newPreferences.get(replica.partition.toString()).add(node.id.toString());
} else {
- znRecord.getListField(replica.partition).add(node.id);
+ znRecord.getListField(replica.partition.toString()).add(node.id.toString());
}
}
}
normalizePreferenceLists(znRecord.getListFields(), newPreferences);
// generate preference maps based on the preference lists
- for (String partition : _partitions) {
- List<String> preferenceList = znRecord.getListField(partition);
+ for (PartitionId partition : _partitions) {
+ List<String> preferenceList = znRecord.getListField(partition.toString());
int i = 0;
for (String participant : preferenceList) {
- znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+ znRecord.getMapField(partition.toString()).put(participant, _stateMap.get(i).toString());
i++;
}
}
@@ -429,12 +448,12 @@ public class AutoRebalanceStrategy {
List<String> newPreferenceList = new ArrayList<String>();
int replicas = Math.min(countStateReplicas(), preferenceList.size());
for (int i = 0; i < replicas; i++) {
- String state = _stateMap.get(i);
+ State state = _stateMap.get(i);
String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
newPreferenceList.add(node);
notAssigned.remove(node);
Map<String, Integer> counts = nodeReplicaCounts.get(node);
- counts.put(state, counts.get(state) + 1);
+ counts.put(state.toString(), counts.get(state.toString()) + 1);
}
preferenceList.clear();
preferenceList.addAll(newPreferenceList);
@@ -447,7 +466,7 @@ public class AutoRebalanceStrategy {
* @param nodeReplicaCounts current assignment of replicas
* @return the node most willing to accept the replica
*/
- private String getMinimumNodeForReplica(String state, Set<String> nodes,
+ private String getMinimumNodeForReplica(State state, Set<String> nodes,
Map<String, Map<String, Integer>> nodeReplicaCounts) {
String minimalNode = null;
int minimalCount = Integer.MAX_VALUE;
@@ -468,17 +487,17 @@ public class AutoRebalanceStrategy {
* @param nodeReplicaCounts a map of node to replica id and counts
* @return the number of currently assigned replicas of the given id
*/
- private int getReplicaCountForNode(String state, String node,
+ private int getReplicaCountForNode(State state, String node,
Map<String, Map<String, Integer>> nodeReplicaCounts) {
if (!nodeReplicaCounts.containsKey(node)) {
Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
- replicaCounts.put(state, 0);
+ replicaCounts.put(state.toString(), 0);
nodeReplicaCounts.put(node, replicaCounts);
return 0;
}
Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
if (!replicaCounts.containsKey(state)) {
- replicaCounts.put(state, 0);
+ replicaCounts.put(state.toString(), 0);
return 0;
}
return replicaCounts.get(state);
@@ -491,12 +510,12 @@ public class AutoRebalanceStrategy {
* @return The current assignments that do not conform to the preferred assignment
*/
private Map<Replica, Node> computeExistingNonPreferredPlacement(
- Map<String, Map<String, String>> currentMapping) {
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
- for (String partition : currentMapping.keySet()) {
- Map<String, String> nodeStateMap = currentMapping.get(partition);
- for (String nodeId : nodeStateMap.keySet()) {
+ for (PartitionId partition : currentMapping.keySet()) {
+ Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
+ for (ParticipantId nodeId : nodeStateMap.keySet()) {
Node node = _nodeMap.get(nodeId);
boolean skip = false;
for (Replica replica : node.preferred) {
@@ -560,12 +579,12 @@ public class AutoRebalanceStrategy {
* @return Assignments that conform to the preferred placement
*/
private Map<Replica, Node> computeExistingPreferredPlacement(
- final Map<String, Map<String, String>> currentMapping) {
+ final Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
- for (String partition : currentMapping.keySet()) {
- Map<String, String> nodeStateMap = currentMapping.get(partition);
- for (String nodeId : nodeStateMap.keySet()) {
+ for (PartitionId partition : currentMapping.keySet()) {
+ Map<ParticipantId, State> nodeStateMap = currentMapping.get(partition);
+ for (ParticipantId nodeId : nodeStateMap.keySet()) {
Node node = _nodeMap.get(nodeId);
node.currentlyAssigned = node.currentlyAssigned + 1;
// check if its in one of the preferred position
@@ -591,18 +610,18 @@ public class AutoRebalanceStrategy {
* @param allNodes Identifiers to all nodes, live and non-live
* @return Preferred assignment of replicas
*/
- private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+ private Map<Replica, Node> computePreferredPlacement(final List<String> nodeNames) {
Map<Replica, Node> preferredMapping;
preferredMapping = new HashMap<Replica, Node>();
int partitionId = 0;
int numReplicas = countStateReplicas();
int count = countStateReplicas();
- for (String partition : _partitions) {
+ for (PartitionId partition : _partitions) {
for (int replicaId = 0; replicaId < count; replicaId++) {
Replica replica = new Replica(partition, replicaId);
- String nodeName =
- _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
- allNodes);
+ ParticipantId nodeName =
+ ParticipantId.from(_placementScheme.getLocation(partitionId, replicaId,
+ _partitions.size(), numReplicas, nodeNames));
preferredMapping.put(replica, _nodeMap.get(nodeName));
}
partitionId = partitionId + 1;
@@ -627,10 +646,10 @@ public class AutoRebalanceStrategy {
* Compute a map of replica ids to state names
* @return Map: replica id -> state name
*/
- private Map<Integer, String> generateStateMap() {
+ private Map<Integer, State> generateStateMap() {
int replicaId = 0;
- Map<Integer, String> stateMap = new HashMap<Integer, String>();
- for (String state : _states.keySet()) {
+ Map<Integer, State> stateMap = new HashMap<Integer, State>();
+ for (State state : _states.keySet()) {
Integer count = _states.get(state);
for (int i = 0; i < count; i++) {
stateMap.put(replicaId, state);
@@ -648,13 +667,13 @@ public class AutoRebalanceStrategy {
public int currentlyAssigned;
public int capacity;
public boolean hasCeilingCapacity;
- private String id;
+ private ParticipantId id;
boolean isAlive;
private List<Replica> preferred;
private List<Replica> nonPreferred;
private Set<Replica> newReplicas;
- public Node(String id) {
+ public Node(ParticipantId id) {
preferred = new ArrayList<Replica>();
nonPreferred = new ArrayList<Replica>();
newReplicas = new TreeSet<Replica>();
@@ -716,8 +735,8 @@ public class AutoRebalanceStrategy {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
- .append("\nnonpreferred:").append(nonPreferred.size());
+ sb.append("##########\nname=").append(id.toString()).append("\npreferred:")
+ .append(preferred.size()).append("\nnonpreferred:").append(nonPreferred.size());
return sb.toString();
}
}
@@ -727,14 +746,14 @@ public class AutoRebalanceStrategy {
* and an identifier signifying a specific replica of a given partition and state.
*/
class Replica implements Comparable<Replica> {
- private String partition;
+ private PartitionId partition;
private int replicaId; // this is a partition-relative id
private String format;
- public Replica(String partition, int replicaId) {
+ public Replica(PartitionId partition, int replicaId) {
this.partition = partition;
this.replicaId = replicaId;
- this.format = this.partition + "|" + this.replicaId;
+ this.format = this.partition.toString() + "|" + this.replicaId;
}
@Override
@@ -816,4 +835,11 @@ public class AutoRebalanceStrategy {
return nodeNames.get(index);
}
}
+
+ private static class NodeComparator implements Comparator<ParticipantId> {
+ @Override
+ public int compare(ParticipantId o1, ParticipantId o2) {
+ return o1.toString().compareTo(o2.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 471530c..7527751 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -110,6 +110,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
boolean success = false;
switch (type) {
+ case RESOURCEASSIGNMENTS:
case IDEALSTATES:
case EXTERNALVIEW:
// check if bucketized
@@ -200,6 +201,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
// ZNRecord record = null;
switch (type) {
+ case RESOURCEASSIGNMENTS:
case CURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
@@ -251,6 +253,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
}
switch (type) {
+ case RESOURCEASSIGNMENTS:
case CURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
@@ -313,6 +316,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
if (children != null) {
for (ZNRecord record : children) {
switch (type) {
+ case RESOURCEASSIGNMENTS:
case CURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
@@ -419,6 +423,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
HelixProperty value = children.get(i);
switch (type) {
+ case RESOURCEASSIGNMENTS:
case EXTERNALVIEW:
if (value.getBucketSize() == 0) {
records.add(value.getRecord());
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 173e251..9aff196 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -42,6 +42,12 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
+import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
import com.google.common.base.Enums;
@@ -72,7 +78,8 @@ public class IdealState extends HelixProperty {
REBALANCE_TIMER_PERIOD,
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
- REBALANCER_CLASS_NAME
+ REBALANCER_CLASS_NAME,
+ REBALANCER_CONFIG_NAME
}
public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -215,6 +222,42 @@ public class IdealState extends HelixProperty {
}
/**
+ * Set the RebalancerConfig implementation class for this resource
+ * @param clazz the class object
+ */
+ public void setRebalancerConfigClass(Class<? extends RebalancerConfig> clazz) {
+ String className = clazz.getName();
+ _record.setSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString(), className);
+ }
+
+ /**
+ * Get the class representing the rebalancer config of this resource
+ * @return The rebalancer config class
+ */
+ public Class<? extends RebalancerConfig> getRebalancerConfigClass() {
+ // try to extract the class from the persisted data
+ String className = _record.getSimpleField(IdealStateProperty.REBALANCER_CONFIG_NAME.toString());
+ if (className != null) {
+ try {
+ return HelixUtil.loadClass(getClass(), className).asSubclass(RebalancerConfig.class);
+ } catch (ClassNotFoundException e) {
+ logger.error(className + " is not a valid class");
+ }
+ }
+ // the fallback is to use the mode
+ switch (getRebalanceMode()) {
+ case FULL_AUTO:
+ return FullAutoRebalancerConfig.class;
+ case SEMI_AUTO:
+ return SemiAutoRebalancerConfig.class;
+ case CUSTOMIZED:
+ return CustomRebalancerConfig.class;
+ default:
+ return PartitionedRebalancerConfig.class;
+ }
+ }
+
+ /**
* Set the maximum number of partitions of this resource that an instance can serve
* @param max the maximum number of partitions supported
*/
@@ -280,13 +323,21 @@ public class IdealState extends HelixProperty {
* @return a set of partition names
*/
public Set<String> getPartitionSet() {
- if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
- || getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+ switch (getRebalanceMode()) {
+ case SEMI_AUTO:
+ case FULL_AUTO:
return _record.getListFields().keySet();
- } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED
- || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ case CUSTOMIZED:
return _record.getMapFields().keySet();
- } else {
+ case USER_DEFINED:
+ Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
+ if (configClass.equals(SemiAutoRebalancerConfig.class)
+ || configClass.equals(FullAutoRebalancerConfig.class)) {
+ return _record.getListFields().keySet();
+ } else {
+ return _record.getMapFields().keySet();
+ }
+ default:
logger.error("Invalid ideal state mode:" + getResourceName());
return Collections.emptySet();
}
@@ -360,8 +411,19 @@ public class IdealState extends HelixProperty {
* @return set of instance names
*/
public Set<String> getInstanceSet(String partitionName) {
- if (getRebalanceMode() == RebalanceMode.SEMI_AUTO
- || getRebalanceMode() == RebalanceMode.FULL_AUTO) {
+ boolean useListFields = false;
+ RebalanceMode rebalanceMode = getRebalanceMode();
+ if (rebalanceMode == RebalanceMode.USER_DEFINED) {
+ Class<? extends RebalancerConfig> configClass = getRebalancerConfigClass();
+ if (configClass.equals(SemiAutoRebalancerConfig.class)
+ || configClass.equals(FullAutoRebalancerConfig.class)) {
+ // override: if the user defined rebalancer expects auto-type inputs, use the list fields
+ useListFields = true;
+ }
+ }
+ if (useListFields || rebalanceMode == RebalanceMode.SEMI_AUTO
+ || rebalanceMode == RebalanceMode.FULL_AUTO) {
+ // get instances from list fields
List<String> prefList = _record.getListField(partitionName);
if (prefList != null) {
return new TreeSet<String>(prefList);
@@ -369,8 +431,9 @@ public class IdealState extends HelixProperty {
logger.warn(partitionName + " does NOT exist");
return Collections.emptySet();
}
- } else if (getRebalanceMode() == RebalanceMode.CUSTOMIZED
- || getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ } else if (rebalanceMode == RebalanceMode.CUSTOMIZED
+ || rebalanceMode == RebalanceMode.USER_DEFINED) {
+ // get instances from map fields
Map<String, String> stateMap = _record.getMapField(partitionName);
if (stateMap != null) {
return new TreeSet<String>(stateMap.keySet());
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 6da3dc2..c9a07ef 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -141,7 +141,7 @@ public class ResourceAssignment extends HelixProperty {
* @param rawMaps the map of partition name to participant name and state
* @return converted maps
*/
- public static Map<? extends PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
+ public static Map<PartitionId, Map<ParticipantId, State>> replicaMapsFromStringMaps(
Map<String, Map<String, String>> rawMaps) {
if (rawMaps == null) {
return Collections.emptyMap();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 8c5b863..aae58a8 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -112,4 +112,14 @@ public class ResourceConfiguration extends HelixProperty {
RebalancerConfigHolder config = new RebalancerConfigHolder(this);
return config.getRebalancerConfig(clazz);
}
+
+ /**
+ * Check if this resource config has a rebalancer config
+ * @return true if a rebalancer config is attached, false otherwise
+ */
+ public boolean hasRebalancerConfig() {
+ return _record.getSimpleFields().containsKey(
+ RebalancerConfigHolder.class.getSimpleName() + NamespacedConfig.PREFIX_CHAR
+ + RebalancerConfigHolder.Fields.REBALANCER_CONFIG);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index d1bce56..f52ef0a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,10 +19,6 @@ package org.apache.helix.task;
* under the License.
*/
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -54,6 +50,10 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
/**
* Custom rebalancer implementation for the {@code Task} state model.
*/
@@ -74,6 +74,9 @@ public class TaskRebalancer implements HelixRebalancer {
// Fetch task configuration
TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
+ if (taskCfg == null) {
+ return emptyAssignment(resourceId);
+ }
String workflowResource = taskCfg.getWorkflow();
// Fetch workflow configuration and context
@@ -333,7 +336,6 @@ public class TaskRebalancer implements HelixRebalancer {
// Remove the set of task partitions that are completed or in one of the error states.
pSet.removeAll(donePartitions);
}
-
if (isTaskComplete(taskCtx, allPartitions)) {
workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
if (isWorkflowComplete(workflowCtx, workflowConfig)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index a9428c6..df7288a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,10 +19,10 @@ package org.apache.helix.task;
* under the License.
*/
-import com.google.common.base.Joiner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
@@ -37,6 +37,8 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.log4j.Logger;
+import com.google.common.base.Joiner;
+
/**
* Static utility methods.
*/
@@ -67,8 +69,10 @@ public class TaskUtil {
*/
public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
+ if (taskCfg == null) {
+ return null;
+ }
TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-
return b.build();
}
@@ -107,8 +111,7 @@ public class TaskUtil {
ZNRecord r =
manager.getHelixPropertyStore().get(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
- TaskUtilEnum.PREV_RA_NODE.value()),
- null, AccessOption.PERSISTENT);
+ TaskUtilEnum.PREV_RA_NODE.value()), null, AccessOption.PERSISTENT);
return r != null ? new ResourceAssignment(r) : null;
}
@@ -116,24 +119,21 @@ public class TaskUtil {
ResourceAssignment ra) {
manager.getHelixPropertyStore().set(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
- TaskUtilEnum.PREV_RA_NODE.value()),
- ra.getRecord(), AccessOption.PERSISTENT);
+ TaskUtilEnum.PREV_RA_NODE.value()), ra.getRecord(), AccessOption.PERSISTENT);
}
public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
ZNRecord r =
manager.getHelixPropertyStore().get(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
- TaskUtilEnum.CONTEXT_NODE.value()),
- null, AccessOption.PERSISTENT);
+ TaskUtilEnum.CONTEXT_NODE.value()), null, AccessOption.PERSISTENT);
return r != null ? new TaskContext(r) : null;
}
public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
manager.getHelixPropertyStore().set(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
- TaskUtilEnum.CONTEXT_NODE.value()),
- ctx.getRecord(), AccessOption.PERSISTENT);
+ TaskUtilEnum.CONTEXT_NODE.value()), ctx.getRecord(), AccessOption.PERSISTENT);
}
public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
@@ -148,8 +148,7 @@ public class TaskUtil {
WorkflowContext ctx) {
manager.getHelixPropertyStore().set(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
- TaskUtilEnum.CONTEXT_NODE.value()),
- ctx.getRecord(), AccessOption.PERSISTENT);
+ TaskUtilEnum.CONTEXT_NODE.value()), ctx.getRecord(), AccessOption.PERSISTENT);
}
public static String getNamespacedTaskName(String singleTaskWorkflow) {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index a0959cc..0cc4d4c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -36,13 +36,11 @@ import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
@@ -68,7 +66,8 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
boolean verifyLiveNodes(List<ParticipantId> actualLiveNodes) {
Collections.sort(actualLiveNodes);
- List<String> rawActualLiveNodes = Lists.transform(actualLiveNodes, Functions.toStringFunction());
+ List<String> rawActualLiveNodes =
+ Lists.transform(actualLiveNodes, Functions.toStringFunction());
return _expectSortedLiveNodes.equals(rawActualLiveNodes);
}
@@ -97,7 +96,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
BestPossibleStateOutput calculateBestPossibleState(Cluster cluster) throws Exception {
ClusterEvent event = new ClusterEvent("event");
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
List<Stage> stages = new ArrayList<Stage>();
stages.add(new ResourceComputationStage());
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index da9d76e..973736b 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -440,7 +440,7 @@ public class ClusterStateVerifier {
static BestPossibleStateOutput calcBestPossState(Cluster cluster) throws Exception {
ClusterEvent event = new ClusterEvent("sampleEvent");
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
ResourceComputationStage rcState = new ResourceComputationStage();
CurrentStateComputationStage csStage = new CurrentStateComputationStage();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
index 9ec1ef9..2d577ca 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
@@ -140,6 +140,14 @@ public class YAMLClusterSetup {
}
helixAdmin.addResource(cfg.clusterName, resource.name, partitions,
resource.stateModel.name, resource.rebalancer.get("mode"));
+
+ // batch message mode
+ if (resource.batchMessageMode != null && resource.batchMessageMode) {
+ IdealState idealState = helixAdmin.getResourceIdealState(cfg.clusterName, resource.name);
+ idealState.setBatchMessageMode(true);
+ helixAdmin.setResourceIdealState(cfg.clusterName, resource.name, idealState);
+ }
+
// user-defined rebalancer
if (resource.rebalancer.containsKey("class")
&& resource.rebalancer.get("mode").equals(RebalanceMode.USER_DEFINED.toString())) {
@@ -268,6 +276,7 @@ public class YAMLClusterSetup {
public Map<String, Integer> partitions;
public StateModelConfig stateModel;
public ConstraintsConfig constraints;
+ public Boolean batchMessageMode;
public static class StateModelConfig {
public String name;
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 919eb00..a89d27f 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -602,7 +602,7 @@ public class Mocks {
String[] keySplit = key.split("\\/");
String[] pathSplit = path.split("\\/");
if (keySplit.length > pathSplit.length) {
- child.add(keySplit[pathSplit.length + 1]);
+ child.add(keySplit[pathSplit.length]);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
index ab4ead0..4f81729 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -127,7 +127,7 @@ public class TestNewStages extends ZkUnitTestBase {
}
});
event.addAttribute(AttributeName.RESOURCES.toString(), resourceConfigMap);
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
// Run the stage
try {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 2e17ad3..e55c682 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -35,22 +35,13 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
@@ -174,7 +165,6 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
clusterAccessor.initClusterStructure();
-
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
@@ -232,6 +222,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
+ ClusterDataCache cache = new ClusterDataCache();
+ event.addAttribute("ClusterDataCache", cache);
+
final String resourceName = "testResource_pending";
String[] resourceGroups = new String[] {
resourceName
@@ -288,6 +281,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
// message, make sure controller should not send O->DROPPEDN until O->S is done
HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
admin.dropResource(clusterName, resourceName);
+ List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates());
+ cache.setIdealStates(idealStates);
runPipeline(event, dataRefresh);
runPipeline(event, rebalancePipeline);
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
index ece46ff..b275e9c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceValidationStage.java
@@ -76,7 +76,7 @@ public class TestResourceValidationStage {
ClusterId clusterId = new ClusterId("sampleClusterId");
ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
Cluster cluster = clusterAccessor.readCluster();
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(),
clusterConfiguration.getIdealStateRules());
@@ -113,7 +113,7 @@ public class TestResourceValidationStage {
ClusterId clusterId = new ClusterId("sampleClusterId");
ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
Cluster cluster = clusterAccessor.readCluster();
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
Map<String, Map<String, String>> emptyMap = Maps.newHashMap();
event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(), emptyMap);
@@ -148,7 +148,7 @@ public class TestResourceValidationStage {
ClusterId clusterId = new ClusterId("sampleClusterId");
ClusterAccessor clusterAccessor = new MockClusterAccessor(clusterId, accessor);
Cluster cluster = clusterAccessor.readCluster();
- event.addAttribute("ClusterDataCache", cluster);
+ event.addAttribute("Cluster", cluster);
Map<String, Map<String, String>> emptyMap = Maps.newHashMap();
event.addAttribute(AttributeName.IDEAL_STATE_RULES.toString(), emptyMap);
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
new file mode 100644
index 0000000..78927f9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
@@ -0,0 +1,151 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * The controller pipeline will only update ideal states, live instances, and instance configs
+ * when they change. However, if a controller loses leadership and subsequently regains it, we need
+ * to ensure that the controller can verify its cache. That's what this test is for.
+ */
+public class TestReelectedPipelineCorrectness extends ZkUnitTestBase {
+ @Test
+ public void testReelection() throws Exception {
+ final int NUM_CONTROLLERS = 2;
+ final int NUM_PARTICIPANTS = 4;
+ final int NUM_PARTITIONS = 8;
+ final int NUM_REPLICAS = 2;
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+ // configure distributed controllers
+ String controllerCluster = clusterName + "_controllers";
+ setupTool.addCluster(controllerCluster, true);
+ for (int i = 0; i < NUM_CONTROLLERS; i++) {
+ setupTool.addInstanceToCluster(controllerCluster, "controller_" + i);
+ }
+ setupTool.activateCluster(clusterName, controllerCluster, true);
+
+ // start participants
+ MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ final String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ // start controllers
+ ClusterDistributedController[] controllers = new ClusterDistributedController[NUM_CONTROLLERS];
+ for (int i = 0; i < NUM_CONTROLLERS; i++) {
+ controllers[i] =
+ new ClusterDistributedController(ZK_ADDR, controllerCluster, "controller_" + i);
+ controllers[i].syncStart();
+ }
+ Thread.sleep(1000);
+
+ // Ensure a balanced cluster
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable the leader, resulting in a leader election
+ HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+ LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+ String leaderId = leader.getId();
+ String standbyId = (leaderId.equals("controller_0")) ? "controller_1" : "controller_0";
+ HelixAdmin admin = setupTool.getClusterManagementTool();
+ admin.enableInstance(controllerCluster, leaderId, false);
+
+ // Stop a participant to make sure that the leader election worked
+ Thread.sleep(500);
+ participants[0].syncStop();
+ Thread.sleep(500);
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Disable the original standby (leaving 0 active controllers) and kill another participant
+ admin.enableInstance(controllerCluster, standbyId, false);
+ Thread.sleep(500);
+ participants[1].syncStop();
+
+ // Also change the ideal state
+ IdealState idealState = admin.getResourceIdealState(clusterName, "TestDB0");
+ idealState.setMaxPartitionsPerInstance(1);
+ admin.setResourceIdealState(clusterName, "TestDB0", idealState);
+ Thread.sleep(500);
+
+ // Also disable an instance in the main cluster
+ admin.enableInstance(clusterName, "localhost_12920", false);
+
+ // Re-enable the original leader
+ admin.enableInstance(controllerCluster, leaderId, true);
+
+ // Now check that both the ideal state and the live instances are adhered to by the rebalance
+ Thread.sleep(500);
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // cleanup
+ for (int i = 0; i < NUM_CONTROLLERS; i++) {
+ controllers[i].syncStop();
+ }
+ for (int i = 2; i < NUM_PARTICIPANTS; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("STOP " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
index 27004fe..2a026c2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -79,26 +79,30 @@ public class TestUserDefRebalancerCompatibility extends
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
- boolean result =
- ClusterStateVerifier
- .verifyByZkCallback(new TestCustomizedIdealStateRebalancer.ExternalViewBalancedVerifier(
- _gZkClient, CLUSTER_NAME, db2));
- Assert.assertTrue(result);
- Thread.sleep(1000);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
- ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
- Assert.assertEquals(ev.getPartitionSet().size(), 60);
- for (String partition : ev.getPartitionSet()) {
- Assert.assertEquals(ev.getStateMap(partition).size(), 1);
- }
- IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
- for (PartitionId partition : is.getPartitionIdSet()) {
- Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
- Assert.assertEquals(is.getParticipantStateMap(partition).size(), 3);
+ try {
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new TestCustomizedIdealStateRebalancer.ExternalViewBalancedVerifier(
+ _gZkClient, CLUSTER_NAME, db2));
+ Assert.assertTrue(result);
+ Thread.sleep(1000);
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
+ Assert.assertEquals(ev.getPartitionSet().size(), 60);
+ for (String partition : ev.getPartitionSet()) {
+ Assert.assertEquals(ev.getStateMap(partition).size(), 1);
+ }
+ IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
+ for (PartitionId partition : is.getPartitionIdSet()) {
+ Assert.assertEquals(is.getPreferenceList(partition).size(), 3);
+ Assert.assertEquals(is.getParticipantStateMap(partition).size(), 3);
+ }
+ Assert.assertTrue(testRebalancerCreated);
+ Assert.assertTrue(testRebalancerInvoked);
+ } finally {
+ _setupTool.dropResourceFromCluster(CLUSTER_NAME, db2);
}
- Assert.assertTrue(testRebalancerCreated);
- Assert.assertTrue(testRebalancerInvoked);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 46af6c4..a00db67 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -198,8 +198,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
// participant goes away. should be no change in number of beans as config is still present
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
+ Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered);
+ Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
String firstControllerName =
@@ -215,8 +215,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Thread.sleep(1000);
// 1 cluster status monitor, 1 resource monitor, 5 instances
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+ Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+ Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
String instanceName = "localhost0_" + (12918 + 0);
_participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
@@ -224,8 +224,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
// participant goes back. should be no change
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+ Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+ Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
// Add a resource, one more mbean registered
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
@@ -237,14 +237,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
Integer.parseInt(idealState.getReplicas()));
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+ Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+ Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
// remove resource, no change
setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
Thread.sleep(1000);
- Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
- Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+ Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
+ Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
}
}
[2/2] git commit: [HELIX-345] Speed up the controller pipeline
Posted by ka...@apache.org.
[HELIX-345] Speed up the controller pipeline
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/51329f6f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/51329f6f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/51329f6f
Branch: refs/heads/master
Commit: 51329f6f0bbb8a43cdfd06ffd3bf7ac5c8a93c68
Parents: c8a644f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Feb 4 10:50:11 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Feb 20 10:41:46 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/ConfigAccessor.java | 9 +-
.../main/java/org/apache/helix/api/Cluster.java | 27 +-
.../helix/api/accessor/ClusterAccessor.java | 268 ++++++++++++------
.../helix/api/accessor/ResourceAccessor.java | 42 ++-
.../apache/helix/api/config/ClusterConfig.java | 185 +-----------
.../controller/GenericHelixController.java | 40 +++
.../rebalancer/FullAutoRebalancer.java | 16 +-
.../controller/rebalancer/RebalancerRef.java | 27 +-
.../config/CustomRebalancerConfig.java | 7 +-
.../config/FullAutoRebalancerConfig.java | 7 +-
.../config/PartitionedRebalancerConfig.java | 57 +++-
.../config/RebalancerConfigHolder.java | 2 +-
.../config/SemiAutoRebalancerConfig.java | 7 +-
.../stages/BestPossibleStateCalcStage.java | 46 ++-
.../controller/stages/ClusterDataCache.java | 282 +++++++++++++++++--
.../stages/CompatibilityCheckStage.java | 4 +-
.../stages/CurrentStateComputationStage.java | 4 +-
.../stages/ExternalViewComputeStage.java | 4 +-
.../stages/MessageGenerationStage.java | 4 +-
.../stages/MessageSelectionStage.java | 7 +-
.../controller/stages/MessageThrottleStage.java | 4 +-
.../stages/PersistAssignmentStage.java | 57 +++-
.../controller/stages/PersistContextStage.java | 11 +-
.../controller/stages/ReadClusterDataStage.java | 14 +-
.../stages/ResourceComputationStage.java | 2 +-
.../stages/ResourceValidationStage.java | 4 +-
.../controller/stages/TaskAssignmentStage.java | 12 +-
.../strategy/AutoRebalanceStrategy.java | 186 ++++++------
.../helix/manager/zk/ZKHelixDataAccessor.java | 5 +
.../java/org/apache/helix/model/IdealState.java | 83 +++++-
.../apache/helix/model/ResourceAssignment.java | 2 +-
.../helix/model/ResourceConfiguration.java | 10 +
.../org/apache/helix/task/TaskRebalancer.java | 12 +-
.../java/org/apache/helix/task/TaskUtil.java | 23 +-
.../tools/ClusterExternalViewVerifier.java | 7 +-
.../helix/tools/ClusterStateVerifier.java | 2 +-
.../apache/helix/tools/YAMLClusterSetup.java | 9 +
.../src/test/java/org/apache/helix/Mocks.java | 2 +-
.../org/apache/helix/api/TestNewStages.java | 2 +-
.../stages/TestRebalancePipeline.java | 17 +-
.../stages/TestResourceValidationStage.java | 6 +-
.../TestReelectedPipelineCorrectness.java | 151 ++++++++++
.../TestUserDefRebalancerCompatibility.java | 44 +--
.../TestClusterStatusMonitorLifecycle.java | 20 +-
44 files changed, 1188 insertions(+), 542 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index f46e537..3589165 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ConfigScope;
@@ -489,7 +490,13 @@ public class ConfigAccessor {
List<String> retKeys = null;
if (scope.isFullKey()) {
- ZNRecord record = zkClient.readData(zkPath);
+ ZNRecord record;
+ try {
+ record = zkClient.readData(zkPath);
+ } catch (ZkNoNodeException e) {
+ LOG.warn(zkPath + " no longer exists");
+ return Collections.emptyList();
+ }
if (mapKey == null) {
retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
} else {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 98072d1..adaf200 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -34,10 +34,8 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SpectatorId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.context.ControllerContext;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
@@ -91,8 +89,6 @@ public class Cluster {
* @param constraintMap
* @param stateModelMap
* @param contextMap
- * @param stats
- * @param alerts
* @param userConfig
* @param isPaused
* @param autoJoinAllowed
@@ -101,8 +97,8 @@ public class Cluster {
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
Map<StateModelDefId, StateModelDefinition> stateModelMap,
- Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
- UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+ Map<ContextId, ControllerContext> contextMap, UserConfig userConfig, boolean isPaused,
+ boolean autoJoinAllowed) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -124,8 +120,7 @@ public class Cluster {
new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
.addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
.addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
- .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
- .build();
+ .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -240,22 +235,6 @@ public class Cluster {
}
/**
- * Get all the persisted stats for the cluster
- * @return PersistentStats instance
- */
- public PersistentStats getStats() {
- return _config.getStats();
- }
-
- /**
- * Get all the persisted alerts for the cluster
- * @return Alerts instance
- */
- public Alerts getAlerts() {
- return _config.getAlerts();
- }
-
- /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index abb3e49..5ecc210 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.api.accessor;
* under the License.
*/
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,9 +55,11 @@ import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.context.ControllerContext;
import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
@@ -86,15 +89,27 @@ public class ClusterAccessor {
private final PropertyKey.Builder _keyBuilder;
private final ClusterId _clusterId;
+ private final ClusterDataCache _cache;
+
/**
* Instantiate a cluster accessor
* @param clusterId the cluster to access
* @param accessor HelixDataAccessor for the physical store
*/
public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+ this(clusterId, accessor, new ClusterDataCache());
+ }
+
+ /**
+ * Instantiate a cluster accessor
+ * @param clusterId the cluster to access
+ * @param accessor HelixDataAccessor for the physical store
+ */
+ public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor, ClusterDataCache cache) {
_accessor = accessor;
_keyBuilder = accessor.keyBuilder();
_clusterId = clusterId;
+ _cache = cache;
}
/**
@@ -129,9 +144,6 @@ public class ClusterAccessor {
if (cluster.autoJoinAllowed()) {
clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
}
- if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
- _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
- }
if (cluster.isPaused()) {
pauseCluster();
}
@@ -173,16 +185,6 @@ public class ClusterAccessor {
ClusterConstraints constraint = constraints.get(type);
_accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
}
- if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
- _accessor.removeProperty(_keyBuilder.persistantStat());
- } else {
- _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
- }
- if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
- _accessor.removeProperty(_keyBuilder.alerts());
- } else {
- _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
- }
return true;
}
@@ -218,19 +220,22 @@ public class ClusterAccessor {
LOG.error("Cluster is not fully set up");
return null;
}
- LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+ // refresh the cache
+ _cache.refresh(_accessor);
+
+ LiveInstance leader = _cache.getLeader();
/**
* map of constraint-type to constraints
*/
- Map<String, ClusterConstraints> constraintMap =
- _accessor.getChildValuesMap(_keyBuilder.constraints());
+ Map<String, ClusterConstraints> constraintMap = _cache.getConstraintMap();
// read all the resources
- Map<ResourceId, Resource> resourceMap = readResources();
+ Map<ResourceId, Resource> resourceMap = readResources(true);
// read all the participants
- Map<ParticipantId, Participant> participantMap = readParticipants();
+ Map<ParticipantId, Participant> participantMap = readParticipants(true);
// read the controllers
Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
@@ -249,10 +254,10 @@ public class ClusterAccessor {
}
// read the pause status
- PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+ PauseSignal pauseSignal = _cache.getPauseSignal();
boolean isPaused = pauseSignal != null;
- ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+ ClusterConfiguration clusterConfig = _cache.getClusterConfig();
boolean autoJoinAllowed = false;
UserConfig userConfig;
if (clusterConfig != null) {
@@ -263,21 +268,14 @@ public class ClusterAccessor {
}
// read the state model definitions
- Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
-
- // read the stats
- PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-
- // read the alerts
- Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+ Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions(true);
// read controller context
- Map<ContextId, ControllerContext> contextMap = readControllerContext();
+ Map<ContextId, ControllerContext> contextMap = readControllerContext(true);
// create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
- autoJoinAllowed);
+ clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed);
}
/**
@@ -285,9 +283,22 @@ public class ClusterAccessor {
* @return map of state model def id to state model definition
*/
public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+ return readStateModelDefinitions(false);
+ }
+
+ /**
+ * Get all the state model definitions for this cluster
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of state model def id to state model definition
+ */
+ private Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions(boolean useCache) {
Map<StateModelDefId, StateModelDefinition> stateModelDefs = Maps.newHashMap();
- List<StateModelDefinition> stateModelList =
- _accessor.getChildValues(_keyBuilder.stateModelDefs());
+ Collection<StateModelDefinition> stateModelList;
+ if (useCache) {
+ stateModelList = _cache.getStateModelDefMap().values();
+ } else {
+ stateModelList = _accessor.getChildValues(_keyBuilder.stateModelDefs());
+ }
for (StateModelDefinition stateModelDef : stateModelList) {
stateModelDefs.put(stateModelDef.getStateModelDefId(), stateModelDef);
}
@@ -299,35 +310,70 @@ public class ClusterAccessor {
* @return map of resource id to resource
*/
public Map<ResourceId, Resource> readResources() {
- if (!isClusterStructureValid()) {
+ return readResources(false);
+ }
+
+ /**
+ * Read all resources in the cluster
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of resource id to resource
+ */
+ private Map<ResourceId, Resource> readResources(boolean useCache) {
+ if (!useCache && !isClusterStructureValid()) {
LOG.error("Cluster is not fully set up yet!");
return Collections.emptyMap();
}
- /**
- * map of resource-id to ideal-state
- */
- Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
-
- /**
- * Map of resource id to external view
- */
- Map<String, ExternalView> externalViewMap =
- _accessor.getChildValuesMap(_keyBuilder.externalViews());
+ Map<String, IdealState> idealStateMap;
+ Map<String, ResourceConfiguration> resourceConfigMap;
+ Map<String, ExternalView> externalViewMap;
+ Map<String, ResourceAssignment> resourceAssignmentMap;
+ if (useCache) {
+ idealStateMap = _cache.getIdealStates();
+ resourceConfigMap = _cache.getResourceConfigs();
+ } else {
+ idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+ resourceConfigMap = _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+ }
- /**
- * Map of resource id to user configuration
- */
- Map<String, ResourceConfiguration> resourceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+ // check if external view and resource assignment reads are required
+ boolean extraReadsRequired = false;
+ for (String resourceName : idealStateMap.keySet()) {
+ if (extraReadsRequired) {
+ break;
+ }
+ // a rebalancer can be user defined if it has that mode set, or has a different rebalancer
+ // class
+ IdealState idealState = idealStateMap.get(resourceName);
+ extraReadsRequired =
+ extraReadsRequired || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED);
+ RebalancerRef ref = idealState.getRebalancerRef();
+ if (ref != null) {
+ extraReadsRequired =
+ extraReadsRequired
+ || !PartitionedRebalancerConfig.isBuiltinRebalancer(ref.getRebalancerClass());
+ }
+ }
+ for (String resourceName : resourceConfigMap.keySet()) {
+ if (extraReadsRequired) {
+ break;
+ }
+ extraReadsRequired =
+ extraReadsRequired || resourceConfigMap.get(resourceName).hasRebalancerConfig();
+ }
- /**
- * Map of resource id to resource assignment
- */
- Map<String, ResourceAssignment> resourceAssignmentMap =
- _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+ // now read external view and resource assignments if needed
+ if (!useCache || extraReadsRequired) {
+ externalViewMap = _accessor.getChildValuesMap(_keyBuilder.externalViews());
+ resourceAssignmentMap = _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+ _cache.setAssignmentWritePolicy(true);
+ } else {
+ externalViewMap = Maps.newHashMap();
+ resourceAssignmentMap = Maps.newHashMap();
+ _cache.setAssignmentWritePolicy(false);
+ }
- // read all the resources
+ // populate all the resources
Set<String> allResources = Sets.newHashSet();
allResources.addAll(idealStateMap.keySet());
allResources.addAll(resourceConfigMap.keySet());
@@ -347,45 +393,58 @@ public class ClusterAccessor {
* @return map of participant id to participant, or empty map
*/
public Map<ParticipantId, Participant> readParticipants() {
- if (!isClusterStructureValid()) {
+ return readParticipants(false);
+ }
+
+ /**
+ * Read all participants in the cluster
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of participant id to participant, or empty map
+ */
+ private Map<ParticipantId, Participant> readParticipants(boolean useCache) {
+ if (!useCache && !isClusterStructureValid()) {
LOG.error("Cluster is not fully set up yet!");
return Collections.emptyMap();
}
-
- /**
- * map of instance-id to instance-config
- */
- Map<String, InstanceConfig> instanceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
-
- /**
- * map of instance-id to live-instance
- */
- Map<String, LiveInstance> liveInstanceMap =
- _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+ Map<String, InstanceConfig> instanceConfigMap;
+ Map<String, LiveInstance> liveInstanceMap;
+ if (useCache) {
+ instanceConfigMap = _cache.getInstanceConfigMap();
+ liveInstanceMap = _cache.getLiveInstances();
+ } else {
+ instanceConfigMap = _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+ liveInstanceMap = _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+ }
/**
* map of participant-id to map of message-id to message
*/
- Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
- for (String instanceName : liveInstanceMap.keySet()) {
- Map<String, Message> instanceMsgMap =
- _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
- messageMap.put(instanceName, instanceMsgMap);
+ Map<String, Map<String, Message>> messageMap = Maps.newHashMap();
+ for (String participantName : liveInstanceMap.keySet()) {
+ Map<String, Message> instanceMsgMap;
+ if (useCache) {
+ instanceMsgMap = _cache.getMessages(participantName);
+ } else {
+ instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+ }
+ messageMap.put(participantName, instanceMsgMap);
}
/**
* map of participant-id to map of resource-id to current-state
*/
- Map<String, Map<String, CurrentState>> currentStateMap =
- new HashMap<String, Map<String, CurrentState>>();
+ Map<String, Map<String, CurrentState>> currentStateMap = Maps.newHashMap();
for (String participantName : liveInstanceMap.keySet()) {
LiveInstance liveInstance = liveInstanceMap.get(participantName);
SessionId sessionId = liveInstance.getTypedSessionId();
- Map<String, CurrentState> instanceCurStateMap =
- _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
- sessionId.stringify()));
-
+ Map<String, CurrentState> instanceCurStateMap;
+ if (useCache) {
+ instanceCurStateMap = _cache.getCurrentState(participantName, sessionId.stringify());
+ } else {
+ instanceCurStateMap =
+ _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+ sessionId.stringify()));
+ }
currentStateMap.put(participantName, instanceCurStateMap);
}
@@ -472,8 +531,21 @@ public class ClusterAccessor {
* @return map of context id to controller context
*/
public Map<ContextId, ControllerContext> readControllerContext() {
- Map<String, ControllerContextHolder> contextHolders =
- _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+ return readControllerContext(false);
+ }
+
+ /**
+ * Read the persisted controller contexts
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of context id to controller context
+ */
+ private Map<ContextId, ControllerContext> readControllerContext(boolean useCache) {
+ Map<String, ControllerContextHolder> contextHolders;
+ if (useCache) {
+ contextHolders = _cache.getContextMap();
+ } else {
+ contextHolders = _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+ }
Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
for (String contextName : contextHolders.keySet()) {
contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
@@ -482,6 +554,22 @@ public class ClusterAccessor {
}
/**
+ * Get the current cluster stats
+ * @return PersistentStats
+ */
+ public PersistentStats getStats() {
+ return _accessor.getProperty(_keyBuilder.persistantStat());
+ }
+
+ /**
+ * Get the current cluster alerts
+ * @return Alerts
+ */
+ public Alerts getAlerts() {
+ return _accessor.getProperty(_keyBuilder.alerts());
+ }
+
+ /**
* Add a statistic specification to the cluster. Existing stat specifications will not be
* overwritten
* @param statName string representing a stat specification
@@ -673,14 +761,22 @@ public class ClusterAccessor {
return false;
}
+ // Create an IdealState from a RebalancerConfig (if the resource supports it)
+ IdealState idealState =
+ ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+ resource.getBucketSize(), resource.getBatchMessageMode());
+ if (idealState != null) {
+ _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ }
+
// Add resource user config
if (resource.getUserConfig() != null) {
ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
configuration.setType(resource.getType());
configuration.addNamespacedConfig(resource.getUserConfig());
PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
- if (partitionedConfig == null
- || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ if (idealState == null
+ && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
// only persist if this is not easily convertible to an ideal state
configuration
.addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
@@ -688,14 +784,6 @@ public class ClusterAccessor {
}
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
-
- // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
- IdealState idealState =
- ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
- resource.getBucketSize(), resource.getBatchMessageMode());
- if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
- }
return true;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 73d43b0..a1d6580 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@ -137,14 +138,19 @@ public class ResourceAccessor {
*/
private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
RebalancerConfig rebalancerConfig) {
- boolean status =
- _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ boolean status = true;
+ if (configuration != null) {
+ status =
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ }
// set an ideal state if the resource supports it
IdealState idealState =
rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
configuration.getBatchMessageMode());
if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ status =
+ status
+ && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
}
return status;
}
@@ -253,7 +259,14 @@ public class ResourceAccessor {
}
ResourceId resourceId = resourceConfig.getId();
ResourceConfiguration config = new ResourceConfiguration(resourceId);
- config.addNamespacedConfig(resourceConfig.getUserConfig());
+ UserConfig userConfig = resourceConfig.getUserConfig();
+ if (userConfig != null
+ && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig
+ .getMapFields().isEmpty())) {
+ config.addNamespacedConfig(userConfig);
+ } else {
+ userConfig = null;
+ }
PartitionedRebalancerConfig partitionedConfig =
PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
if (partitionedConfig == null
@@ -261,9 +274,11 @@ public class ResourceAccessor {
// only persist if this is not easily convertible to an ideal state
config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
.toNamespacedConfig());
+ config.setBucketSize(resourceConfig.getBucketSize());
+ config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+ } else if (userConfig == null) {
+ config = null;
}
- config.setBucketSize(resourceConfig.getBucketSize());
- config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
return true;
}
@@ -387,9 +402,17 @@ public class ResourceAccessor {
boolean batchMessageMode) {
PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
if (partitionedConfig != null) {
+ if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
+ // don't proceed if this resource cannot be described by an ideal state
+ return null;
+ }
IdealState idealState = new IdealState(partitionedConfig.getResourceId());
idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
- idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+
+ RebalancerRef ref = partitionedConfig.getRebalancerRef();
+ if (ref != null) {
+ idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+ }
String replicas = null;
if (partitionedConfig.anyLiveParticipant()) {
replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
@@ -404,13 +427,14 @@ public class ResourceAccessor {
idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
idealState.setBucketSize(bucketSize);
idealState.setBatchMessageMode(batchMessageMode);
- if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ idealState.setRebalancerConfigClass(config.getClass());
+ if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
SemiAutoRebalancerConfig semiAutoConfig =
BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
}
- } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
CustomRebalancerConfig customConfig =
BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
for (PartitionId partitionId : customConfig.getPartitionSet()) {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 22a1528..ddc98fa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,8 +5,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ClusterId;
@@ -14,14 +12,12 @@ import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -61,8 +57,6 @@ public class ClusterConfig {
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
- private final PersistentStats _stats;
- private final Alerts _alerts;
private final UserConfig _userConfig;
private final boolean _isPaused;
private final boolean _autoJoin;
@@ -74,8 +68,6 @@ public class ClusterConfig {
* @param participantMap map of participant id to participant config
* @param constraintMap map of constraint type to all constraints of that type
* @param stateModelMap map of state model id to state model definition
- * @param stats statistics to watch on the cluster
- * @param alerts alerts that the cluster can trigger
* @param userConfig user-defined cluster properties
* @param isPaused true if paused, false if active
* @param allowAutoJoin true if participants can join automatically, false otherwise
@@ -83,15 +75,13 @@ public class ClusterConfig {
private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
Map<ParticipantId, ParticipantConfig> participantMap,
Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
- Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+ boolean isPaused, boolean allowAutoJoin) {
_id = id;
_resourceMap = ImmutableMap.copyOf(resourceMap);
_participantMap = ImmutableMap.copyOf(participantMap);
_constraintMap = ImmutableMap.copyOf(constraintMap);
_stateModelMap = ImmutableMap.copyOf(stateModelMap);
- _stats = stats;
- _alerts = alerts;
_userConfig = userConfig;
_isPaused = isPaused;
_autoJoin = allowAutoJoin;
@@ -237,22 +227,6 @@ public class ClusterConfig {
}
/**
- * Get all the statistics persisted on the cluster
- * @return PersistentStats instance
- */
- public PersistentStats getStats() {
- return _stats;
- }
-
- /**
- * Get all the alerts persisted on the cluster
- * @return Alerts instance
- */
- public Alerts getAlerts() {
- return _alerts;
- }
-
- /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
@@ -287,8 +261,6 @@ public class ClusterConfig {
private Set<Fields> _updateFields;
private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
- private PersistentStats _removedStats;
- private Alerts _removedAlerts;
private Builder _builder;
/**
@@ -302,8 +274,6 @@ public class ClusterConfig {
Set<ConstraintId> constraints = Sets.newHashSet();
_removedConstraints.put(type, constraints);
}
- _removedStats = new PersistentStats(PersistentStats.nodeName);
- _removedAlerts = new Alerts(Alerts.nodeName);
_builder = new Builder(clusterId);
}
@@ -431,57 +401,6 @@ public class ClusterConfig {
}
/**
- * Add a statistic specification to the cluster. Existing specifications will not be overwritten
- * @param stat string specifying the stat specification
- * @return Delta
- */
- public Delta addStat(String stat) {
- _builder.addStat(stat);
- return this;
- }
-
- /**
- * Add an alert specification for the cluster. Existing specifications will not be overwritten
- * @param alert string specifying the alert specification
- * @return Delta
- */
- public Delta addAlert(String alert) {
- _builder.addAlert(alert);
- return this;
- }
-
- /**
- * Remove a statistic specification from the cluster
- * @param stat statistic specification
- * @return Delta
- */
- public Delta removeStat(String stat) {
- Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
- Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- return this;
- }
-
- /**
- * Remove an alert specification for the cluster
- * @param alert alert specification
- * @return Delta
- */
- public Delta removeAlert(String alert) {
- Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
- if (!currAlertMap.containsKey(alert)) {
- Map<String, String> parsedAlert = Maps.newHashMap();
- StringBuilder statsName = new StringBuilder();
- AlertsHolder.parseAlert(alert, statsName, parsedAlert);
- removeStat(statsName.toString());
- currAlertMap.put(alert, parsedAlert);
- }
- return this;
- }
-
- /**
* Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
* @param orig the original ClusterConfig
* @return updated ClusterConfig
@@ -494,8 +413,7 @@ public class ClusterConfig {
.addParticipants(orig.getParticipantMap().values())
.addStateModelDefinitions(orig.getStateModelMap().values())
.userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
- .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
- .addAlerts(orig.getAlerts());
+ .autoJoin(orig.autoJoinAllowed());
for (Fields field : _updateFields) {
switch (field) {
case USER_CONFIG:
@@ -529,29 +447,8 @@ public class ClusterConfig {
builder.addConstraint(constraints);
}
- // add stats and alerts
- builder.addStats(deltaConfig.getStats());
- builder.addAlerts(deltaConfig.getAlerts());
-
// get the result
- ClusterConfig result = builder.build();
-
- // remove stats
- PersistentStats stats = result.getStats();
- for (String removedStat : _removedStats.getMapFields().keySet()) {
- if (stats.getMapFields().containsKey(removedStat)) {
- stats.getMapFields().remove(removedStat);
- }
- }
-
- // remove alerts
- Alerts alerts = result.getAlerts();
- for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
- if (alerts.getMapFields().containsKey(removedAlert)) {
- alerts.getMapFields().remove(removedAlert);
- }
- }
- return result;
+ return builder.build();
}
}
@@ -565,8 +462,6 @@ public class ClusterConfig {
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
private UserConfig _userConfig;
- private PersistentStats _stats;
- private Alerts _alerts;
private boolean _isPaused;
private boolean _autoJoin;
@@ -583,8 +478,6 @@ public class ClusterConfig {
_isPaused = false;
_autoJoin = false;
_userConfig = new UserConfig(Scope.cluster(id));
- _stats = new PersistentStats(PersistentStats.nodeName);
- _alerts = new Alerts(Alerts.nodeName);
}
/**
@@ -789,74 +682,6 @@ public class ClusterConfig {
}
/**
- * Add a statistic specification to the cluster. Existing specifications will not be overwritten
- * @param stat String specifying the stat specification
- * @return Builder
- */
- public Builder addStat(String stat) {
- Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
- Map<String, Map<String, String>> currentStats = _stats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- if (!currentStats.containsKey(statName)) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- }
- return this;
- }
-
- /**
- * Add statistic specifications to the cluster. Existing specifications will not be overwritten
- * @param stats PersistentStats specifying the stat specification
- * @return Builder
- */
- public Builder addStats(PersistentStats stats) {
- if (stats == null) {
- return this;
- }
- Map<String, Map<String, String>> parsedStat = stats.getMapFields();
- Map<String, Map<String, String>> currentStats = _stats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- if (!currentStats.containsKey(statName)) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- }
- return this;
- }
-
- /**
- * Add alert specifications to the cluster. Existing specifications will not be overwritten
- * @param alert string representing alert specifications
- * @return Builder
- */
- public Builder addAlert(String alert) {
- Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
- if (!currAlertMap.containsKey(alert)) {
- Map<String, String> parsedAlert = Maps.newHashMap();
- StringBuilder statsName = new StringBuilder();
- AlertsHolder.parseAlert(alert, statsName, parsedAlert);
- addStat(statsName.toString());
- currAlertMap.put(alert, parsedAlert);
- }
- return this;
- }
-
- /**
- * Add alert specifications to the cluster. Existing specifications will not be overwritten
- * @param alerts Alerts instance
- * @return Builder
- */
- public Builder addAlerts(Alerts alerts) {
- if (alerts == null) {
- return this;
- }
- Map<String, Map<String, String>> alertMap = alerts.getMapFields();
- for (String alert : alertMap.keySet()) {
- addAlert(alert);
- }
- return this;
- }
-
- /**
* Set the paused status of the cluster
* @param isPaused true if paused, false otherwise
* @return Builder
@@ -892,7 +717,7 @@ public class ClusterConfig {
*/
public ClusterConfig build() {
return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
- _stats, _alerts, _userConfig, _isPaused, _autoJoin);
+ _userConfig, _isPaused, _autoJoin);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2b2a71e..b11ab9c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -48,6 +48,7 @@ import org.apache.helix.api.id.SessionId;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
@@ -121,6 +122,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
int _timerPeriod = Integer.MAX_VALUE;
/**
+ * A cache maintained across pipelines
+ */
+ private ClusterDataCache _cache;
+
+ /**
* Default constructor that creates a default pipeline registry. This is sufficient in
* most cases, but if there is a some thing specific needed use another constructor
* where in you can pass a pipeline registry
@@ -138,6 +144,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void run() {
+ _cache.requireFullRefresh();
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
ClusterEvent event = new ClusterEvent("periodicalRebalance");
@@ -228,6 +235,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
_registry = registry;
_lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
_lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+ _cache = new ClusterDataCache();
_eventQueue = new ClusterEventBlockingQueue();
_eventThread = new ClusterEventProcessor();
_eventThread.start();
@@ -276,6 +284,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
}
+ // add the cache
+ event.addAttribute("ClusterDataCache", _cache);
+
List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
if (pipelines == null || pipelines.size() == 0) {
logger.info("No pipeline to run for event:" + event.getName());
@@ -318,6 +329,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onStateChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
ClusterEvent event = new ClusterEvent("currentStateChange");
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("instanceName", instanceName);
@@ -341,6 +355,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onMessage()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
ClusterEvent event = new ClusterEvent("messageChange");
event.addAttribute("helixmanager", changeContext.getManager());
@@ -360,10 +377,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
if (liveInstances == null) {
liveInstances = Collections.emptyList();
}
+ _cache.setLiveInstances(liveInstances);
+
// Go though the live instance list and make sure that we are observing them
// accordingly. The action is done regardless of the paused flag.
if (changeContext.getType() == NotificationContext.Type.INIT
@@ -403,6 +425,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
+
+ if (idealStates == null) {
+ idealStates = Collections.emptyList();
+ }
+ _cache.setIdealStates(idealStates);
ClusterEvent event = new ClusterEvent("idealStateChange");
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("changeContext", changeContext);
@@ -419,6 +449,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
logger.info("START: GenericClusterController.onConfigChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
+
+ if (configs == null) {
+ configs = Collections.emptyList();
+ }
+ _cache.setInstanceConfigs(configs);
+
ClusterEvent event = new ClusterEvent("configChange");
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
@@ -438,6 +477,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void onControllerChange(NotificationContext changeContext) {
logger.info("START: GenericClusterController.onControllerChange()");
+ _cache.requireFullRefresh();
if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
logger.info("GenericClusterController.onControllerChange() FINALIZE");
return;
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 0c55d45..13616b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -113,8 +113,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
}
if (!taggedLiveNodes.isEmpty()) {
// live nodes exist that have this tag
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following participants with tag " + config.getParticipantGroupTag()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("found the following participants with tag " + config.getParticipantGroupTag()
+ " for " + config.getResourceId() + ": " + taggedLiveNodes);
}
} else if (taggedNodes.isEmpty()) {
@@ -132,12 +132,12 @@ public class FullAutoRebalancer implements HelixRebalancer {
// determine which nodes the replicas should live on
int maxPartition = config.getMaxPartitionsPerParticipant();
- if (LOG.isInfoEnabled()) {
- LOG.info("currentMapping: " + currentMapping);
- LOG.info("stateCountMap: " + stateCountMap);
- LOG.info("liveNodes: " + liveParticipantList);
- LOG.info("allNodes: " + allParticipantList);
- LOG.info("maxPartition: " + maxPartition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("currentMapping: " + currentMapping);
+ LOG.debug("stateCountMap: " + stateCountMap);
+ LOG.debug("liveNodes: " + liveParticipantList);
+ LOG.debug("allNodes: " + allParticipantList);
+ LOG.debug("maxPartition: " + maxPartition);
}
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
_algorithm =
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
index 974222d..8439583 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -34,25 +34,46 @@ public class RebalancerRef {
@JsonProperty("rebalancerClassName")
private final String _rebalancerClassName;
+ @JsonIgnore
+ private Class<? extends HelixRebalancer> _class;
+
@JsonCreator
private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
_rebalancerClassName = rebalancerClassName;
+ _class = null;
}
/**
- * Get an instantiated Rebalancer
- * @return Rebalancer or null if instantiation failed
+ * Get an instantiated HelixRebalancer
+ * @return HelixRebalancer or null if instantiation failed
*/
@JsonIgnore
public HelixRebalancer getRebalancer() {
try {
- return (HelixRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+ return (HelixRebalancer) (getRebalancerClass().newInstance());
} catch (Exception e) {
LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
}
return null;
}
+ /**
+ * Get the class object of this rebalancer ref
+ * @return Class
+ */
+ @JsonIgnore
+ public Class<? extends HelixRebalancer> getRebalancerClass() {
+ try {
+ if (_class == null) {
+ _class =
+ HelixUtil.loadClass(getClass(), _rebalancerClassName).asSubclass(HelixRebalancer.class);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while loading rebalancer class:" + _rebalancerClassName, e);
+ }
+ return _class;
+ }
+
@Override
public String toString() {
return _rebalancerClassName;
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
index 73c3ccc..a44b230 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -53,7 +53,12 @@ public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
* Instantiate a CustomRebalancerConfig
*/
public CustomRebalancerConfig() {
- setRebalanceMode(RebalanceMode.CUSTOMIZED);
+ if (getClass().equals(CustomRebalancerConfig.class)) {
+ // only mark this as customized mode if this specific config is used
+ setRebalanceMode(RebalanceMode.CUSTOMIZED);
+ } else {
+ setRebalanceMode(RebalanceMode.USER_DEFINED);
+ }
setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
_preferenceMaps = Maps.newHashMap();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
index 828d509..16bb4cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -30,7 +30,12 @@ import org.apache.helix.model.IdealState.RebalanceMode;
*/
public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
public FullAutoRebalancerConfig() {
- setRebalanceMode(RebalanceMode.FULL_AUTO);
+ if (getClass().equals(FullAutoRebalancerConfig.class)) {
+ // only mark this as full auto mode if this specific config is used
+ setRebalanceMode(RebalanceMode.FULL_AUTO);
+ } else {
+ setRebalanceMode(RebalanceMode.USER_DEFINED);
+ }
setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
index 2c9769d..dd661d9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -10,14 +10,20 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskRebalancer;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -51,6 +57,23 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
private int _maxPartitionsPerParticipant;
private RebalanceMode _rebalanceMode;
+ @JsonIgnore
+ private static final Set<Class<? extends RebalancerConfig>> BUILTIN_CONFIG_CLASSES = Sets
+ .newHashSet();
+ @JsonIgnore
+ private static final Set<Class<? extends HelixRebalancer>> BUILTIN_REBALANCER_CLASSES = Sets
+ .newHashSet();
+ static {
+ BUILTIN_CONFIG_CLASSES.add(PartitionedRebalancerConfig.class);
+ BUILTIN_CONFIG_CLASSES.add(FullAutoRebalancerConfig.class);
+ BUILTIN_CONFIG_CLASSES.add(SemiAutoRebalancerConfig.class);
+ BUILTIN_CONFIG_CLASSES.add(CustomRebalancerConfig.class);
+ BUILTIN_REBALANCER_CLASSES.add(FullAutoRebalancer.class);
+ BUILTIN_REBALANCER_CLASSES.add(SemiAutoRebalancer.class);
+ BUILTIN_REBALANCER_CLASSES.add(CustomRebalancer.class);
+ BUILTIN_REBALANCER_CLASSES.add(TaskRebalancer.class);
+ }
+
/**
* Instantiate a PartitionedRebalancerConfig
*/
@@ -186,13 +209,42 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
}
/**
+ * Check if the given class is compatible with an {@link IdealState}
+ * @param clazz the PartitionedRebalancerConfig subclass
+ * @return true if IdealState can be used to describe this config, false otherwise
+ */
+ public static boolean isBuiltinConfig(Class<? extends RebalancerConfig> clazz) {
+ return BUILTIN_CONFIG_CLASSES.contains(clazz);
+ }
+
+ /**
+ * Check if the given class is a built-in rebalancer class
+ * @param clazz the HelixRebalancer subclass
+ * @return true if the rebalancer class is built in, false otherwise
+ */
+ public static boolean isBuiltinRebalancer(Class<? extends HelixRebalancer> clazz) {
+ return BUILTIN_REBALANCER_CLASSES.contains(clazz);
+ }
+
+ /**
* Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
* @param idealState populated IdealState
* @return PartitionedRebalancerConfig
*/
public static PartitionedRebalancerConfig from(IdealState idealState) {
PartitionedRebalancerConfig config;
- switch (idealState.getRebalanceMode()) {
+ RebalanceMode mode = idealState.getRebalanceMode();
+ if (mode == RebalanceMode.USER_DEFINED) {
+ Class<? extends RebalancerConfig> configClass = idealState.getRebalancerConfigClass();
+ if (configClass.equals(FullAutoRebalancerConfig.class)) {
+ mode = RebalanceMode.FULL_AUTO;
+ } else if (configClass.equals(SemiAutoRebalancerConfig.class)) {
+ mode = RebalanceMode.SEMI_AUTO;
+ } else if (configClass.equals(CustomRebalancerConfig.class)) {
+ mode = RebalanceMode.CUSTOMIZED;
+ }
+ }
+ switch (mode) {
case FULL_AUTO:
FullAutoRebalancerConfig.Builder fullAutoBuilder =
new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
@@ -252,7 +304,8 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
.maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
.participantGroupTag(idealState.getInstanceGroupTag())
.stateModelDefId(idealState.getStateModelDefId())
- .stateModelFactoryId(idealState.getStateModelFactoryId());
+ .stateModelFactoryId(idealState.getStateModelFactoryId())
+ .rebalanceMode(idealState.getRebalanceMode());
RebalancerRef rebalancerRef = idealState.getRebalancerRef();
if (rebalancerRef != null) {
builder.rebalancerRef(rebalancerRef);
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
index 8581732..d6ddb50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -32,7 +32,7 @@ import org.apache.log4j.Logger;
* information specific to each rebalancer.
*/
public final class RebalancerConfigHolder {
- private enum Fields {
+ public enum Fields {
SERIALIZER_CLASS,
REBALANCER_CONFIG,
REBALANCER_CONFIG_CLASS
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index bfc3309..727c3df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -56,7 +56,12 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
* Instantiate a SemiAutoRebalancerConfig
*/
public SemiAutoRebalancerConfig() {
- setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ if (getClass().equals(SemiAutoRebalancerConfig.class)) {
+ // only mark this as semi auto mode if this specific config is used
+ setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ } else {
+ setRebalanceMode(RebalanceMode.USER_DEFINED);
+ }
setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
_preferenceLists = Maps.newHashMap();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index ec812b2..644b9f6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -37,12 +37,14 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.FallbackRebalancer;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -52,6 +54,9 @@ import com.google.common.collect.Sets;
public class BestPossibleStateCalcStage extends AbstractBaseStage {
private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+ // cache for rebalancer instances
+ private Map<ResourceId, HelixRebalancer> _rebalancerMap = Maps.newHashMap();
+
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
@@ -63,11 +68,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
event.getAttribute(AttributeName.CURRENT_STATE.toString());
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (currentStateOutput == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+ + ". Requires CURRENT_STATE|RESOURCES|Cluster");
}
BestPossibleStateOutput bestPossibleStateOutput =
@@ -178,25 +183,42 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
stateModelDefs.get(rebalancerConfig.getStateModelDefId());
ResourceAssignment resourceAssignment = null;
if (rebalancerConfig != null) {
+ // reuse the cached rebalancer only if its class name still matches the configured ref
+ RebalancerRef ref = rebalancerConfig.getRebalancerRef();
HelixRebalancer rebalancer = null;
- if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
- rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+ if (_rebalancerMap.containsKey(resourceId)) {
+ HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
+ if (ref != null && candidateRebalancer.getClass().getName().equals(ref.toString())) {
+ rebalancer = candidateRebalancer;
+ }
}
- HelixManager manager = event.getAttribute("helixmanager");
- ControllerContextProvider provider =
- event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+
+ // otherwise instantiate a new one
if (rebalancer == null) {
- rebalancer = new FallbackRebalancer();
+ if (ref != null) {
+ rebalancer = ref.getRebalancer();
+ }
+ HelixManager manager = event.getAttribute("helixmanager");
+ ControllerContextProvider provider =
+ event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+ if (rebalancer == null) {
+ rebalancer = new FallbackRebalancer();
+ }
+ rebalancer.init(manager, provider);
+ _rebalancerMap.put(resourceId, rebalancer);
}
- rebalancer.init(manager, provider);
ResourceAssignment currentAssignment = null;
Resource resourceSnapshot = cluster.getResource(resourceId);
if (resourceSnapshot != null) {
currentAssignment = resourceSnapshot.getResourceAssignment();
}
- resourceAssignment =
- rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
- currentStateOutput);
+ try {
+ resourceAssignment =
+ rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+ currentStateOutput);
+ } catch (Exception e) {
+ LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
+ }
}
if (resourceAssignment == null) {
resourceAssignment =
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6f09d26..0c28bdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -22,12 +22,18 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
@@ -35,22 +41,43 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
* Reads the data from the cluster using data accessor. This output ClusterData which
* provides useful methods to search/lookup properties
*/
-@Deprecated
public class ClusterDataCache {
Map<String, LiveInstance> _liveInstanceMap;
+ Map<String, LiveInstance> _liveInstanceCacheMap;
Map<String, IdealState> _idealStateMap;
+ Map<String, IdealState> _idealStateCacheMap;
Map<String, StateModelDefinition> _stateModelDefMap;
Map<String, InstanceConfig> _instanceConfigMap;
+ Map<String, InstanceConfig> _instanceConfigCacheMap;
Map<String, ClusterConstraints> _constraintMap;
Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
Map<String, Map<String, Message>> _messageMap;
+ Map<String, Map<String, String>> _idealStateRuleMap;
+ Map<String, ResourceConfiguration> _resourceConfigMap;
+ Map<String, ControllerContextHolder> _controllerContextMap;
+ PauseSignal _pause;
+ LiveInstance _leader;
+ ClusterConfiguration _clusterConfig;
+ boolean _writeAssignments;
+
+ // maintain a cache of participant messages across pipeline runs
+ Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+ boolean _init = true;
// Map<String, Map<String, HealthStat>> _healthStatMap;
// private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
@@ -66,39 +93,111 @@ public class ClusterDataCache {
* @param accessor
* @return
*/
- public boolean refresh(HelixDataAccessor accessor) {
+ public synchronized boolean refresh(HelixDataAccessor accessor) {
+ LOG.info("START: ClusterDataCache.refresh()");
+ long startTime = System.currentTimeMillis();
+
Builder keyBuilder = accessor.keyBuilder();
- _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+ if (_init) {
+ _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+ _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+ _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ }
+ _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
+ _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
+ _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
for (LiveInstance instance : _liveInstanceMap.values()) {
- LOG.trace("live instance: " + instance.getParticipantId() + " "
- + instance.getTypedSessionId());
+ LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
}
_stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
- _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
_constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
+ List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+ long purgeSum = 0;
for (String instanceName : _liveInstanceMap.keySet()) {
- Map<String, Message> map = accessor.getChildValuesMap(keyBuilder.messages(instanceName));
- msgMap.put(instanceName, map);
+ // get the cache
+ Map<String, Message> cachedMap = _messageCache.get(instanceName);
+ if (cachedMap == null) {
+ cachedMap = Maps.newHashMap();
+ _messageCache.put(instanceName, cachedMap);
+ }
+ msgMap.put(instanceName, cachedMap);
+
+ // get the current names
+ Set<String> messageNames =
+ Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+ long purgeStart = System.currentTimeMillis();
+ // clear stale names
+ Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+ while (cachedNamesIter.hasNext()) {
+ String messageName = cachedNamesIter.next();
+ if (!messageNames.contains(messageName)) {
+ cachedNamesIter.remove();
+ }
+ }
+ long purgeEnd = System.currentTimeMillis();
+ purgeSum += purgeEnd - purgeStart;
+
+ // get the keys for the new messages
+ for (String messageName : messageNames) {
+ if (!cachedMap.containsKey(messageName)) {
+ newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+ }
+ }
+ }
+
+ // get the new messages
+ if (newMessageKeys.size() > 0) {
+ List<Message> newMessages = accessor.getProperty(newMessageKeys);
+ for (Message message : newMessages) {
+ if (message != null) {
+ Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+ cachedMap.put(message.getId(), message);
+ }
+ }
}
_messageMap = Collections.unmodifiableMap(msgMap);
+ LOG.debug("Purge took: " + purgeSum);
+ List<PropertyKey> currentStateKeys = Lists.newLinkedList();
Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
new HashMap<String, Map<String, Map<String, CurrentState>>>();
for (String instanceName : _liveInstanceMap.keySet()) {
LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
- String sessionId = liveInstance.getTypedSessionId().stringify();
- if (!allCurStateMap.containsKey(instanceName)) {
- allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
+ String sessionId = liveInstance.getSessionId();
+ List<String> currentStateNames =
+ accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+ for (String currentStateName : currentStateNames) {
+ currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+ }
+
+ // ensure an empty current state map for all live instances and sessions
+ Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName);
+ if (instanceCurStateMap == null) {
+ instanceCurStateMap = Maps.newHashMap();
+ allCurStateMap.put(instanceName, instanceCurStateMap);
+ }
+ Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+ if (sessionCurStateMap == null) {
+ sessionCurStateMap = Maps.newHashMap();
+ instanceCurStateMap.put(sessionId, sessionCurStateMap);
+ }
+ }
+ List<CurrentState> currentStates = accessor.getProperty(currentStateKeys);
+ Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator();
+ for (CurrentState currentState : currentStates) {
+ PropertyKey key = csKeyIter.next();
+ String[] params = key.getParams();
+ if (currentState != null && params.length >= 4) {
+ Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]);
+ Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]);
+ sessionCurStateMap.put(params[3], currentState);
}
- Map<String, Map<String, CurrentState>> curStateMap = allCurStateMap.get(instanceName);
- Map<String, CurrentState> map =
- accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
- curStateMap.put(sessionId, map);
}
for (String instance : allCurStateMap.keySet()) {
@@ -106,10 +205,63 @@ public class ClusterDataCache {
}
_currentStateMap = Collections.unmodifiableMap(allCurStateMap);
+ // New in 0.7: Read more information for the benefit of user-defined rebalancers
+ _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+ _controllerContextMap = accessor.getChildValuesMap(keyBuilder.controllerContexts());
+
+ // Read all single properties together
+ List<HelixProperty> singleProperties =
+ accessor.getProperty(ImmutableList.of(keyBuilder.clusterConfig(),
+ keyBuilder.controllerLeader(), keyBuilder.pause()));
+ _clusterConfig = (ClusterConfiguration) singleProperties.get(0);
+ if (_clusterConfig != null) {
+ _idealStateRuleMap = _clusterConfig.getIdealStateRules();
+ } else {
+ _idealStateRuleMap = Collections.emptyMap();
+ }
+ _leader = (LiveInstance) singleProperties.get(1);
+ _pause = (PauseSignal) singleProperties.get(2);
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
+
+ if (LOG.isDebugEnabled()) {
+ int numPaths =
+ _liveInstanceMap.size() + _idealStateMap.size() + _resourceConfigMap.size()
+ + _stateModelDefMap.size() + _instanceConfigMap.size() + _constraintMap.size()
+ + _controllerContextMap.size() + newMessageKeys.size() + currentStateKeys.size();
+ LOG.debug("Paths read: " + numPaths);
+ }
+
+ _init = false;
return true;
}
/**
+ * Get the live instance associated with the controller leader
+ * @return LiveInstance
+ */
+ public LiveInstance getLeader() {
+ return _leader;
+ }
+
+ /**
+ * Get the pause signal (if any)
+ * @return PauseSignal
+ */
+ public PauseSignal getPauseSignal() {
+ return _pause;
+ }
+
+ /**
+ * Retrieves the configs for all resources
+ * @return
+ */
+ public Map<String, ResourceConfiguration> getResourceConfigs() {
+ return _resourceConfigMap;
+ }
+
+ /**
* Retrieves the idealstates for all resources
* @return
*/
@@ -117,6 +269,18 @@ public class ClusterDataCache {
return _idealStateMap;
}
+ public synchronized void setIdealStates(List<IdealState> idealStates) {
+ Map<String, IdealState> idealStateMap = Maps.newHashMap();
+ for (IdealState idealState : idealStates) {
+ idealStateMap.put(idealState.getId(), idealState);
+ }
+ _idealStateCacheMap = idealStateMap;
+ }
+
+ public Map<String, Map<String, String>> getIdealStateRules() {
+ return _idealStateRuleMap;
+ }
+
/**
* Returns the LiveInstances for each of the instances that are curretnly up and running
* @return
@@ -125,6 +289,14 @@ public class ClusterDataCache {
return _liveInstanceMap;
}
+ public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
+ Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+ for (LiveInstance liveInstance : liveInstances) {
+ liveInstanceMap.put(liveInstance.getId(), liveInstance);
+ }
+ _liveInstanceCacheMap = liveInstanceMap;
+ }
+
/**
* Provides the current state of the node for a given session id,
* the sessionid can be got from LiveInstance
@@ -133,6 +305,10 @@ public class ClusterDataCache {
* @return
*/
public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
+ if (!_currentStateMap.containsKey(instanceName)
+ || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
+ return Collections.emptyMap();
+ }
return _currentStateMap.get(instanceName).get(clientSessionId);
}
@@ -150,6 +326,14 @@ public class ClusterDataCache {
}
}
+ /**
+ * Provides all outstanding messages
+ * @return
+ */
+ public Map<String, Map<String, Message>> getMessageMap() {
+ return _messageMap;
+ }
+
// public HealthStat getGlobalStats()
// {
// return _globalStats;
@@ -187,11 +371,18 @@ public class ClusterDataCache {
* @return
*/
public StateModelDefinition getStateModelDef(String stateModelDefRef) {
-
return _stateModelDefMap.get(stateModelDefRef);
}
/**
+ * Get all state model definitions
+ * @return map of name to state model definition
+ */
+ public Map<String, StateModelDefinition> getStateModelDefMap() {
+ return _stateModelDefMap;
+ }
+
+ /**
* Provides the idealstate for a given resource
* @param resourceName
* @return
@@ -208,6 +399,14 @@ public class ClusterDataCache {
return _instanceConfigMap;
}
+ public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+ Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
+ }
+ _instanceConfigCacheMap = instanceConfigMap;
+ }
+
/**
* Some partitions might be disabled on specific nodes.
* This method allows one to fetch the set of nodes where a given partition is disabled
@@ -266,6 +465,55 @@ public class ClusterDataCache {
return null;
}
+ public Map<String, ClusterConstraints> getConstraintMap() {
+ return _constraintMap;
+ }
+
+ public Map<String, ControllerContextHolder> getContextMap() {
+ return _controllerContextMap;
+ }
+
+ public ClusterConfiguration getClusterConfig() {
+ return _clusterConfig;
+ }
+
+ public void cacheMessages(List<Message> messages) {
+ for (Message message : messages) {
+ String instanceName = message.getTgtName();
+ Map<String, Message> instMsgMap = null;
+ if (_messageCache.containsKey(instanceName)) {
+ instMsgMap = _messageCache.get(instanceName);
+ } else {
+ instMsgMap = Maps.newHashMap();
+ _messageCache.put(instanceName, instMsgMap);
+ }
+ instMsgMap.put(message.getId(), message);
+ }
+ }
+
+ /**
+ * Enable or disable writing resource assignments
+ * @param enable true to enable, false to disable
+ */
+ public void setAssignmentWritePolicy(boolean enable) {
+ _writeAssignments = enable;
+ }
+
+ /**
+ * Check if writing resource assignments is enabled
+ * @return true if enabled, false if disabled
+ */
+ public boolean assignmentWriteEnabled() {
+ return _writeAssignments;
+ }
+
+ /**
+ * Indicate that a full read should be done on the next refresh
+ */
+ public synchronized void requireFullRefresh() {
+ _init = true;
+ }
+
/**
* toString method to print the entire cluster state
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 532ecb5..15264ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -40,10 +40,10 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (manager == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager | DataCache");
+ + ". Requires HelixManager | Cluster");
}
HelixManagerProperties properties = manager.getProperties();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 8235173..64bf792 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -47,13 +47,13 @@ import org.apache.helix.model.Message.MessageType;
public class CurrentStateComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
if (cluster == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCE");
+ + ". Requires Cluster|RESOURCE");
}
ResourceCurrentState currentStateOutput = new ResourceCurrentState();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e8e42bf..a46acbd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -65,11 +65,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
HelixManager manager = event.getAttribute("helixmanager");
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (manager == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires ClusterManager|RESOURCES|DataCache");
+ + ". Requires ClusterManager|RESOURCES|Cluster");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();