You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:37 UTC
[29/53] [abbrv] git commit: [HELIX-238] Updated accessors for a new
cluster setup
[HELIX-238] Updated accessors for a new cluster setup
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/dc94c8c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/dc94c8c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/dc94c8c1
Branch: refs/heads/master
Commit: dc94c8c18013b44d85dc68f298937d5753e3d0f5
Parents: 19cdf76
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Oct 3 17:27:58 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:35 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/api/Participant.java | 2 +-
.../helix/api/accessor/ClusterAccessor.java | 249 +++--
.../helix/api/accessor/ParticipantAccessor.java | 10 +
.../helix/api/accessor/ResourceAccessor.java | 9 +
.../accessor/StateModelDefinitionAccessor.java | 70 --
.../rebalancer/context/RebalancerConfig.java | 8 +
.../util/NewConstraintBasedAssignment.java | 2 +-
.../apache/helix/model/HelixConfigScope.java | 17 +
.../org/apache/helix/tools/NewClusterSetup.java | 1041 ++++++++++++++++++
9 files changed, 1238 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 0e0de9d..53f4038 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -111,7 +111,7 @@ public class Participant {
* Get disabled partition id's
* @return set of disabled partition id's, or empty set if none
*/
- public Set<PartitionId> getDisablePartitionIds() {
+ public Set<PartitionId> getDisabledPartitionIds() {
return _config.getDisabledPartitions();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 8768d8e..5c7df85 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -63,6 +63,8 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import org.testng.internal.annotations.Sets;
+import com.google.common.collect.Maps;
+
public class ClusterAccessor {
private static Logger LOG = Logger.getLogger(ClusterAccessor.class);
@@ -179,55 +181,81 @@ public class ClusterAccessor {
* @return cluster snapshot
*/
public Cluster readCluster() {
- /**
- * map of instance-id to instance-config
- */
- Map<String, InstanceConfig> instanceConfigMap =
- _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+ LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
/**
- * map of resource-id to ideal-state
+ * map of constraint-type to constraints
*/
- Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+ Map<String, ClusterConstraints> constraintMap =
+ _accessor.getChildValuesMap(_keyBuilder.constraints());
- /**
- * map of instance-id to live-instance
- */
- Map<String, LiveInstance> liveInstanceMap =
- _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+ // read all the resources
+ Map<ResourceId, Resource> resourceMap = readResources();
- /**
- * map of participant-id to map of message-id to message
- */
- Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
- for (String instanceName : liveInstanceMap.keySet()) {
- Map<String, Message> instanceMsgMap =
- _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
- messageMap.put(instanceName, instanceMsgMap);
+ // read all the participants
+ Map<ParticipantId, Participant> participantMap = readParticipants();
+
+ // read the controllers
+ Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
+ ControllerId leaderId = null;
+ if (leader != null) {
+ leaderId = ControllerId.from(leader.getId());
+ controllerMap.put(leaderId, new Controller(leaderId, leader, true));
}
- /**
- * map of participant-id to map of resource-id to current-state
- */
- Map<String, Map<String, CurrentState>> currentStateMap =
- new HashMap<String, Map<String, CurrentState>>();
- for (String participantName : liveInstanceMap.keySet()) {
- LiveInstance liveInstance = liveInstanceMap.get(participantName);
- SessionId sessionId = liveInstance.getSessionId();
- Map<String, CurrentState> instanceCurStateMap =
- _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
- sessionId.stringify()));
+ // read the constraints
+ Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
+ new HashMap<ConstraintType, ClusterConstraints>();
+ for (String constraintType : constraintMap.keySet()) {
+ clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
+ constraintMap.get(constraintType));
+ }
- currentStateMap.put(participantName, instanceCurStateMap);
+ // read the pause status
+ PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+ boolean isPaused = pauseSignal != null;
+
+ ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+ boolean autoJoinAllowed = false;
+ UserConfig userConfig;
+ if (clusterUserConfig != null) {
+ userConfig = UserConfig.from(clusterUserConfig);
+ autoJoinAllowed = clusterUserConfig.autoJoinAllowed();
+ } else {
+ userConfig = new UserConfig(Scope.cluster(_clusterId));
}
- LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+ // read the state model definitions
+ Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
+ // create the cluster snapshot object
+ return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
+ clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
+ }
+
+ /**
+ * Get all the state model definitions for this cluster
+ * @return map of state model def id to state model definition
+ */
+ public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = Maps.newHashMap();
+ List<StateModelDefinition> stateModelList =
+ _accessor.getChildValues(_keyBuilder.stateModelDefs());
+ for (StateModelDefinition stateModelDef : stateModelList) {
+ stateModelDefs.put(stateModelDef.getStateModelDefId(), stateModelDef);
+ }
+ return stateModelDefs;
+ }
+
+ /**
+ * Read all resource in the cluster
+ * @return map of resource id to resource
+ */
+ public Map<ResourceId, Resource> readResources() {
/**
- * map of constraint-type to constraints
+ * map of resource-id to ideal-state
*/
- Map<String, ClusterConstraints> constraintMap =
- _accessor.getChildValuesMap(_keyBuilder.constraints());
+ Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
/**
* Map of resource id to external view
@@ -251,7 +279,7 @@ public class ClusterAccessor {
Set<String> allResources = Sets.newHashSet();
allResources.addAll(idealStateMap.keySet());
allResources.addAll(resourceConfigMap.keySet());
- Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
+ Map<ResourceId, Resource> resourceMap = Maps.newHashMap();
for (String resourceName : allResources) {
ResourceId resourceId = ResourceId.from(resourceName);
resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
@@ -259,8 +287,53 @@ public class ClusterAccessor {
externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
}
+ return resourceMap;
+ }
+
+ /**
+ * Read all participants in the cluster
+ * @return map of participant id to participant
+ */
+ public Map<ParticipantId, Participant> readParticipants() {
+ /**
+ * map of instance-id to instance-config
+ */
+ Map<String, InstanceConfig> instanceConfigMap =
+ _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+
+ /**
+ * map of instance-id to live-instance
+ */
+ Map<String, LiveInstance> liveInstanceMap =
+ _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+
+ /**
+ * map of participant-id to map of message-id to message
+ */
+ Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
+ for (String instanceName : liveInstanceMap.keySet()) {
+ Map<String, Message> instanceMsgMap =
+ _accessor.getChildValuesMap(_keyBuilder.messages(instanceName));
+ messageMap.put(instanceName, instanceMsgMap);
+ }
+
+ /**
+ * map of participant-id to map of resource-id to current-state
+ */
+ Map<String, Map<String, CurrentState>> currentStateMap =
+ new HashMap<String, Map<String, CurrentState>>();
+ for (String participantName : liveInstanceMap.keySet()) {
+ LiveInstance liveInstance = liveInstanceMap.get(participantName);
+ SessionId sessionId = liveInstance.getSessionId();
+ Map<String, CurrentState> instanceCurStateMap =
+ _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+ sessionId.stringify()));
+
+ currentStateMap.put(participantName, instanceCurStateMap);
+ }
+
// read all the participants
- Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
+ Map<ParticipantId, Participant> participantMap = Maps.newHashMap();
for (String participantName : instanceConfigMap.keySet()) {
InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
UserConfig userConfig = UserConfig.from(instanceConfig);
@@ -274,45 +347,7 @@ public class ClusterAccessor {
currentStateMap.get(participantName)));
}
- // read the controllers
- Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
- ControllerId leaderId = null;
- if (leader != null) {
- leaderId = ControllerId.from(leader.getId());
- controllerMap.put(leaderId, new Controller(leaderId, leader, true));
- }
-
- // read the constraints
- Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
- new HashMap<ConstraintType, ClusterConstraints>();
- for (String constraintType : constraintMap.keySet()) {
- clusterConstraintMap.put(ConstraintType.valueOf(constraintType),
- constraintMap.get(constraintType));
- }
-
- // read the pause status
- PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
- boolean isPaused = pauseSignal != null;
-
- ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
- boolean autoJoinAllowed = false;
- UserConfig userConfig;
- if (clusterUserConfig != null) {
- userConfig = UserConfig.from(clusterUserConfig);
- autoJoinAllowed = clusterUserConfig.autoJoinAllowed();
- } else {
- userConfig = new UserConfig(Scope.cluster(_clusterId));
- }
-
- // read the state model definitions
- StateModelDefinitionAccessor stateModelDefAccessor =
- new StateModelDefinitionAccessor(_accessor);
- Map<StateModelDefId, StateModelDefinition> stateModelMap =
- stateModelDefAccessor.readStateModelDefinitions();
-
- // create the cluster snapshot object
- return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
+ return participantMap;
}
/**
@@ -335,6 +370,14 @@ public class ClusterAccessor {
}
/**
+ * Clear any user-specified configuration from the cluster
+ * @return true if the config was cleared, false otherwise
+ */
+ public boolean dropUserConfig() {
+ return setUserConfig(new UserConfig(Scope.cluster(_clusterId)));
+ }
+
+ /**
* Add user configuration to the existing cluster user configuration. Overwrites properties with
* the same key
* @param userConfig the user config key-value pairs to add
@@ -438,8 +481,18 @@ public class ClusterAccessor {
* @return true if valid or false otherwise
*/
public boolean isClusterStructureValid() {
- List<String> paths = getRequiredPaths();
- BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+ return isClusterStructureValid(_clusterId, _accessor.getBaseDataAccessor());
+ }
+
+ /**
+ * check if cluster structure is valid
+ * @param clusterId the cluster to check
+ * @param baseAccessor a base data accessor
+ * @return true if valid or false otherwise
+ */
+ private static boolean isClusterStructureValid(ClusterId clusterId,
+ BaseDataAccessor<?> baseAccessor) {
+ List<String> paths = getRequiredPaths(clusterId);
if (baseAccessor != null) {
boolean[] existsResults = baseAccessor.exists(paths, 0);
for (boolean exists : existsResults) {
@@ -456,7 +509,7 @@ public class ClusterAccessor {
*/
private void initClusterStructure() {
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
- List<String> paths = getRequiredPaths();
+ List<String> paths = getRequiredPaths(_clusterId);
for (String path : paths) {
boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
if (!status && LOG.isDebugEnabled()) {
@@ -467,25 +520,25 @@ public class ClusterAccessor {
/**
* Get all property paths that must be set for a cluster structure to be valid
+ * @param the cluster that the paths will be relative to
* @return list of paths as strings
*/
- private List<String> getRequiredPaths() {
+ private static List<String> getRequiredPaths(ClusterId clusterId) {
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
List<String> paths = new ArrayList<String>();
- paths.add(_keyBuilder.cluster().getPath());
- paths.add(_keyBuilder.idealStates().getPath());
- paths.add(_keyBuilder.clusterConfigs().getPath());
- paths.add(_keyBuilder.instanceConfigs().getPath());
- paths.add(_keyBuilder.resourceConfigs().getPath());
- paths.add(_keyBuilder.propertyStore().getPath());
- paths.add(_keyBuilder.liveInstances().getPath());
- paths.add(_keyBuilder.instances().getPath());
- paths.add(_keyBuilder.externalViews().getPath());
- paths.add(_keyBuilder.controller().getPath());
- paths.add(_keyBuilder.stateModelDefs().getPath());
- paths.add(_keyBuilder.controllerMessages().getPath());
- paths.add(_keyBuilder.controllerTaskErrors().getPath());
- paths.add(_keyBuilder.controllerTaskStatuses().getPath());
- paths.add(_keyBuilder.controllerLeaderHistory().getPath());
+ paths.add(keyBuilder.cluster().getPath());
+ paths.add(keyBuilder.clusterConfigs().getPath());
+ paths.add(keyBuilder.instanceConfigs().getPath());
+ paths.add(keyBuilder.propertyStore().getPath());
+ paths.add(keyBuilder.liveInstances().getPath());
+ paths.add(keyBuilder.instances().getPath());
+ paths.add(keyBuilder.externalViews().getPath());
+ paths.add(keyBuilder.controller().getPath());
+ paths.add(keyBuilder.stateModelDefs().getPath());
+ paths.add(keyBuilder.controllerMessages().getPath());
+ paths.add(keyBuilder.controllerTaskErrors().getPath());
+ paths.add(keyBuilder.controllerTaskStatuses().getPath());
+ paths.add(keyBuilder.controllerLeaderHistory().getPath());
return paths;
}
@@ -578,8 +631,8 @@ public class ClusterAccessor {
return false;
}
- StateModelDefinitionAccessor smdAccessor = new StateModelDefinitionAccessor(_accessor);
- return smdAccessor.setStateModelDefinition(stateModelDef);
+ return _accessor
+ .createProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 7e74fc7..e139c2e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -36,6 +36,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Participant;
import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.MessageId;
@@ -296,6 +297,15 @@ public class ParticipantAccessor {
}
/**
+ * Clear any user-specified configuration from the participant
+ * @param participantId the participant to update
+ * @return true if the config was cleared, false otherwise
+ */
+ public boolean dropUserConfig(ParticipantId participantId) {
+ return setUserConfig(participantId, new UserConfig(Scope.participant(participantId)));
+ }
+
+ /**
* Update a participant configuration
* @param participantId the participant to update
* @param participantDelta changes to the participant
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 0b39d36..e5c9443 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -196,6 +196,15 @@ public class ResourceAccessor {
}
/**
+ * Clear any user-specified configuration from the resource
+ * @param resourceId the resource to update
+ * @return true if the config was cleared, false otherwise
+ */
+ public boolean dropUserConfig(ResourceId resourceId) {
+ return setUserConfig(resourceId, new UserConfig(Scope.resource(resourceId)));
+ }
+
+ /**
* Persist an existing resource's logical configuration
* @param resourceConfig logical resource configuration
* @return true if resource is set, false otherwise
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
deleted file mode 100644
index 3816507..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.helix.api.accessor;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.StateModelDefinition;
-
-import com.google.common.collect.ImmutableMap;
-
-public class StateModelDefinitionAccessor {
- private final HelixDataAccessor _accessor;
- private final PropertyKey.Builder _keyBuilder;
-
- /**
- * @param accessor
- */
- public StateModelDefinitionAccessor(HelixDataAccessor accessor) {
- _accessor = accessor;
- _keyBuilder = accessor.keyBuilder();
- }
-
- /**
- * Get all of the state model definitions available to the cluster
- * @return map of state model ids to state model definition objects
- */
- public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
- Map<String, StateModelDefinition> stateModelDefs =
- _accessor.getChildValuesMap(_keyBuilder.stateModelDefs());
- Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
- new HashMap<StateModelDefId, StateModelDefinition>();
-
- for (String stateModelDefName : stateModelDefs.keySet()) {
- stateModelDefMap.put(StateModelDefId.from(stateModelDefName),
- stateModelDefs.get(stateModelDefName));
- }
-
- return ImmutableMap.copyOf(stateModelDefMap);
- }
-
- /**
- * Set a state model definition. Adds the state model definition if it does not exist
- * @param stateModelDef fully initialized state model definition
- * @return true if the model is persisted, false otherwise
- */
- public boolean setStateModelDefinition(StateModelDefinition stateModelDef) {
- return _accessor.setProperty(_keyBuilder.stateModelDef(stateModelDef.getId()), stateModelDef);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
index e3ba6f0..c64941c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/context/RebalancerConfig.java
@@ -142,6 +142,14 @@ public final class RebalancerConfig {
}
/**
+ * Get the rebalancer context serialized as a string
+ * @return string representing the context
+ */
+ public String getSerializedContext() {
+ return _config.getSimpleField(Fields.REBALANCER_CONTEXT.toString());
+ }
+
+ /**
* Convert this to a namespaced config
* @return NamespacedConfig
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 4f2e10c..7bc2769 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -66,7 +66,7 @@ public class NewConstraintBasedAssignment {
public boolean apply(ParticipantId participantId) {
Participant participant = participantMap.get(participantId);
return !participant.isEnabled()
- || participant.getDisablePartitionIds().contains(partitionId);
+ || participant.getDisabledPartitionIds().contains(partitionId);
}
});
return disabledParticipantsForPartition;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java b/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
index 9d3b41a..95c9848 100644
--- a/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
+++ b/helix-core/src/main/java/org/apache/helix/model/HelixConfigScope.java
@@ -92,6 +92,8 @@ public class HelixConfigScope {
*/
final String _participantName;
+ final String _resourceName;
+
final String _zkPath;
final String _mapKey;
@@ -131,6 +133,13 @@ public class HelixConfigScope {
_participantName = null;
}
+ // init resourceName
+ if (type == ConfigScopeProperty.RESOURCE && _isFullKey) {
+ _resourceName = zkPathKeys.get(1);
+ } else {
+ _resourceName = null;
+ }
+
_zkPath = template.instantiate(type, zkPathKeys.toArray(new String[0]));
_mapKey = mapKey;
}
@@ -160,6 +169,14 @@ public class HelixConfigScope {
}
/**
+ * Get the resource name if it exists
+ * @return the resource name if the type is RESOURCE, or null
+ */
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ /**
* Get the path to the corresponding ZNode
* @return a Zookeeper path
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/dc94c8c1/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
new file mode 100644
index 0000000..1f44e69
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java
@@ -0,0 +1,1041 @@
+package org.apache.helix.tools;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.Scope.ScopeType;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.accessor.ParticipantAccessor;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Parse command line and call helix-admin
+ */
+public class NewClusterSetup {
+
+ private static Logger LOG = Logger.getLogger(NewClusterSetup.class);
+
+ /**
+ * List all helix cluster setup options
+ */
+ public enum HelixOption {
+ // help
+ help(0, "", "Print command-line options"),
+
+ // zookeeper address
+ zkSvr(1, true, "zookeeperServerAddress", "Zookeeper address (host:port, required)"),
+
+ // list cluster/resource/instances
+ listClusters(0, "", "List clusters"),
+ listResources(1, "clusterId", "List resources in a cluster"),
+ listInstances(1, "clusterId", "List instances in a cluster"),
+
+ // add, drop, and rebalance cluster
+ addCluster(1, "clusterId", "Add a new cluster"),
+ activateCluster(3, "clusterId grandClusterId true/false",
+ "Enable/disable a cluster in distributed controller mode"),
+ dropCluster(1, "clusterId", "Delete a cluster"),
+ dropResource(2, "clusterId resourceId", "Drop a resource from a cluster"),
+ addInstance(2, "clusterId instanceId", "Add an instance to a cluster"),
+ addResource(4, "clusterId resourceId partitionNumber stateModelDefId",
+ "Add a resource to a cluster"),
+ addStateModelDef(2, "clusterId jsonFileName", "Add a state model definition to a cluster"),
+ addIdealState(2, "clusterId resourceId jsonfileName",
+ "Add an ideal state of a resource in cluster"),
+ swapInstance(3, "clusterId oldInstanceId newInstanceId",
+ "Swap an old instance in cluster with a new instance"),
+ dropInstance(2, "clusterId instanceId", "Drop an instance from a cluster"),
+ rebalance(3, "clusterId resourceId replicas", "Rebalance a resource in cluster"),
+ expandCluster(1, "clusterId", "Expand a cluster"),
+ expandResource(2, "clusterId resourceId", "Expand resource to additional nodes"),
+ @Deprecated
+ mode(1, "rebalancerMode", "Specify rebalancer mode, used with " + addResource + " command"),
+ rebalancerMode(1, "rebalancerMode", "Specify rebalancer mode, used with " + addResource
+ + " command"),
+ instanceGroupTag(1, "instanceGroupTag", "Specify instance group tag, used with " + rebalance
+ + " command"),
+ bucketSize(1, "bucketSize", "Specify bucket size, used with " + addResource + " command"),
+ resourceKeyPrefix(1, "resourceKeyPrefix", "Specify resource key prefix, used with " + rebalance
+ + " command"),
+ maxPartitionsPerNode(1, "maxPartitionsPerNode", "Specify max partitions per node, used with "
+ + addResource + " command"),
+ addResourceProperty(4, "clusterId resourceId propertyName propertyValue",
+ "Add a resource property"),
+ removeResourceProperty(3, "clusterId resourceId propertyName", "Remove a resource property"),
+ addInstanceTag(3, "clusterId instanceId tag", "Add a tag to instance"),
+ removeInstanceTag(3, "clusterId instanceId tag", "Remove a tag from instance"),
+
+ // query info
+ listClusterInfo(1, "clusterId", "Query informaton of a cluster"),
+ listInstanceInfo(2, "clusterId instanceId", "Query information of an instance in cluster"),
+ listResourceInfo(2, "clusterId resourceId", "Query information of a resource"),
+ listPartitionInfo(3, "clusterId resourceId partitionId", "Query information of a partition"),
+ listStateModels(1, "clusterId", "Query information of state models in a cluster"),
+ listStateModel(2, "clusterId stateModelDefId", "Query information of a state model in cluster"),
+
+ // enable/disable/reset instances/cluster/resource/partition
+ enableInstance(3, "clusterId instanceId true/false", "Enable/disable an instance"),
+ enablePartition(-1, "true/false clusterId instanceId resourceId partitionId...",
+ "Enable/disable partitions"),
+ enableCluster(2, "clusterId true/false", "Pause/resume the controller of a cluster"),
+ resetPartition(4, "clusterId instanceId resourceId partitionName",
+ "Reset a partition in error state"),
+ resetInstance(2, "clusterId instanceId", "Reset all partitions in error state for an instance"),
+ resetResource(2, "clusterId resourceId", "Reset all partitions in error state for a resource"),
+
+ // stats/alerts
+ addStat(2, "clusterId statName", "Add a persistent stat"),
+ addAlert(2, "clusterId alertName", "Add an alert"),
+ dropStat(2, "clusterId statName", "Drop a persistent stat"),
+ dropAlert(2, "clusterId alertName", "Drop an alert"),
+
+ // set/set/remove configs
+ getConfig(3, "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keys(e.g. k1,k2)",
+ "Get configs"),
+ setConfig(3,
+ "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keyValues(e.g. k1=v1,k2=v2)",
+ "Set configs"),
+ removeConfig(3, "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keys(e.g. k1,k2)",
+ "Remove configs"),
+
+ // get/set/remove constraints
+ getConstraints(2, "clusterId constraintType(e.g. MESSAGE_CONSTRAINT)", "Get constraints"),
+ setConstraint(
+ 4,
+ "clusterId constraintType(e.g. MESSAGE_CONSTRAINT) constraintId keyValues(e.g. k1=v1,k2=v2)",
+ "Set a constraint, create if not exist"),
+ removeConstraint(3, "clusterId constraintType(e.g. MESSAGE_CONSTRAINT) constraintId",
+ "Remove a constraint");
+
+ final int _argNum;
+ final boolean _isRequired;
+ final String _argName;
+ final String _description;
+
+ private HelixOption(int argNum, boolean isRequired, String argName, String description) {
+ _argNum = argNum;
+ _isRequired = isRequired;
+ _argName = argName;
+ _description = description;
+ }
+
+ private HelixOption(int argNum, String argName, String description) {
+ this(argNum, false, argName, description);
+ }
+ }
+
+ private final ZkClient _zkclient;
+ private final BaseDataAccessor<ZNRecord> _baseAccessor;
+
+ private NewClusterSetup(ZkClient zkclient) {
+ _zkclient = zkclient;
+ _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+ }
+
+ @SuppressWarnings("static-access")
+ static Options constructCommandLineOptions() {
+ Options options = new Options();
+
+ OptionGroup optionGroup = new OptionGroup();
+ for (HelixOption option : HelixOption.values()) {
+ Option opt =
+ OptionBuilder.withLongOpt(option.name()).hasArgs(option._argNum)
+ .isRequired(option._isRequired).withArgName(option._argName)
+ .withDescription(option._description).create();
+ if (option == HelixOption.help || option == HelixOption.zkSvr) {
+ options.addOption(opt);
+ } else {
+ optionGroup.addOption(opt);
+ }
+ }
+ options.addOptionGroup(optionGroup);
+ return options;
+ }
+
+ /**
+ * Check if we have the right number of arguments
+ * @param opt
+ * @param optValues
+ */
+ static void checkArgNum(HelixOption opt, String[] optValues) {
+
+ if (opt._argNum != -1 && opt._argNum < optValues.length) {
+ throw new IllegalArgumentException(opt + " should have no less than " + opt._argNum
+ + " arguments, but was: " + optValues.length + ", " + Arrays.asList(optValues));
+ }
+ }
+
+ static void printUsage(Options cliOptions) {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(1000);
+ helpFormatter.printHelp("java " + NewClusterSetup.class.getName(), cliOptions);
+ }
+
+ ClusterAccessor clusterAccessor(String clusterName) {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+ return new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ }
+
+ ParticipantAccessor participantAccessor(String clusterName) {
+ return new ParticipantAccessor(new ZKHelixDataAccessor(clusterName, _baseAccessor));
+ }
+
+ ResourceAccessor resourceAccessor(String clusterName) {
+ return new ResourceAccessor(new ZKHelixDataAccessor(clusterName, _baseAccessor));
+ }
+
+ void addCluster(String[] optValues) {
+ String clusterName = optValues[0];
+
+ List<StateModelDefinition> defaultStateModelDefs = new ArrayList<StateModelDefinition>();
+ defaultStateModelDefs.add(new StateModelDefinition(StateModelConfigGenerator
+ .generateConfigForMasterSlave()));
+
+ ClusterConfig.Builder builder =
+ new ClusterConfig.Builder(ClusterId.from(clusterName))
+ .addStateModelDefinitions(defaultStateModelDefs);
+
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.createCluster(builder.build());
+ }
+
+ void addResource(String[] optValues, String[] rebalancerModeValues, String[] bucketSizeValues,
+ String[] maxPartitionsPerNodeValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ int partitionNumber = Integer.parseInt(optValues[2]);
+ String stateModelDefName = optValues[3];
+ RebalanceMode rebalancerMode =
+ rebalancerModeValues == null ? RebalanceMode.SEMI_AUTO : RebalanceMode
+ .valueOf(rebalancerModeValues[0]);
+
+ int bucketSize = bucketSizeValues == null ? 0 : Integer.parseInt(bucketSizeValues[0]);
+
+ int maxPartitionsPerNode =
+ maxPartitionsPerNodeValues == null ? -1 : Integer.parseInt(maxPartitionsPerNodeValues[0]);
+
+ ResourceId resourceId = ResourceId.from(resourceName);
+ StateModelDefId stateModelDefId = StateModelDefId.from(stateModelDefName);
+
+ IdealState idealState = new IdealState(resourceName);
+ idealState.setRebalanceMode(rebalancerMode);
+ idealState.setNumPartitions(partitionNumber);
+ idealState.setMaxPartitionsPerInstance(maxPartitionsPerNode);
+ idealState.setStateModelDefId(stateModelDefId);
+
+ RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+ ResourceConfig.Builder builder =
+ new ResourceConfig.Builder(resourceId).rebalancerContext(rebalancerCtx).bucketSize(
+ bucketSize);
+
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.addResourceToCluster(builder.build());
+
+ }
+
+ void addInstance(String[] optValues) {
+ String clusterName = optValues[0];
+ String[] instanceIds = optValues[1].split(";");
+
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ for (String instanceId : instanceIds) {
+ ParticipantConfig.Builder builder =
+ new ParticipantConfig.Builder(ParticipantId.from(instanceId));
+
+ accessor.addParticipantToCluster(builder.build());
+ }
+ }
+
+ void dropCluster(String[] optValues) {
+ String clusterName = optValues[0];
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.dropCluster();
+ }
+
+ void dropResource(String[] optValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.dropResourceFromCluster(ResourceId.from(resourceName));
+ }
+
+ void dropInstance(String[] optValues) {
+ String clusterName = optValues[0];
+ String[] instanceIds = optValues[1].split(";");
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ for (String instanceId : instanceIds) {
+ accessor.dropParticipantFromCluster(ParticipantId.from(instanceId));
+ }
+
+ }
+
+ private static byte[] readFile(String filePath) throws IOException {
+ File file = new File(filePath);
+
+ int size = (int) file.length();
+ byte[] bytes = new byte[size];
+ DataInputStream dis = null;
+ try {
+ dis = new DataInputStream(new FileInputStream(file));
+ int read = 0;
+ int numRead = 0;
+ while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+ read = read + numRead;
+ }
+ return bytes;
+ } finally {
+ if (dis != null) {
+ dis.close();
+ }
+ }
+ }
+
+ void addStateModelDef(String[] optValues) {
+ String clusterName = optValues[0];
+ String stateModelDefJsonFile = optValues[1];
+
+ try {
+ StateModelDefinition stateModelDef =
+ new StateModelDefinition(
+ (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefJsonFile))));
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.addStateModelDefinitionToCluster(stateModelDef);
+
+ } catch (IOException e) {
+ LOG.error("Could not parse the state model", e);
+ }
+
+ }
+
+ void addIdealState(String[] optValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ String idealStateJsonFile = optValues[2];
+
+ try {
+ IdealState idealState =
+ new IdealState(
+ (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateJsonFile))));
+
+ RebalancerContext rebalancerCtx = PartitionedRebalancerContext.from(idealState);
+ ResourceConfig.Builder builder =
+ new ResourceConfig.Builder(ResourceId.from(resourceName))
+ .rebalancerContext(rebalancerCtx).bucketSize(idealState.getBucketSize());
+
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ accessor.addResourceToCluster(builder.build());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ void addInstanceTag(String[] optValues) {
+ String clusterName = optValues[0];
+ String participantName = optValues[1];
+ String tag = optValues[2];
+
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ ParticipantId participantId = ParticipantId.from(participantName);
+
+ ParticipantConfig.Delta delta = new ParticipantConfig.Delta(participantId);
+ delta.addTag(tag);
+ accessor.updateParticipant(participantId, delta);
+ }
+
+ void removeInstanceTag(String[] optValues) {
+ String clusterName = optValues[0];
+ String participantName = optValues[1];
+ String tag = optValues[2];
+
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ ParticipantId participantId = ParticipantId.from(participantName);
+
+ ParticipantConfig.Delta delta = new ParticipantConfig.Delta(participantId);
+ delta.removeTag(tag);
+ accessor.updateParticipant(participantId, delta);
+ }
+
+ void listPartitionInfo(String[] optValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ String partitionName = optValues[2];
+
+ ResourceId resourceId = ResourceId.from(resourceName);
+ PartitionId partitionId = PartitionId.from(partitionName);
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ Resource resource = accessor.readResource(resourceId);
+
+ StringBuilder sb = new StringBuilder();
+ Map<ParticipantId, State> stateMap = resource.getExternalView().getStateMap(partitionId);
+ sb.append(resourceName + "/" + partitionName + ", externalView: " + stateMap);
+ PartitionedRebalancerContext partitionedContext =
+ resource.getRebalancerConfig().getRebalancerContext(PartitionedRebalancerContext.class);
+ if (partitionedContext != null) {
+ // for partitioned contexts, check the mode and apply mode-specific information if possible
+ if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+ SemiAutoRebalancerContext semiAutoContext =
+ resource.getRebalancerConfig().getRebalancerContext(SemiAutoRebalancerContext.class);
+ sb.append(", preferenceList: " + semiAutoContext.getPreferenceList(partitionId));
+ } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+ CustomRebalancerContext customContext =
+ resource.getRebalancerConfig().getRebalancerContext(CustomRebalancerContext.class);
+ sb.append(", preferenceMap: " + customContext.getPreferenceMap(partitionId));
+ }
+ if (partitionedContext.anyLiveParticipant()) {
+ sb.append(", anyLiveParticipant: " + partitionedContext.anyLiveParticipant());
+ } else {
+ sb.append(", replicaCount: " + partitionedContext.getReplicaCount());
+ }
+ }
+
+ System.out.println(sb.toString());
+ }
+
+ void enableInstance(String[] optValues) {
+ String clusterName = optValues[0];
+ String instanceId = optValues[1];
+ if (instanceId.indexOf(":") != -1) {
+ instanceId = instanceId.replaceAll(":", "_");
+ }
+ boolean enabled = Boolean.parseBoolean(optValues[2].toLowerCase());
+
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ if (enabled) {
+ accessor.enableParticipant(ParticipantId.from(instanceId));
+ } else {
+ accessor.disableParticipant(ParticipantId.from(instanceId));
+ }
+ }
+
+ void enablePartition(String[] optValues) {
+ boolean enabled = Boolean.parseBoolean(optValues[0].toLowerCase());
+ String clusterName = optValues[1];
+ ParticipantId participantId = ParticipantId.from(optValues[2]);
+ ResourceId resourceId = ResourceId.from(optValues[3]);
+
+ Set<PartitionId> partitionIdSet = new HashSet<PartitionId>();
+ for (int i = 4; i < optValues.length; i++) {
+ partitionIdSet.add(PartitionId.from(optValues[i]));
+ }
+
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ if (enabled) {
+ accessor.enablePartitionsForParticipant(participantId, resourceId, partitionIdSet);
+ } else {
+ accessor.disablePartitionsForParticipant(participantId, resourceId, partitionIdSet);
+ }
+ }
+
+ void enableCluster(String[] optValues) {
+ String clusterName = optValues[0];
+ boolean enabled = Boolean.parseBoolean(optValues[1].toLowerCase());
+
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ if (enabled) {
+ accessor.resumeCluster();
+ } else {
+ accessor.pauseCluster();
+ }
+ }
+
+ /**
+ * Convert user config to key value map
+ * @param userConfig
+ * @param mapKey
+ * @param keys
+ * @return
+ */
+ private Map<String, String> keyValueMap(UserConfig userConfig, String mapKey, String[] keys) {
+ Map<String, String> results = new HashMap<String, String>();
+
+ for (String key : keys) {
+ if (mapKey == null) {
+ results.put(key, userConfig.getSimpleField(key));
+ } else {
+ results.put(key, userConfig.getMapField(mapKey).get(key));
+ }
+ }
+ return results;
+ }
+
+ void getConfig(String[] optValues) {
+ ScopeType scopeType = ScopeType.valueOf(optValues[0].toUpperCase());
+ String[] scopeArgs = optValues[1].split("[\\s,]");
+ String[] keys = optValues[2].split("[\\s,]");
+
+ String clusterName = scopeArgs[0];
+ Map<String, String> results = null;
+ switch (scopeType) {
+ case CLUSTER: {
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ results = keyValueMap(accessor.readUserConfig(), null, keys);
+ break;
+ }
+ case PARTICIPANT: {
+ ParticipantId participantId = ParticipantId.from(scopeArgs[1]);
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ results = keyValueMap(accessor.readUserConfig(participantId), null, keys);
+ break;
+ }
+ case RESOURCE: {
+ ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ results = keyValueMap(accessor.readUserConfig(resourceId), null, keys);
+ break;
+ }
+ case PARTITION: {
+ ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+ String partitionId = scopeArgs[2];
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ results = keyValueMap(accessor.readUserConfig(resourceId), partitionId, keys);
+ break;
+ }
+ default:
+ System.err.println("Non-recognized scopeType: " + scopeType);
+ break;
+ }
+
+ System.out.println(results);
+ }
+
+ /**
+ * Convert key-value map to user-config
+ * @param scope
+ * @param mapKey
+ * @param keyValues
+ * @return
+ */
+ private UserConfig userConfig(Scope<?> scope, String mapKey, String[] keyValues) {
+ UserConfig userConfig = new UserConfig(scope);
+
+ for (String keyValue : keyValues) {
+ String[] splits = keyValue.split("=");
+ String key = splits[0];
+ String value = splits[1];
+ if (mapKey == null) {
+ userConfig.setSimpleField(key, value);
+ } else {
+ if (userConfig.getMapField(mapKey) == null) {
+ userConfig.setMapField(mapKey, new TreeMap<String, String>());
+ }
+ userConfig.getMapField(mapKey).put(key, value);
+ }
+ }
+ return userConfig;
+ }
+
+ void setConfig(String[] optValues) {
+ ScopeType scopeType = ScopeType.valueOf(optValues[0].toUpperCase());
+ String[] scopeArgs = optValues[1].split("[\\s,]");
+ String[] keyValues = optValues[2].split("[\\s,]");
+
+ String clusterName = scopeArgs[0];
+ Map<String, String> results = new HashMap<String, String>();
+ switch (scopeType) {
+ case CLUSTER: {
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Scope<ClusterId> scope = Scope.cluster(ClusterId.from(clusterName));
+ UserConfig userConfig = userConfig(scope, null, keyValues);
+ accessor.setUserConfig(userConfig);
+ break;
+ }
+ case PARTICIPANT: {
+ ParticipantId participantId = ParticipantId.from(scopeArgs[1]);
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ Scope<ParticipantId> scope = Scope.participant(participantId);
+ UserConfig userConfig = userConfig(scope, null, keyValues);
+ accessor.setUserConfig(participantId, userConfig);
+ break;
+ }
+ case RESOURCE: {
+ ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ Scope<ResourceId> scope = Scope.resource(resourceId);
+ UserConfig userConfig = userConfig(scope, null, keyValues);
+ accessor.setUserConfig(resourceId, userConfig);
+ break;
+ }
+ case PARTITION: {
+ ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+ String partitionId = scopeArgs[2];
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ Scope<ResourceId> scope = Scope.resource(resourceId);
+ UserConfig userConfig = userConfig(scope, partitionId, keyValues);
+ accessor.setUserConfig(resourceId, userConfig);
+ break;
+ }
+ default:
+ System.err.println("Non-recognized scopeType: " + scopeType);
+ break;
+ }
+
+ System.out.println(results);
+ }
+
+ void setConstraint(String[] optValues) {
+ String clusterName = optValues[0];
+ String constraintType = optValues[1];
+ String constraintId = optValues[2];
+ String constraintAttributesMap = optValues[3];
+ if (clusterName == null || constraintType == null || constraintId == null
+ || constraintAttributesMap == null) {
+ System.err
+ .println("fail to set constraint. missing clusterName|constraintType|constraintId|constraintAttributesMap");
+ return;
+ }
+ ClusterId clusterId = ClusterId.from(clusterName);
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Map<String, String> constraintAttributes =
+ HelixUtil.parseCsvFormatedKeyValuePairs(constraintAttributesMap);
+ ConstraintItem item = new ConstraintItem(constraintAttributes);
+ ClusterConfig.Delta delta =
+ new ClusterConfig.Delta(clusterId).addConstraintItem(
+ ConstraintType.valueOf(constraintType), ConstraintId.from(constraintId), item);
+ accessor.updateCluster(delta);
+ }
+
+ void listClusterInfo(String[] optValues) {
+ String clusterName = optValues[0];
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Set<ResourceId> resources = accessor.readResources().keySet();
+ StringBuilder sb =
+ new StringBuilder("Existing resources in cluster ").append(clusterName).append(":\n");
+ for (ResourceId resourceId : resources) {
+ sb.append(resourceId.stringify()).append('\n');
+ }
+ Set<ParticipantId> participants = accessor.readParticipants().keySet();
+ sb.append("Participants in cluster ").append(clusterName).append(":\n");
+ for (ParticipantId participantId : participants) {
+ sb.append(participantId.stringify()).append('\n');
+ }
+ System.out.print(sb.toString());
+ }
+
+ void listParticipantInfo(String[] optValues) {
+ String clusterName = optValues[0];
+ String participantName = optValues[1];
+ ParticipantAccessor accessor = participantAccessor(clusterName);
+ ParticipantId participantId = ParticipantId.from(participantName);
+ Participant participant = accessor.readParticipant(participantId);
+ StringBuilder sb =
+ new StringBuilder("Participant ").append(participantName).append(" in cluster ")
+ .append(clusterName).append(":\n").append("hostName: ")
+ .append(participant.getHostName()).append(", port: ").append(participant.getPort())
+ .append(", enabled: ").append(participant.isEnabled()).append(", disabledPartitions: ")
+ .append(participant.getDisabledPartitionIds().toString()).append(", tags:")
+ .append(participant.getTags().toString()).append(", currentState: ")
+ .append(", messages: ").append(participant.getMessageMap().toString())
+ .append(participant.getCurrentStateMap().toString()).append(", alive: ")
+ .append(participant.isAlive()).append(", userConfig: ")
+ .append(participant.getUserConfig().toString());
+ if (participant.isAlive()) {
+ RunningInstance runningInstance = participant.getRunningInstance();
+ sb.append(", sessionId: ").append(runningInstance.getSessionId().stringify())
+ .append(", processId: ").append(runningInstance.getPid().stringify())
+ .append(", helixVersion: ").append(runningInstance.getVersion().toString());
+ }
+ System.out.println(sb.toString());
+ }
+
+ void listResourceInfo(String[] optValues) {
+ String clusterName = optValues[0];
+ String resourceName = optValues[1];
+ ResourceAccessor accessor = resourceAccessor(clusterName);
+ ResourceId resourceId = ResourceId.from(resourceName);
+ Resource resource = accessor.readResource(resourceId);
+ StringBuilder sb =
+ new StringBuilder("Resource ").append(resourceName).append(" in cluster ")
+ .append(clusterName).append(":\n").append("externalView: ")
+ .append(resource.getExternalView()).append(", userConfig: ")
+ .append(resource.getUserConfig()).append(", rebalancerContext: ")
+ .append(resource.getRebalancerConfig().getSerializedContext());
+ System.out.println(sb.toString());
+ }
+
+ void listResources(String[] optValues) {
+ String clusterName = optValues[0];
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Set<ResourceId> resources = accessor.readResources().keySet();
+ StringBuilder sb =
+ new StringBuilder("Existing resources in cluster ").append(clusterName).append(":\n");
+ for (ResourceId resourceId : resources) {
+ sb.append(resourceId.stringify()).append('\n');
+ }
+ System.out.print(sb.toString());
+ }
+
+ void listParticipants(String[] optValues) {
+ String clusterName = optValues[0];
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Set<ParticipantId> participants = accessor.readParticipants().keySet();
+ StringBuilder sb =
+ new StringBuilder("Participants in cluster ").append(clusterName).append(":\n");
+ for (ParticipantId participantId : participants) {
+ sb.append(participantId.stringify()).append('\n');
+ }
+ System.out.print(sb.toString());
+ }
+
+ void listStateModels(String[] optValues) {
+ String clusterName = optValues[0];
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Set<StateModelDefId> stateModelDefs = accessor.readStateModelDefinitions().keySet();
+ StringBuilder sb =
+ new StringBuilder("State models in cluster ").append(clusterName).append(":\n");
+ for (StateModelDefId stateModelDefId : stateModelDefs) {
+ sb.append(stateModelDefId.stringify()).append('\n');
+ }
+ System.out.print(sb.toString());
+ }
+
+ void listStateModel(String[] optValues) {
+ String clusterName = optValues[0];
+ String stateModel = optValues[1];
+ StateModelDefId stateModelDefId = StateModelDefId.from(stateModel);
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs =
+ accessor.readStateModelDefinitions();
+ StateModelDefinition stateModelDef = stateModelDefs.get(stateModelDefId);
+ StringBuilder sb = new StringBuilder("StateModelDefinition: ").append(stateModelDef.toString());
+ System.out.println(sb.toString());
+ }
+
+ void listClusters(String[] optValues) {
+ List<ClusterId> result = Lists.newArrayList();
+ List<String> clusterNames = _baseAccessor.getChildNames("/", 0);
+ for (String clusterName : clusterNames) {
+ ClusterAccessor accessor = clusterAccessor(clusterName);
+ if (accessor.isClusterStructureValid()) {
+ result.add(ClusterId.from(clusterName));
+ }
+ }
+ System.out.println("Existing clusters: " + result);
+ }
+
+ void removeConfig(String[] optValues) {
+ ScopeType type = ScopeType.valueOf(optValues[0].toUpperCase());
+ String[] scopeArgs = optValues[1].split("[\\s,]");
+ String[] keys = optValues[2].split("[\\s,]");
+ String clusterName = scopeArgs[0];
+ UserConfig userConfig;
+ switch (type) {
+ case CLUSTER:
+ ClusterAccessor clusterAccessor = clusterAccessor(clusterName);
+ userConfig = clusterAccessor.readUserConfig();
+ removeKeysFromUserConfig(userConfig, keys);
+ clusterAccessor.setUserConfig(userConfig);
+ break;
+ case RESOURCE:
+ ResourceAccessor resourceAccessor = resourceAccessor(clusterName);
+ ResourceId resourceId = ResourceId.from(scopeArgs[1]);
+ userConfig = resourceAccessor.readUserConfig(resourceId);
+ removeKeysFromUserConfig(userConfig, keys);
+ resourceAccessor.setUserConfig(resourceId, userConfig);
+ break;
+ case PARTICIPANT:
+ ParticipantAccessor participantAccessor = participantAccessor(clusterName);
+ ParticipantId participantId = ParticipantId.from(scopeArgs[1]);
+ userConfig = participantAccessor.readUserConfig(participantId);
+ removeKeysFromUserConfig(userConfig, keys);
+ participantAccessor.setUserConfig(participantId, userConfig);
+ break;
+ case PARTITION:
+ ResourceAccessor resourcePartitionAccessor = resourceAccessor(clusterName);
+ PartitionId partitionId = PartitionId.from(scopeArgs[1]);
+ userConfig = resourcePartitionAccessor.readUserConfig(partitionId.getResourceId());
+ removePartitionFromResourceUserConfig(userConfig, partitionId, keys);
+ resourcePartitionAccessor.setUserConfig(partitionId.getResourceId(), userConfig);
+ break;
+ }
+ }
+
+ private void removeKeysFromUserConfig(UserConfig userConfig, String[] keys) {
+ Map<String, String> simpleFields = Maps.newHashMap(userConfig.getSimpleFields());
+ for (String key : keys) {
+ simpleFields.remove(key);
+ }
+ userConfig.setSimpleFields(simpleFields);
+ }
+
+ private void removePartitionFromResourceUserConfig(UserConfig userConfig,
+ PartitionId partitionId, String[] keys) {
+ Map<String, String> fields = Maps.newHashMap(userConfig.getMapField(partitionId.stringify()));
+ for (String key : keys) {
+ fields.remove(key);
+ }
+ userConfig.setMapField(partitionId.stringify(), fields);
+ }
+
+ static int processCommandLineArgs(String[] cliArgs) {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ CommandLine cmd = null;
+
+ try {
+ cmd = cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe) {
+ System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+
+ String zkAddr = cmd.getOptionValue(HelixOption.zkSvr.name());
+ ZkClient zkclient = null;
+
+ try {
+ zkclient =
+ new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ new ZNRecordSerializer());
+
+ NewClusterSetup setup = new NewClusterSetup(zkclient);
+
+ Option[] options = cmd.getOptions();
+
+ for (Option option : options) {
+ if (option.getLongOpt().equals(HelixOption.zkSvr.name())) {
+ continue;
+ }
+
+ HelixOption opt = HelixOption.valueOf(option.getLongOpt());
+ String[] optValues = cmd.getOptionValues(option.getLongOpt());
+
+ checkArgNum(opt, optValues);
+
+ switch (opt) {
+ case listClusters:
+ setup.listClusters(optValues);
+ break;
+ case listResources:
+ setup.listResources(optValues);
+ break;
+ case listInstances:
+ setup.listParticipants(optValues);
+ break;
+ case addCluster:
+ setup.addCluster(optValues);
+ break;
+ case activateCluster:
+ break;
+ case dropCluster:
+ setup.dropCluster(optValues);
+ break;
+ case dropResource:
+ setup.dropResource(optValues);
+ break;
+ case addInstance:
+ setup.addInstance(optValues);
+ break;
+ case addResource:
+ String[] rebalancerModeValues = null;
+ if (cmd.hasOption(HelixOption.rebalancerMode.name())) {
+ rebalancerModeValues = cmd.getOptionValues(HelixOption.rebalancerMode.name());
+ checkArgNum(HelixOption.rebalancerMode, rebalancerModeValues);
+ }
+ String[] bucketSizeValues = null;
+ if (cmd.hasOption(HelixOption.bucketSize.name())) {
+ bucketSizeValues = cmd.getOptionValues(HelixOption.bucketSize.name());
+ checkArgNum(HelixOption.bucketSize, bucketSizeValues);
+ }
+ String[] maxPartitionsPerNodeValues = null;
+ if (cmd.hasOption(HelixOption.maxPartitionsPerNode.name())) {
+ maxPartitionsPerNodeValues =
+ cmd.getOptionValues(HelixOption.maxPartitionsPerNode.name());
+ checkArgNum(HelixOption.maxPartitionsPerNode, maxPartitionsPerNodeValues);
+ }
+ setup.addResource(optValues, rebalancerModeValues, bucketSizeValues,
+ maxPartitionsPerNodeValues);
+ break;
+ case addStateModelDef:
+ setup.addStateModelDef(optValues);
+ break;
+ case addIdealState:
+ setup.addIdealState(optValues);
+ break;
+ case swapInstance:
+ // TODO impl ClusterAccessor#swapParticipantsInCluster()
+ break;
+ case dropInstance:
+ setup.dropInstance(optValues);
+ break;
+ case rebalance:
+ // TODO impl this using ResourceAccessor
+ break;
+ case expandCluster:
+ // TODO impl this
+ break;
+ case expandResource:
+ // TODO impl this
+ break;
+ case mode:
+ case rebalancerMode:
+ case bucketSize:
+ case maxPartitionsPerNode:
+ // always used with addResource command
+ continue;
+ case instanceGroupTag:
+ case resourceKeyPrefix:
+ // always used with rebalance command
+ continue;
+ case addResourceProperty:
+ throw new UnsupportedOperationException(HelixOption.addResourceProperty
+ + " is not supported, please use setConfig");
+ case removeResourceProperty:
+ throw new UnsupportedOperationException(HelixOption.removeResourceProperty
+ + " is not supported, please use removeConfig");
+ case addInstanceTag:
+ setup.addInstanceTag(optValues);
+ break;
+ case removeInstanceTag:
+ setup.removeInstanceTag(optValues);
+ break;
+ case listClusterInfo:
+ setup.listClusterInfo(optValues);
+ break;
+ case listInstanceInfo:
+ setup.listParticipantInfo(optValues);
+ break;
+ case listResourceInfo:
+ setup.listResourceInfo(optValues);
+ break;
+ case listPartitionInfo:
+ setup.listPartitionInfo(optValues);
+ break;
+ case listStateModels:
+ setup.listStateModels(optValues);
+ break;
+ case listStateModel:
+ setup.listStateModel(optValues);
+ break;
+ case enableInstance:
+ setup.enableInstance(optValues);
+ break;
+ case enablePartition:
+ setup.enablePartition(optValues);
+ break;
+ case enableCluster:
+ setup.enableCluster(optValues);
+ break;
+ case resetPartition:
+ // TODO impl ResoourceAccessor#resetPartitions()
+ break;
+ case resetInstance:
+ // TODO impl ParticipantAccessor#resetInstance()
+ break;
+ case resetResource:
+ // TODO impl ResourceAccessor#resetResource()
+ break;
+ case addStat:
+ // TODO impl ClusterAccessor.addStat()
+ break;
+ case addAlert:
+ // TODO impl ClusterAccessor#addAlert()
+ break;
+ case dropStat:
+ // TODO impl ClusterAccessor.dropStat()
+ break;
+ case dropAlert:
+ // TODO impl ClusterAccessor#dropAlert()
+ break;
+ case getConfig:
+ setup.getConfig(optValues);
+ break;
+ case setConfig:
+ setup.setConfig(optValues);
+ break;
+ case removeConfig:
+ setup.removeConfig(optValues);
+ break;
+ case getConstraints:
+ break;
+ case setConstraint:
+ setup.setConstraint(optValues);
+ break;
+ case removeConstraint:
+ break;
+ default:
+ System.err.println("Non-recognized option: " + opt);
+ break;
+ }
+
+ // process 1 option only
+ break;
+ }
+
+ return 0;
+ } finally {
+ if (zkclient != null) {
+ zkclient.close();
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ // if (args.length == 1 && args[0].equals("setup-test-cluster")) {
+ // System.out
+ // .println("By default setting up TestCluster with 6 instances, 10 partitions, Each partition will have 3 replicas");
+ // new ClusterSetup("localhost:2181").setupTestCluster("TestCluster");
+ // System.exit(0);
+ // }
+
+ int ret = processCommandLineArgs(args);
+ System.exit(ret);
+
+ }
+}