You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/20 19:42:00 UTC
[2/2] git commit: [HELIX-345] Speed up the controller pipeline
[HELIX-345] Speed up the controller pipeline
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/51329f6f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/51329f6f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/51329f6f
Branch: refs/heads/master
Commit: 51329f6f0bbb8a43cdfd06ffd3bf7ac5c8a93c68
Parents: c8a644f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Feb 4 10:50:11 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Feb 20 10:41:46 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/ConfigAccessor.java | 9 +-
.../main/java/org/apache/helix/api/Cluster.java | 27 +-
.../helix/api/accessor/ClusterAccessor.java | 268 ++++++++++++------
.../helix/api/accessor/ResourceAccessor.java | 42 ++-
.../apache/helix/api/config/ClusterConfig.java | 185 +-----------
.../controller/GenericHelixController.java | 40 +++
.../rebalancer/FullAutoRebalancer.java | 16 +-
.../controller/rebalancer/RebalancerRef.java | 27 +-
.../config/CustomRebalancerConfig.java | 7 +-
.../config/FullAutoRebalancerConfig.java | 7 +-
.../config/PartitionedRebalancerConfig.java | 57 +++-
.../config/RebalancerConfigHolder.java | 2 +-
.../config/SemiAutoRebalancerConfig.java | 7 +-
.../stages/BestPossibleStateCalcStage.java | 46 ++-
.../controller/stages/ClusterDataCache.java | 282 +++++++++++++++++--
.../stages/CompatibilityCheckStage.java | 4 +-
.../stages/CurrentStateComputationStage.java | 4 +-
.../stages/ExternalViewComputeStage.java | 4 +-
.../stages/MessageGenerationStage.java | 4 +-
.../stages/MessageSelectionStage.java | 7 +-
.../controller/stages/MessageThrottleStage.java | 4 +-
.../stages/PersistAssignmentStage.java | 57 +++-
.../controller/stages/PersistContextStage.java | 11 +-
.../controller/stages/ReadClusterDataStage.java | 14 +-
.../stages/ResourceComputationStage.java | 2 +-
.../stages/ResourceValidationStage.java | 4 +-
.../controller/stages/TaskAssignmentStage.java | 12 +-
.../strategy/AutoRebalanceStrategy.java | 186 ++++++------
.../helix/manager/zk/ZKHelixDataAccessor.java | 5 +
.../java/org/apache/helix/model/IdealState.java | 83 +++++-
.../apache/helix/model/ResourceAssignment.java | 2 +-
.../helix/model/ResourceConfiguration.java | 10 +
.../org/apache/helix/task/TaskRebalancer.java | 12 +-
.../java/org/apache/helix/task/TaskUtil.java | 23 +-
.../tools/ClusterExternalViewVerifier.java | 7 +-
.../helix/tools/ClusterStateVerifier.java | 2 +-
.../apache/helix/tools/YAMLClusterSetup.java | 9 +
.../src/test/java/org/apache/helix/Mocks.java | 2 +-
.../org/apache/helix/api/TestNewStages.java | 2 +-
.../stages/TestRebalancePipeline.java | 17 +-
.../stages/TestResourceValidationStage.java | 6 +-
.../TestReelectedPipelineCorrectness.java | 151 ++++++++++
.../TestUserDefRebalancerCompatibility.java | 44 +--
.../TestClusterStatusMonitorLifecycle.java | 20 +-
44 files changed, 1188 insertions(+), 542 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index f46e537..3589165 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ConfigScope;
@@ -489,7 +490,13 @@ public class ConfigAccessor {
List<String> retKeys = null;
if (scope.isFullKey()) {
- ZNRecord record = zkClient.readData(zkPath);
+ ZNRecord record;
+ try {
+ record = zkClient.readData(zkPath);
+ } catch (ZkNoNodeException e) {
+ LOG.warn(zkPath + " no longer exists");
+ return Collections.emptyList();
+ }
if (mapKey == null) {
retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
} else {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index 98072d1..adaf200 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -34,10 +34,8 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SpectatorId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.context.ControllerContext;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
@@ -91,8 +89,6 @@ public class Cluster {
* @param constraintMap
* @param stateModelMap
* @param contextMap
- * @param stats
- * @param alerts
* @param userConfig
* @param isPaused
* @param autoJoinAllowed
@@ -101,8 +97,8 @@ public class Cluster {
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
Map<StateModelDefId, StateModelDefinition> stateModelMap,
- Map<ContextId, ControllerContext> contextMap, PersistentStats stats, Alerts alerts,
- UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+ Map<ContextId, ControllerContext> contextMap, UserConfig userConfig, boolean isPaused,
+ boolean autoJoinAllowed) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -124,8 +120,7 @@ public class Cluster {
new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
.addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
.addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
- .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
- .build();
+ .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -240,22 +235,6 @@ public class Cluster {
}
/**
- * Get all the persisted stats for the cluster
- * @return PersistentStats instance
- */
- public PersistentStats getStats() {
- return _config.getStats();
- }
-
- /**
- * Get all the persisted alerts for the cluster
- * @return Alerts instance
- */
- public Alerts getAlerts() {
- return _config.getAlerts();
- }
-
- /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index abb3e49..5ecc210 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix.api.accessor;
* under the License.
*/
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,9 +55,11 @@ import org.apache.helix.api.id.SessionId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.context.ControllerContext;
import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConfiguration;
@@ -86,15 +89,27 @@ public class ClusterAccessor {
private final PropertyKey.Builder _keyBuilder;
private final ClusterId _clusterId;
+ private final ClusterDataCache _cache;
+
/**
* Instantiate a cluster accessor
* @param clusterId the cluster to access
* @param accessor HelixDataAccessor for the physical store
*/
public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+ this(clusterId, accessor, new ClusterDataCache());
+ }
+
+ /**
+ * Instantiate a cluster accessor
+ * @param clusterId the cluster to access
+ * @param accessor HelixDataAccessor for the physical store
+ */
+ public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor, ClusterDataCache cache) {
_accessor = accessor;
_keyBuilder = accessor.keyBuilder();
_clusterId = clusterId;
+ _cache = cache;
}
/**
@@ -129,9 +144,6 @@ public class ClusterAccessor {
if (cluster.autoJoinAllowed()) {
clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
}
- if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
- _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
- }
if (cluster.isPaused()) {
pauseCluster();
}
@@ -173,16 +185,6 @@ public class ClusterAccessor {
ClusterConstraints constraint = constraints.get(type);
_accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
}
- if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
- _accessor.removeProperty(_keyBuilder.persistantStat());
- } else {
- _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
- }
- if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
- _accessor.removeProperty(_keyBuilder.alerts());
- } else {
- _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
- }
return true;
}
@@ -218,19 +220,22 @@ public class ClusterAccessor {
LOG.error("Cluster is not fully set up");
return null;
}
- LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+ // refresh the cache
+ _cache.refresh(_accessor);
+
+ LiveInstance leader = _cache.getLeader();
/**
* map of constraint-type to constraints
*/
- Map<String, ClusterConstraints> constraintMap =
- _accessor.getChildValuesMap(_keyBuilder.constraints());
+ Map<String, ClusterConstraints> constraintMap = _cache.getConstraintMap();
// read all the resources
- Map<ResourceId, Resource> resourceMap = readResources();
+ Map<ResourceId, Resource> resourceMap = readResources(true);
// read all the participants
- Map<ParticipantId, Participant> participantMap = readParticipants();
+ Map<ParticipantId, Participant> participantMap = readParticipants(true);
// read the controllers
Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
@@ -249,10 +254,10 @@ public class ClusterAccessor {
}
// read the pause status
- PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+ PauseSignal pauseSignal = _cache.getPauseSignal();
boolean isPaused = pauseSignal != null;
- ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+ ClusterConfiguration clusterConfig = _cache.getClusterConfig();
boolean autoJoinAllowed = false;
UserConfig userConfig;
if (clusterConfig != null) {
@@ -263,21 +268,14 @@ public class ClusterAccessor {
}
// read the state model definitions
- Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
-
- // read the stats
- PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-
- // read the alerts
- Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
+ Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions(true);
// read controller context
- Map<ContextId, ControllerContext> contextMap = readControllerContext();
+ Map<ContextId, ControllerContext> contextMap = readControllerContext(true);
// create the cluster snapshot object
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, contextMap, stats, alerts, userConfig, isPaused,
- autoJoinAllowed);
+ clusterConstraintMap, stateModelMap, contextMap, userConfig, isPaused, autoJoinAllowed);
}
/**
@@ -285,9 +283,22 @@ public class ClusterAccessor {
* @return map of state model def id to state model definition
*/
public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+ return readStateModelDefinitions(false);
+ }
+
+ /**
+ * Get all the state model definitions for this cluster
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of state model def id to state model definition
+ */
+ private Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions(boolean useCache) {
Map<StateModelDefId, StateModelDefinition> stateModelDefs = Maps.newHashMap();
- List<StateModelDefinition> stateModelList =
- _accessor.getChildValues(_keyBuilder.stateModelDefs());
+ Collection<StateModelDefinition> stateModelList;
+ if (useCache) {
+ stateModelList = _cache.getStateModelDefMap().values();
+ } else {
+ stateModelList = _accessor.getChildValues(_keyBuilder.stateModelDefs());
+ }
for (StateModelDefinition stateModelDef : stateModelList) {
stateModelDefs.put(stateModelDef.getStateModelDefId(), stateModelDef);
}
@@ -299,35 +310,70 @@ public class ClusterAccessor {
* @return map of resource id to resource
*/
public Map<ResourceId, Resource> readResources() {
- if (!isClusterStructureValid()) {
+ return readResources(false);
+ }
+
+ /**
+ * Read all resources in the cluster
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of resource id to resource
+ */
+ private Map<ResourceId, Resource> readResources(boolean useCache) {
+ if (!useCache && !isClusterStructureValid()) {
LOG.error("Cluster is not fully set up yet!");
return Collections.emptyMap();
}
- /**
- * map of resource-id to ideal-state
- */
- Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
-
- /**
- * Map of resource id to external view
- */
- Map<String, ExternalView> externalViewMap =
- _accessor.getChildValuesMap(_keyBuilder.externalViews());
+ Map<String, IdealState> idealStateMap;
+ Map<String, ResourceConfiguration> resourceConfigMap;
+ Map<String, ExternalView> externalViewMap;
+ Map<String, ResourceAssignment> resourceAssignmentMap;
+ if (useCache) {
+ idealStateMap = _cache.getIdealStates();
+ resourceConfigMap = _cache.getResourceConfigs();
+ } else {
+ idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+ resourceConfigMap = _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+ }
- /**
- * Map of resource id to user configuration
- */
- Map<String, ResourceConfiguration> resourceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+ // check if external view and resource assignment reads are required
+ boolean extraReadsRequired = false;
+ for (String resourceName : idealStateMap.keySet()) {
+ if (extraReadsRequired) {
+ break;
+ }
+ // a rebalancer can be user defined if it has that mode set, or has a different rebalancer
+ // class
+ IdealState idealState = idealStateMap.get(resourceName);
+ extraReadsRequired =
+ extraReadsRequired || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED);
+ RebalancerRef ref = idealState.getRebalancerRef();
+ if (ref != null) {
+ extraReadsRequired =
+ extraReadsRequired
+ || !PartitionedRebalancerConfig.isBuiltinRebalancer(ref.getRebalancerClass());
+ }
+ }
+ for (String resourceName : resourceConfigMap.keySet()) {
+ if (extraReadsRequired) {
+ break;
+ }
+ extraReadsRequired =
+ extraReadsRequired || resourceConfigMap.get(resourceName).hasRebalancerConfig();
+ }
- /**
- * Map of resource id to resource assignment
- */
- Map<String, ResourceAssignment> resourceAssignmentMap =
- _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+ // now read external view and resource assignments if needed
+ if (!useCache || extraReadsRequired) {
+ externalViewMap = _accessor.getChildValuesMap(_keyBuilder.externalViews());
+ resourceAssignmentMap = _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+ _cache.setAssignmentWritePolicy(true);
+ } else {
+ externalViewMap = Maps.newHashMap();
+ resourceAssignmentMap = Maps.newHashMap();
+ _cache.setAssignmentWritePolicy(false);
+ }
- // read all the resources
+ // populate all the resources
Set<String> allResources = Sets.newHashSet();
allResources.addAll(idealStateMap.keySet());
allResources.addAll(resourceConfigMap.keySet());
@@ -347,45 +393,58 @@ public class ClusterAccessor {
* @return map of participant id to participant, or empty map
*/
public Map<ParticipantId, Participant> readParticipants() {
- if (!isClusterStructureValid()) {
+ return readParticipants(false);
+ }
+
+ /**
+ * Read all participants in the cluster
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of participant id to participant, or empty map
+ */
+ private Map<ParticipantId, Participant> readParticipants(boolean useCache) {
+ if (!useCache && !isClusterStructureValid()) {
LOG.error("Cluster is not fully set up yet!");
return Collections.emptyMap();
}
-
- /**
- * map of instance-id to instance-config
- */
- Map<String, InstanceConfig> instanceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
-
- /**
- * map of instance-id to live-instance
- */
- Map<String, LiveInstance> liveInstanceMap =
- _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+ Map<String, InstanceConfig> instanceConfigMap;
+ Map<String, LiveInstance> liveInstanceMap;
+ if (useCache) {
+ instanceConfigMap = _cache.getInstanceConfigMap();
+ liveInstanceMap = _cache.getLiveInstances();
+ } else {
+ instanceConfigMap = _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+ liveInstanceMap = _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+ }
/**
* map of participant-id to map of message-id to message
*/
- Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
- for (String instanceName : liveInstanceMap.keySet()) {
- Map<String, Message> instanceMsgMap =
- _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
- messageMap.put(instanceName, instanceMsgMap);
+ Map<String, Map<String, Message>> messageMap = Maps.newHashMap();
+ for (String participantName : liveInstanceMap.keySet()) {
+ Map<String, Message> instanceMsgMap;
+ if (useCache) {
+ instanceMsgMap = _cache.getMessages(participantName);
+ } else {
+ instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+ }
+ messageMap.put(participantName, instanceMsgMap);
}
/**
* map of participant-id to map of resource-id to current-state
*/
- Map<String, Map<String, CurrentState>> currentStateMap =
- new HashMap<String, Map<String, CurrentState>>();
+ Map<String, Map<String, CurrentState>> currentStateMap = Maps.newHashMap();
for (String participantName : liveInstanceMap.keySet()) {
LiveInstance liveInstance = liveInstanceMap.get(participantName);
SessionId sessionId = liveInstance.getTypedSessionId();
- Map<String, CurrentState> instanceCurStateMap =
- _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
- sessionId.stringify()));
-
+ Map<String, CurrentState> instanceCurStateMap;
+ if (useCache) {
+ instanceCurStateMap = _cache.getCurrentState(participantName, sessionId.stringify());
+ } else {
+ instanceCurStateMap =
+ _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+ sessionId.stringify()));
+ }
currentStateMap.put(participantName, instanceCurStateMap);
}
@@ -472,8 +531,21 @@ public class ClusterAccessor {
* @return map of context id to controller context
*/
public Map<ContextId, ControllerContext> readControllerContext() {
- Map<String, ControllerContextHolder> contextHolders =
- _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+ return readControllerContext(false);
+ }
+
+ /**
+ * Read the persisted controller contexts
+ * @param useCache Use the ClusterDataCache associated with this class rather than reading again
+ * @return map of context id to controller context
+ */
+ private Map<ContextId, ControllerContext> readControllerContext(boolean useCache) {
+ Map<String, ControllerContextHolder> contextHolders;
+ if (useCache) {
+ contextHolders = _cache.getContextMap();
+ } else {
+ contextHolders = _accessor.getChildValuesMap(_keyBuilder.controllerContexts());
+ }
Map<ContextId, ControllerContext> contexts = Maps.newHashMap();
for (String contextName : contextHolders.keySet()) {
contexts.put(ContextId.from(contextName), contextHolders.get(contextName).getContext());
@@ -482,6 +554,22 @@ public class ClusterAccessor {
}
/**
+ * Get the current cluster stats
+ * @return PersistentStats
+ */
+ public PersistentStats getStats() {
+ return _accessor.getProperty(_keyBuilder.persistantStat());
+ }
+
+ /**
+ * Get the current cluster alerts
+ * @return Alerts
+ */
+ public Alerts getAlerts() {
+ return _accessor.getProperty(_keyBuilder.alerts());
+ }
+
+ /**
* Add a statistic specification to the cluster. Existing stat specifications will not be
* overwritten
* @param statName string representing a stat specification
@@ -673,14 +761,22 @@ public class ClusterAccessor {
return false;
}
+ // Create an IdealState from a RebalancerConfig (if the resource supports it)
+ IdealState idealState =
+ ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+ resource.getBucketSize(), resource.getBatchMessageMode());
+ if (idealState != null) {
+ _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ }
+
// Add resource user config
if (resource.getUserConfig() != null) {
ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
configuration.setType(resource.getType());
configuration.addNamespacedConfig(resource.getUserConfig());
PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
- if (partitionedConfig == null
- || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+ if (idealState == null
+ && (partitionedConfig == null || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED)) {
// only persist if this is not easily convertible to an ideal state
configuration
.addNamespacedConfig(new RebalancerConfigHolder(resource.getRebalancerConfig())
@@ -688,14 +784,6 @@ public class ClusterAccessor {
}
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
-
- // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
- IdealState idealState =
- ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
- resource.getBucketSize(), resource.getBatchMessageMode());
- if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
- }
return true;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 73d43b0..a1d6580 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -39,6 +39,7 @@ import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@ -137,14 +138,19 @@ public class ResourceAccessor {
*/
private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration,
RebalancerConfig rebalancerConfig) {
- boolean status =
- _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ boolean status = true;
+ if (configuration != null) {
+ status =
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ }
// set an ideal state if the resource supports it
IdealState idealState =
rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
configuration.getBatchMessageMode());
if (idealState != null) {
- _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+ status =
+ status
+ && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
}
return status;
}
@@ -253,7 +259,14 @@ public class ResourceAccessor {
}
ResourceId resourceId = resourceConfig.getId();
ResourceConfiguration config = new ResourceConfiguration(resourceId);
- config.addNamespacedConfig(resourceConfig.getUserConfig());
+ UserConfig userConfig = resourceConfig.getUserConfig();
+ if (userConfig != null
+ && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig
+ .getMapFields().isEmpty())) {
+ config.addNamespacedConfig(userConfig);
+ } else {
+ userConfig = null;
+ }
PartitionedRebalancerConfig partitionedConfig =
PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
if (partitionedConfig == null
@@ -261,9 +274,11 @@ public class ResourceAccessor {
// only persist if this is not easily convertible to an ideal state
config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
.toNamespacedConfig());
+ config.setBucketSize(resourceConfig.getBucketSize());
+ config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+ } else if (userConfig == null) {
+ config = null;
}
- config.setBucketSize(resourceConfig.getBucketSize());
- config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
setConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
return true;
}
@@ -387,9 +402,17 @@ public class ResourceAccessor {
boolean batchMessageMode) {
PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
if (partitionedConfig != null) {
+ if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) {
+ // don't proceed if this resource cannot be described by an ideal state
+ return null;
+ }
IdealState idealState = new IdealState(partitionedConfig.getResourceId());
idealState.setRebalanceMode(partitionedConfig.getRebalanceMode());
- idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+
+ RebalancerRef ref = partitionedConfig.getRebalancerRef();
+ if (ref != null) {
+ idealState.setRebalancerRef(partitionedConfig.getRebalancerRef());
+ }
String replicas = null;
if (partitionedConfig.anyLiveParticipant()) {
replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
@@ -404,13 +427,14 @@ public class ResourceAccessor {
idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId());
idealState.setBucketSize(bucketSize);
idealState.setBatchMessageMode(batchMessageMode);
- if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ idealState.setRebalancerConfigClass(config.getClass());
+ if (SemiAutoRebalancerConfig.class.equals(config.getClass())) {
SemiAutoRebalancerConfig semiAutoConfig =
BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class);
for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) {
idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId));
}
- } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ } else if (CustomRebalancerConfig.class.equals(config.getClass())) {
CustomRebalancerConfig customConfig =
BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class);
for (PartitionId partitionId : customConfig.getPartitionSet()) {
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 22a1528..ddc98fa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,8 +5,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ClusterId;
@@ -14,14 +12,12 @@ import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ClusterConstraints.ConstraintValue;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.PersistentStats;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.Transition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -61,8 +57,6 @@ public class ClusterConfig {
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
- private final PersistentStats _stats;
- private final Alerts _alerts;
private final UserConfig _userConfig;
private final boolean _isPaused;
private final boolean _autoJoin;
@@ -74,8 +68,6 @@ public class ClusterConfig {
* @param participantMap map of participant id to participant config
* @param constraintMap map of constraint type to all constraints of that type
* @param stateModelMap map of state model id to state model definition
- * @param stats statistics to watch on the cluster
- * @param alerts alerts that the cluster can trigger
* @param userConfig user-defined cluster properties
* @param isPaused true if paused, false if active
* @param allowAutoJoin true if participants can join automatically, false otherwise
@@ -83,15 +75,13 @@ public class ClusterConfig {
private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
Map<ParticipantId, ParticipantConfig> participantMap,
Map<ConstraintType, ClusterConstraints> constraintMap,
- Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
- Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
+ Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+ boolean isPaused, boolean allowAutoJoin) {
_id = id;
_resourceMap = ImmutableMap.copyOf(resourceMap);
_participantMap = ImmutableMap.copyOf(participantMap);
_constraintMap = ImmutableMap.copyOf(constraintMap);
_stateModelMap = ImmutableMap.copyOf(stateModelMap);
- _stats = stats;
- _alerts = alerts;
_userConfig = userConfig;
_isPaused = isPaused;
_autoJoin = allowAutoJoin;
@@ -237,22 +227,6 @@ public class ClusterConfig {
}
/**
- * Get all the statistics persisted on the cluster
- * @return PersistentStats instance
- */
- public PersistentStats getStats() {
- return _stats;
- }
-
- /**
- * Get all the alerts persisted on the cluster
- * @return Alerts instance
- */
- public Alerts getAlerts() {
- return _alerts;
- }
-
- /**
* Get user-specified configuration properties of this cluster
* @return UserConfig properties
*/
@@ -287,8 +261,6 @@ public class ClusterConfig {
private Set<Fields> _updateFields;
private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
- private PersistentStats _removedStats;
- private Alerts _removedAlerts;
private Builder _builder;
/**
@@ -302,8 +274,6 @@ public class ClusterConfig {
Set<ConstraintId> constraints = Sets.newHashSet();
_removedConstraints.put(type, constraints);
}
- _removedStats = new PersistentStats(PersistentStats.nodeName);
- _removedAlerts = new Alerts(Alerts.nodeName);
_builder = new Builder(clusterId);
}
@@ -431,57 +401,6 @@ public class ClusterConfig {
}
/**
- * Add a statistic specification to the cluster. Existing specifications will not be overwritten
- * @param stat string specifying the stat specification
- * @return Delta
- */
- public Delta addStat(String stat) {
- _builder.addStat(stat);
- return this;
- }
-
- /**
- * Add an alert specification for the cluster. Existing specifications will not be overwritten
- * @param alert string specifying the alert specification
- * @return Delta
- */
- public Delta addAlert(String alert) {
- _builder.addAlert(alert);
- return this;
- }
-
- /**
- * Remove a statistic specification from the cluster
- * @param stat statistic specification
- * @return Delta
- */
- public Delta removeStat(String stat) {
- Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
- Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- return this;
- }
-
- /**
- * Remove an alert specification for the cluster
- * @param alert alert specification
- * @return Delta
- */
- public Delta removeAlert(String alert) {
- Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
- if (!currAlertMap.containsKey(alert)) {
- Map<String, String> parsedAlert = Maps.newHashMap();
- StringBuilder statsName = new StringBuilder();
- AlertsHolder.parseAlert(alert, statsName, parsedAlert);
- removeStat(statsName.toString());
- currAlertMap.put(alert, parsedAlert);
- }
- return this;
- }
-
- /**
* Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
* @param orig the original ClusterConfig
* @return updated ClusterConfig
@@ -494,8 +413,7 @@ public class ClusterConfig {
.addParticipants(orig.getParticipantMap().values())
.addStateModelDefinitions(orig.getStateModelMap().values())
.userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
- .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
- .addAlerts(orig.getAlerts());
+ .autoJoin(orig.autoJoinAllowed());
for (Fields field : _updateFields) {
switch (field) {
case USER_CONFIG:
@@ -529,29 +447,8 @@ public class ClusterConfig {
builder.addConstraint(constraints);
}
- // add stats and alerts
- builder.addStats(deltaConfig.getStats());
- builder.addAlerts(deltaConfig.getAlerts());
-
// get the result
- ClusterConfig result = builder.build();
-
- // remove stats
- PersistentStats stats = result.getStats();
- for (String removedStat : _removedStats.getMapFields().keySet()) {
- if (stats.getMapFields().containsKey(removedStat)) {
- stats.getMapFields().remove(removedStat);
- }
- }
-
- // remove alerts
- Alerts alerts = result.getAlerts();
- for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
- if (alerts.getMapFields().containsKey(removedAlert)) {
- alerts.getMapFields().remove(removedAlert);
- }
- }
- return result;
+ return builder.build();
}
}
@@ -565,8 +462,6 @@ public class ClusterConfig {
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
private UserConfig _userConfig;
- private PersistentStats _stats;
- private Alerts _alerts;
private boolean _isPaused;
private boolean _autoJoin;
@@ -583,8 +478,6 @@ public class ClusterConfig {
_isPaused = false;
_autoJoin = false;
_userConfig = new UserConfig(Scope.cluster(id));
- _stats = new PersistentStats(PersistentStats.nodeName);
- _alerts = new Alerts(Alerts.nodeName);
}
/**
@@ -789,74 +682,6 @@ public class ClusterConfig {
}
/**
- * Add a statistic specification to the cluster. Existing specifications will not be overwritten
- * @param stat String specifying the stat specification
- * @return Builder
- */
- public Builder addStat(String stat) {
- Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
- Map<String, Map<String, String>> currentStats = _stats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- if (!currentStats.containsKey(statName)) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- }
- return this;
- }
-
- /**
- * Add statistic specifications to the cluster. Existing specifications will not be overwritten
- * @param stats PersistentStats specifying the stat specification
- * @return Builder
- */
- public Builder addStats(PersistentStats stats) {
- if (stats == null) {
- return this;
- }
- Map<String, Map<String, String>> parsedStat = stats.getMapFields();
- Map<String, Map<String, String>> currentStats = _stats.getMapFields();
- for (String statName : parsedStat.keySet()) {
- if (!currentStats.containsKey(statName)) {
- currentStats.put(statName, parsedStat.get(statName));
- }
- }
- return this;
- }
-
- /**
- * Add alert specifications to the cluster. Existing specifications will not be overwritten
- * @param alert string representing alert specifications
- * @return Builder
- */
- public Builder addAlert(String alert) {
- Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
- if (!currAlertMap.containsKey(alert)) {
- Map<String, String> parsedAlert = Maps.newHashMap();
- StringBuilder statsName = new StringBuilder();
- AlertsHolder.parseAlert(alert, statsName, parsedAlert);
- addStat(statsName.toString());
- currAlertMap.put(alert, parsedAlert);
- }
- return this;
- }
-
- /**
- * Add alert specifications to the cluster. Existing specifications will not be overwritten
- * @param alerts Alerts instance
- * @return Builder
- */
- public Builder addAlerts(Alerts alerts) {
- if (alerts == null) {
- return this;
- }
- Map<String, Map<String, String>> alertMap = alerts.getMapFields();
- for (String alert : alertMap.keySet()) {
- addAlert(alert);
- }
- return this;
- }
-
- /**
* Set the paused status of the cluster
* @param isPaused true if paused, false otherwise
* @return Builder
@@ -892,7 +717,7 @@ public class ClusterConfig {
*/
public ClusterConfig build() {
return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
- _stats, _alerts, _userConfig, _isPaused, _autoJoin);
+ _userConfig, _isPaused, _autoJoin);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2b2a71e..b11ab9c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -48,6 +48,7 @@ import org.apache.helix.api.id.SessionId;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
@@ -121,6 +122,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
int _timerPeriod = Integer.MAX_VALUE;
/**
+ * A cache maintained across pipelines
+ */
+ private ClusterDataCache _cache;
+
+ /**
* Default constructor that creates a default pipeline registry. This is sufficient in
* most cases, but if there is a some thing specific needed use another constructor
* where in you can pass a pipeline registry
@@ -138,6 +144,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void run() {
+ _cache.requireFullRefresh();
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
ClusterEvent event = new ClusterEvent("periodicalRebalance");
@@ -228,6 +235,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
_registry = registry;
_lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
_lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+ _cache = new ClusterDataCache();
_eventQueue = new ClusterEventBlockingQueue();
_eventThread = new ClusterEventProcessor();
_eventThread.start();
@@ -276,6 +284,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
}
+ // add the cache
+ event.addAttribute("ClusterDataCache", _cache);
+
List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
if (pipelines == null || pipelines.size() == 0) {
logger.info("No pipeline to run for event:" + event.getName());
@@ -318,6 +329,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onStateChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
ClusterEvent event = new ClusterEvent("currentStateChange");
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("instanceName", instanceName);
@@ -341,6 +355,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onMessage()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
ClusterEvent event = new ClusterEvent("messageChange");
event.addAttribute("helixmanager", changeContext.getManager());
@@ -360,10 +377,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
if (liveInstances == null) {
liveInstances = Collections.emptyList();
}
+ _cache.setLiveInstances(liveInstances);
+
// Go though the live instance list and make sure that we are observing them
// accordingly. The action is done regardless of the paused flag.
if (changeContext.getType() == NotificationContext.Type.INIT
@@ -403,6 +425,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
+
+ if (idealStates == null) {
+ idealStates = Collections.emptyList();
+ }
+ _cache.setIdealStates(idealStates);
ClusterEvent event = new ClusterEvent("idealStateChange");
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("changeContext", changeContext);
@@ -419,6 +449,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
logger.info("START: GenericClusterController.onConfigChange()");
+ if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+ _cache.requireFullRefresh();
+ }
+
+ if (configs == null) {
+ configs = Collections.emptyList();
+ }
+ _cache.setInstanceConfigs(configs);
+
ClusterEvent event = new ClusterEvent("configChange");
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
@@ -438,6 +477,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
@Override
public void onControllerChange(NotificationContext changeContext) {
logger.info("START: GenericClusterController.onControllerChange()");
+ _cache.requireFullRefresh();
if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
logger.info("GenericClusterController.onControllerChange() FINALIZE");
return;
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 0c55d45..13616b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -113,8 +113,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
}
if (!taggedLiveNodes.isEmpty()) {
// live nodes exist that have this tag
- if (LOG.isInfoEnabled()) {
- LOG.info("found the following participants with tag " + config.getParticipantGroupTag()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("found the following participants with tag " + config.getParticipantGroupTag()
+ " for " + config.getResourceId() + ": " + taggedLiveNodes);
}
} else if (taggedNodes.isEmpty()) {
@@ -132,12 +132,12 @@ public class FullAutoRebalancer implements HelixRebalancer {
// determine which nodes the replicas should live on
int maxPartition = config.getMaxPartitionsPerParticipant();
- if (LOG.isInfoEnabled()) {
- LOG.info("currentMapping: " + currentMapping);
- LOG.info("stateCountMap: " + stateCountMap);
- LOG.info("liveNodes: " + liveParticipantList);
- LOG.info("allNodes: " + allParticipantList);
- LOG.info("maxPartition: " + maxPartition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("currentMapping: " + currentMapping);
+ LOG.debug("stateCountMap: " + stateCountMap);
+ LOG.debug("liveNodes: " + liveParticipantList);
+ LOG.debug("allNodes: " + allParticipantList);
+ LOG.debug("maxPartition: " + maxPartition);
}
ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
_algorithm =
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
index 974222d..8439583 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/RebalancerRef.java
@@ -34,25 +34,46 @@ public class RebalancerRef {
@JsonProperty("rebalancerClassName")
private final String _rebalancerClassName;
+ @JsonIgnore
+ private Class<? extends HelixRebalancer> _class;
+
@JsonCreator
private RebalancerRef(@JsonProperty("rebalancerClassName") String rebalancerClassName) {
_rebalancerClassName = rebalancerClassName;
+ _class = null;
}
/**
- * Get an instantiated Rebalancer
- * @return Rebalancer or null if instantiation failed
+ * Get an instantiated HelixRebalancer
+ * @return HelixRebalancer or null if instantiation failed
*/
@JsonIgnore
public HelixRebalancer getRebalancer() {
try {
- return (HelixRebalancer) (HelixUtil.loadClass(getClass(), _rebalancerClassName).newInstance());
+ return (HelixRebalancer) (getRebalancerClass().newInstance());
} catch (Exception e) {
LOG.warn("Exception while invoking custom rebalancer class:" + _rebalancerClassName, e);
}
return null;
}
+ /**
+ * Get the class object of this rebalancer ref
+ * @return Class
+ */
+ @JsonIgnore
+ public Class<? extends HelixRebalancer> getRebalancerClass() {
+ try {
+ if (_class == null) {
+ _class =
+ HelixUtil.loadClass(getClass(), _rebalancerClassName).asSubclass(HelixRebalancer.class);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while loading rebalancer class:" + _rebalancerClassName, e);
+ }
+ return _class;
+ }
+
@Override
public String toString() {
return _rebalancerClassName;
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
index 73c3ccc..a44b230 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/CustomRebalancerConfig.java
@@ -53,7 +53,12 @@ public class CustomRebalancerConfig extends PartitionedRebalancerConfig {
* Instantiate a CustomRebalancerConfig
*/
public CustomRebalancerConfig() {
- setRebalanceMode(RebalanceMode.CUSTOMIZED);
+ if (getClass().equals(CustomRebalancerConfig.class)) {
+ // only mark this as customized mode if this specifc config is used
+ setRebalanceMode(RebalanceMode.CUSTOMIZED);
+ } else {
+ setRebalanceMode(RebalanceMode.USER_DEFINED);
+ }
setRebalancerRef(RebalancerRef.from(CustomRebalancer.class));
_preferenceMaps = Maps.newHashMap();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
index 828d509..16bb4cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/FullAutoRebalancerConfig.java
@@ -30,7 +30,12 @@ import org.apache.helix.model.IdealState.RebalanceMode;
*/
public class FullAutoRebalancerConfig extends PartitionedRebalancerConfig {
public FullAutoRebalancerConfig() {
- setRebalanceMode(RebalanceMode.FULL_AUTO);
+ if (getClass().equals(FullAutoRebalancerConfig.class)) {
+ // only mark this as full auto mode if this specifc config is used
+ setRebalanceMode(RebalanceMode.FULL_AUTO);
+ } else {
+ setRebalanceMode(RebalanceMode.USER_DEFINED);
+ }
setRebalancerRef(RebalancerRef.from(FullAutoRebalancer.class));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
index 2c9769d..dd661d9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java
@@ -10,14 +10,20 @@ import org.apache.helix.api.Partition;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.FullAutoRebalancer;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.RebalancerRef;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskRebalancer;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -51,6 +57,23 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
private int _maxPartitionsPerParticipant;
private RebalanceMode _rebalanceMode;
+ @JsonIgnore
+ private static final Set<Class<? extends RebalancerConfig>> BUILTIN_CONFIG_CLASSES = Sets
+ .newHashSet();
+ @JsonIgnore
+ private static final Set<Class<? extends HelixRebalancer>> BUILTIN_REBALANCER_CLASSES = Sets
+ .newHashSet();
+ static {
+ BUILTIN_CONFIG_CLASSES.add(PartitionedRebalancerConfig.class);
+ BUILTIN_CONFIG_CLASSES.add(FullAutoRebalancerConfig.class);
+ BUILTIN_CONFIG_CLASSES.add(SemiAutoRebalancerConfig.class);
+ BUILTIN_CONFIG_CLASSES.add(CustomRebalancerConfig.class);
+ BUILTIN_REBALANCER_CLASSES.add(FullAutoRebalancer.class);
+ BUILTIN_REBALANCER_CLASSES.add(SemiAutoRebalancer.class);
+ BUILTIN_REBALANCER_CLASSES.add(CustomRebalancer.class);
+ BUILTIN_REBALANCER_CLASSES.add(TaskRebalancer.class);
+ }
+
/**
* Instantiate a PartitionedRebalancerConfig
*/
@@ -186,13 +209,42 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
}
/**
+ * Check if the given class is compatible with an {@link IdealState}
+ * @param clazz the PartitionedRebalancerConfig subclass
+ * @return true if IdealState can be used to describe this config, false otherwise
+ */
+ public static boolean isBuiltinConfig(Class<? extends RebalancerConfig> clazz) {
+ return BUILTIN_CONFIG_CLASSES.contains(clazz);
+ }
+
+ /**
+ * Check if the given class is a built-in rebalancer class
+ * @param clazz the HelixRebalancer subclass
+ * @return true if the rebalancer class is built in, false otherwise
+ */
+ public static boolean isBuiltinRebalancer(Class<? extends HelixRebalancer> clazz) {
+ return BUILTIN_REBALANCER_CLASSES.contains(clazz);
+ }
+
+ /**
* Convert a physically-stored IdealState into a rebalancer config for a partitioned resource
* @param idealState populated IdealState
* @return PartitionedRebalancerConfig
*/
public static PartitionedRebalancerConfig from(IdealState idealState) {
PartitionedRebalancerConfig config;
- switch (idealState.getRebalanceMode()) {
+ RebalanceMode mode = idealState.getRebalanceMode();
+ if (mode == RebalanceMode.USER_DEFINED) {
+ Class<? extends RebalancerConfig> configClass = idealState.getRebalancerConfigClass();
+ if (configClass.equals(FullAutoRebalancerConfig.class)) {
+ mode = RebalanceMode.FULL_AUTO;
+ } else if (configClass.equals(SemiAutoRebalancerConfig.class)) {
+ mode = RebalanceMode.SEMI_AUTO;
+ } else if (configClass.equals(CustomRebalancerConfig.class)) {
+ mode = RebalanceMode.CUSTOMIZED;
+ }
+ }
+ switch (mode) {
case FULL_AUTO:
FullAutoRebalancerConfig.Builder fullAutoBuilder =
new FullAutoRebalancerConfig.Builder(idealState.getResourceId());
@@ -252,7 +304,8 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement
.maxPartitionsPerParticipant(idealState.getMaxPartitionsPerInstance())
.participantGroupTag(idealState.getInstanceGroupTag())
.stateModelDefId(idealState.getStateModelDefId())
- .stateModelFactoryId(idealState.getStateModelFactoryId());
+ .stateModelFactoryId(idealState.getStateModelFactoryId())
+ .rebalanceMode(idealState.getRebalanceMode());
RebalancerRef rebalancerRef = idealState.getRebalancerRef();
if (rebalancerRef != null) {
builder.rebalancerRef(rebalancerRef);
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
index 8581732..d6ddb50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/RebalancerConfigHolder.java
@@ -32,7 +32,7 @@ import org.apache.log4j.Logger;
* information specific to each rebalancer.
*/
public final class RebalancerConfigHolder {
- private enum Fields {
+ public enum Fields {
SERIALIZER_CLASS,
REBALANCER_CONFIG,
REBALANCER_CONFIG_CLASS
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index bfc3309..727c3df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -56,7 +56,12 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
* Instantiate a SemiAutoRebalancerConfig
*/
public SemiAutoRebalancerConfig() {
- setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ if (getClass().equals(SemiAutoRebalancerConfig.class)) {
+ // only mark this as semi auto mode if this specifc config is used
+ setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ } else {
+ setRebalanceMode(RebalanceMode.USER_DEFINED);
+ }
setRebalancerRef(RebalancerRef.from(SemiAutoRebalancer.class));
_preferenceLists = Maps.newHashMap();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index ec812b2..644b9f6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -37,12 +37,14 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.FallbackRebalancer;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.RebalancerRef;
import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -52,6 +54,9 @@ import com.google.common.collect.Sets;
public class BestPossibleStateCalcStage extends AbstractBaseStage {
private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
+ // cache for rebalancer instances
+ private Map<ResourceId, HelixRebalancer> _rebalancerMap = Maps.newHashMap();
+
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
@@ -63,11 +68,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
event.getAttribute(AttributeName.CURRENT_STATE.toString());
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (currentStateOutput == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+ + ". Requires CURRENT_STATE|RESOURCES|Cluster");
}
BestPossibleStateOutput bestPossibleStateOutput =
@@ -178,25 +183,42 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
stateModelDefs.get(rebalancerConfig.getStateModelDefId());
ResourceAssignment resourceAssignment = null;
if (rebalancerConfig != null) {
+ // use a cached rebalancer if possible
+ RebalancerRef ref = rebalancerConfig.getRebalancerRef();
HelixRebalancer rebalancer = null;
- if (rebalancerConfig != null && rebalancerConfig.getRebalancerRef() != null) {
- rebalancer = rebalancerConfig.getRebalancerRef().getRebalancer();
+ if (_rebalancerMap.containsKey(resourceId)) {
+ HelixRebalancer candidateRebalancer = _rebalancerMap.get(resourceId);
+ if (ref != null && candidateRebalancer.getClass().equals(ref.toString())) {
+ rebalancer = candidateRebalancer;
+ }
}
- HelixManager manager = event.getAttribute("helixmanager");
- ControllerContextProvider provider =
- event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+
+ // otherwise instantiate a new one
if (rebalancer == null) {
- rebalancer = new FallbackRebalancer();
+ if (ref != null) {
+ rebalancer = ref.getRebalancer();
+ }
+ HelixManager manager = event.getAttribute("helixmanager");
+ ControllerContextProvider provider =
+ event.getAttribute(AttributeName.CONTEXT_PROVIDER.toString());
+ if (rebalancer == null) {
+ rebalancer = new FallbackRebalancer();
+ }
+ rebalancer.init(manager, provider);
+ _rebalancerMap.put(resourceId, rebalancer);
}
- rebalancer.init(manager, provider);
ResourceAssignment currentAssignment = null;
Resource resourceSnapshot = cluster.getResource(resourceId);
if (resourceSnapshot != null) {
currentAssignment = resourceSnapshot.getResourceAssignment();
}
- resourceAssignment =
- rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
- currentStateOutput);
+ try {
+ resourceAssignment =
+ rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster,
+ currentStateOutput);
+ } catch (Exception e) {
+ LOG.error("Rebalancer for resource " + resourceId + " failed.", e);
+ }
}
if (resourceAssignment == null) {
resourceAssignment =
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6f09d26..0c28bdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -22,12 +22,18 @@ package org.apache.helix.controller.stages;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
@@ -35,22 +41,43 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
* Reads the data from the cluster using data accessor. This output ClusterData which
* provides useful methods to search/lookup properties
*/
-@Deprecated
public class ClusterDataCache {
Map<String, LiveInstance> _liveInstanceMap;
+ Map<String, LiveInstance> _liveInstanceCacheMap;
Map<String, IdealState> _idealStateMap;
+ Map<String, IdealState> _idealStateCacheMap;
Map<String, StateModelDefinition> _stateModelDefMap;
Map<String, InstanceConfig> _instanceConfigMap;
+ Map<String, InstanceConfig> _instanceConfigCacheMap;
Map<String, ClusterConstraints> _constraintMap;
Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
Map<String, Map<String, Message>> _messageMap;
+ Map<String, Map<String, String>> _idealStateRuleMap;
+ Map<String, ResourceConfiguration> _resourceConfigMap;
+ Map<String, ControllerContextHolder> _controllerContextMap;
+ PauseSignal _pause;
+ LiveInstance _leader;
+ ClusterConfiguration _clusterConfig;
+ boolean _writeAssignments;
+
+ // maintain a cache of participant messages across pipeline runs
+ Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+ boolean _init = true;
// Map<String, Map<String, HealthStat>> _healthStatMap;
// private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
@@ -66,39 +93,111 @@ public class ClusterDataCache {
* @param accessor
* @return
*/
- public boolean refresh(HelixDataAccessor accessor) {
+ public synchronized boolean refresh(HelixDataAccessor accessor) {
+ LOG.info("START: ClusterDataCache.refresh()");
+ long startTime = System.currentTimeMillis();
+
Builder keyBuilder = accessor.keyBuilder();
- _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+
+ if (_init) {
+ _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+ _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+ _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ }
+ _idealStateMap = Maps.newHashMap(_idealStateCacheMap);
+ _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap);
+ _instanceConfigMap = Maps.newHashMap(_instanceConfigCacheMap);
for (LiveInstance instance : _liveInstanceMap.values()) {
- LOG.trace("live instance: " + instance.getParticipantId() + " "
- + instance.getTypedSessionId());
+ LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
}
_stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
- _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
_constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>();
+ List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+ long purgeSum = 0;
for (String instanceName : _liveInstanceMap.keySet()) {
- Map<String, Message> map = accessor.getChildValuesMap(keyBuilder.messages(instanceName));
- msgMap.put(instanceName, map);
+ // get the cache
+ Map<String, Message> cachedMap = _messageCache.get(instanceName);
+ if (cachedMap == null) {
+ cachedMap = Maps.newHashMap();
+ _messageCache.put(instanceName, cachedMap);
+ }
+ msgMap.put(instanceName, cachedMap);
+
+ // get the current names
+ Set<String> messageNames =
+ Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+ long purgeStart = System.currentTimeMillis();
+ // clear stale names
+ Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+ while (cachedNamesIter.hasNext()) {
+ String messageName = cachedNamesIter.next();
+ if (!messageNames.contains(messageName)) {
+ cachedNamesIter.remove();
+ }
+ }
+ long purgeEnd = System.currentTimeMillis();
+ purgeSum += purgeEnd - purgeStart;
+
+ // get the keys for the new messages
+ for (String messageName : messageNames) {
+ if (!cachedMap.containsKey(messageName)) {
+ newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+ }
+ }
+ }
+
+ // get the new messages
+ if (newMessageKeys.size() > 0) {
+ List<Message> newMessages = accessor.getProperty(newMessageKeys);
+ for (Message message : newMessages) {
+ if (message != null) {
+ Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+ cachedMap.put(message.getId(), message);
+ }
+ }
}
_messageMap = Collections.unmodifiableMap(msgMap);
+ LOG.debug("Purge took: " + purgeSum);
+ List<PropertyKey> currentStateKeys = Lists.newLinkedList();
Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap =
new HashMap<String, Map<String, Map<String, CurrentState>>>();
for (String instanceName : _liveInstanceMap.keySet()) {
LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
- String sessionId = liveInstance.getTypedSessionId().stringify();
- if (!allCurStateMap.containsKey(instanceName)) {
- allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
+ String sessionId = liveInstance.getSessionId();
+ List<String> currentStateNames =
+ accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+ for (String currentStateName : currentStateNames) {
+ currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+ }
+
+ // ensure an empty current state map for all live instances and sessions
+ Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName);
+ if (instanceCurStateMap == null) {
+ instanceCurStateMap = Maps.newHashMap();
+ allCurStateMap.put(instanceName, instanceCurStateMap);
+ }
+ Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+ if (sessionCurStateMap == null) {
+ sessionCurStateMap = Maps.newHashMap();
+ instanceCurStateMap.put(sessionId, sessionCurStateMap);
+ }
+ }
+ List<CurrentState> currentStates = accessor.getProperty(currentStateKeys);
+ Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator();
+ for (CurrentState currentState : currentStates) {
+ PropertyKey key = csKeyIter.next();
+ String[] params = key.getParams();
+ if (currentState != null && params.length >= 4) {
+ Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]);
+ Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]);
+ sessionCurStateMap.put(params[3], currentState);
}
- Map<String, Map<String, CurrentState>> curStateMap = allCurStateMap.get(instanceName);
- Map<String, CurrentState> map =
- accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
- curStateMap.put(sessionId, map);
}
for (String instance : allCurStateMap.keySet()) {
@@ -106,10 +205,63 @@ public class ClusterDataCache {
}
_currentStateMap = Collections.unmodifiableMap(allCurStateMap);
+ // New in 0.7: Read more information for the benefit of user-defined rebalancers
+ _resourceConfigMap = accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+ _controllerContextMap = accessor.getChildValuesMap(keyBuilder.controllerContexts());
+
+ // Read all single properties together
+ List<HelixProperty> singleProperties =
+ accessor.getProperty(ImmutableList.of(keyBuilder.clusterConfig(),
+ keyBuilder.controllerLeader(), keyBuilder.pause()));
+ _clusterConfig = (ClusterConfiguration) singleProperties.get(0);
+ if (_clusterConfig != null) {
+ _idealStateRuleMap = _clusterConfig.getIdealStateRules();
+ } else {
+ _idealStateRuleMap = Collections.emptyMap();
+ }
+ _leader = (LiveInstance) singleProperties.get(1);
+ _pause = (PauseSignal) singleProperties.get(2);
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
+
+ if (LOG.isDebugEnabled()) {
+ int numPaths =
+ _liveInstanceMap.size() + _idealStateMap.size() + +_resourceConfigMap.size()
+ + _stateModelDefMap.size() + _instanceConfigMap.size() + _constraintMap.size()
+ + _controllerContextMap.size() + newMessageKeys.size() + currentStateKeys.size();
+ LOG.debug("Paths read: " + numPaths);
+ }
+
+ _init = false;
return true;
}
/**
+ * Get the live instance associated with the controller leader
+ * @return LiveInstance
+ */
+ public LiveInstance getLeader() {
+ return _leader;
+ }
+
+ /**
+ * Get the pause signal (if any)
+ * @return PauseSignal
+ */
+ public PauseSignal getPauseSignal() {
+ return _pause;
+ }
+
+ /**
+ * Retrieves the configs for all resources
+ * @return
+ */
+ public Map<String, ResourceConfiguration> getResourceConfigs() {
+ return _resourceConfigMap;
+ }
+
+ /**
* Retrieves the idealstates for all resources
* @return
*/
@@ -117,6 +269,18 @@ public class ClusterDataCache {
return _idealStateMap;
}
+ public synchronized void setIdealStates(List<IdealState> idealStates) {
+ Map<String, IdealState> idealStateMap = Maps.newHashMap();
+ for (IdealState idealState : idealStates) {
+ idealStateMap.put(idealState.getId(), idealState);
+ }
+ _idealStateCacheMap = idealStateMap;
+ }
+
+ public Map<String, Map<String, String>> getIdealStateRules() {
+ return _idealStateRuleMap;
+ }
+
/**
* Returns the LiveInstances for each of the instances that are curretnly up and running
* @return
@@ -125,6 +289,14 @@ public class ClusterDataCache {
return _liveInstanceMap;
}
+ public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
+ Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+ for (LiveInstance liveInstance : liveInstances) {
+ liveInstanceMap.put(liveInstance.getId(), liveInstance);
+ }
+ _liveInstanceCacheMap = liveInstanceMap;
+ }
+
/**
* Provides the current state of the node for a given session id,
* the sessionid can be got from LiveInstance
@@ -133,6 +305,10 @@ public class ClusterDataCache {
* @return
*/
public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
+ if (!_currentStateMap.containsKey(instanceName)
+ || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
+ return Collections.emptyMap();
+ }
return _currentStateMap.get(instanceName).get(clientSessionId);
}
@@ -150,6 +326,14 @@ public class ClusterDataCache {
}
}
+ /**
+ * Provides all outstanding messages
+ * @return
+ */
+ public Map<String, Map<String, Message>> getMessageMap() {
+ return _messageMap;
+ }
+
// public HealthStat getGlobalStats()
// {
// return _globalStats;
@@ -187,11 +371,18 @@ public class ClusterDataCache {
* @return
*/
public StateModelDefinition getStateModelDef(String stateModelDefRef) {
-
return _stateModelDefMap.get(stateModelDefRef);
}
/**
+ * Get all state model definitions
+ * @return map of name to state model definition
+ */
+ public Map<String, StateModelDefinition> getStateModelDefMap() {
+ return _stateModelDefMap;
+ }
+
+ /**
* Provides the idealstate for a given resource
* @param resourceName
* @return
@@ -208,6 +399,14 @@ public class ClusterDataCache {
return _instanceConfigMap;
}
+ public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
+ Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
+ }
+ _instanceConfigCacheMap = instanceConfigMap;
+ }
+
/**
* Some partitions might be disabled on specific nodes.
* This method allows one to fetch the set of nodes where a given partition is disabled
@@ -266,6 +465,55 @@ public class ClusterDataCache {
return null;
}
+ public Map<String, ClusterConstraints> getConstraintMap() {
+ return _constraintMap;
+ }
+
+ public Map<String, ControllerContextHolder> getContextMap() {
+ return _controllerContextMap;
+ }
+
+ public ClusterConfiguration getClusterConfig() {
+ return _clusterConfig;
+ }
+
+ public void cacheMessages(List<Message> messages) {
+ for (Message message : messages) {
+ String instanceName = message.getTgtName();
+ Map<String, Message> instMsgMap = null;
+ if (_messageCache.containsKey(instanceName)) {
+ instMsgMap = _messageCache.get(instanceName);
+ } else {
+ instMsgMap = Maps.newHashMap();
+ _messageCache.put(instanceName, instMsgMap);
+ }
+ instMsgMap.put(message.getId(), message);
+ }
+ }
+
+ /**
+ * Enable or disable writing resource assignments
+ * @param enable true to enable, false to disable
+ */
+ public void setAssignmentWritePolicy(boolean enable) {
+ _writeAssignments = enable;
+ }
+
+ /**
+ * Check if writing resource assignments is enabled
+ * @return true if enabled, false if disabled
+ */
+ public boolean assignmentWriteEnabled() {
+ return _writeAssignments;
+ }
+
+ /**
+ * Indicate that a full read should be done on the next refresh
+ */
+ public synchronized void requireFullRefresh() {
+ _init = true;
+ }
+
/**
* toString method to print the entire cluster state
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 532ecb5..15264ca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -40,10 +40,10 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
HelixManager manager = event.getAttribute("helixmanager");
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (manager == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager | DataCache");
+ + ". Requires HelixManager | Cluster");
}
HelixManagerProperties properties = manager.getProperties();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 8235173..64bf792 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -47,13 +47,13 @@ import org.apache.helix.model.Message.MessageType;
public class CurrentStateComputationStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
if (cluster == null || resourceMap == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCE");
+ + ". Requires Cluster|RESOURCE");
}
ResourceCurrentState currentStateOutput = new ResourceCurrentState();
http://git-wip-us.apache.org/repos/asf/helix/blob/51329f6f/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e8e42bf..a46acbd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -65,11 +65,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
HelixManager manager = event.getAttribute("helixmanager");
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- Cluster cluster = event.getAttribute("ClusterDataCache");
+ Cluster cluster = event.getAttribute("Cluster");
if (manager == null || resourceMap == null || cluster == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires ClusterManager|RESOURCES|DataCache");
+ + ". Requires ClusterManager|RESOURCES|Cluster");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();