You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/09/27 01:05:13 UTC
[4/6] [HELIX-238] Refactor, add update to accessors, test update logic
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
new file mode 100644
index 0000000..8e07d97
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -0,0 +1,553 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Controller;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.model.ClusterConfiguration;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class ClusterAccessor {
+ /** Class-wide logger; final so that it can never be reassigned. */
+ private static final Logger LOG = Logger.getLogger(ClusterAccessor.class);
+
+ /** Accessor through which every cluster property is read and written. */
+ private final HelixDataAccessor _accessor;
+ /** Key builder taken from the accessor; used to address each property. */
+ private final PropertyKey.Builder _keyBuilder;
+ /** Id of the cluster; used in log messages and as the scope of a default user config. */
+ private final ClusterId _clusterId;
+
+ /**
+  * Instantiate a cluster accessor
+  * @param clusterId id of the cluster this accessor operates on
+  * @param accessor data accessor; its key builder is used to build all property keys
+  */
+ public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+   _accessor = accessor;
+   _keyBuilder = accessor.keyBuilder();
+   _clusterId = clusterId;
+ }
+
+ /**
+ * create a new cluster, fail if it already exists
+ * @return true if created, false if creation failed
+ */
+ public boolean createCluster(ClusterConfig cluster) {
+ boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
+ if (!created) {
+ LOG.error("Cluster already created. Aborting.");
+ return false;
+ }
+ initClusterStructure();
+ Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
+ for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
+ addStateModelDefinitionToCluster(stateModelDef);
+ }
+ Map<ResourceId, ResourceConfig> resources = cluster.getResourceMap();
+ for (ResourceConfig resource : resources.values()) {
+ addResourceToCluster(resource);
+ }
+ Map<ParticipantId, ParticipantConfig> participants = cluster.getParticipantMap();
+ for (ParticipantConfig participant : participants.values()) {
+ addParticipantToCluster(participant);
+ }
+ _accessor.createProperty(_keyBuilder.constraints(), null);
+ for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
+ _accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
+ constraints);
+ }
+ _accessor.createProperty(_keyBuilder.clusterConfig(),
+ ClusterConfiguration.from(cluster.getUserConfig()));
+ if (cluster.isPaused()) {
+ pauseCluster();
+ }
+
+ return true;
+ }
+
+ /**
+  * Apply a delta to the persisted cluster configuration. The cluster is read, the delta is
+  * merged into its current configuration, and the merged result is written back through
+  * setBasicClusterConfig.
+  * @param clusterDelta change to the cluster configuration
+  * @return the merged ClusterConfig, or null if the cluster could not be read or the write
+  *         was reported as unsuccessful
+  */
+ public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+   Cluster current = readCluster();
+   if (current == null) {
+     LOG.error("Cluster does not exist, cannot be updated");
+     return null;
+   }
+   ClusterConfig merged = clusterDelta.mergeInto(current.getConfig());
+   if (!setBasicClusterConfig(merged)) {
+     return null;
+   }
+   return merged;
+ }
+
+ /**
+  * Set a cluster config minus state model, participants, and resources. Writes the user config
+  * as the cluster configuration property and every entry of the constraint map as a constraint
+  * property. All writes are attempted even if an earlier one fails.
+  * @param config ClusterConfig
+  * @return true if every write succeeded, false if config is null or any write failed
+  */
+ private boolean setBasicClusterConfig(ClusterConfig config) {
+   if (config == null) {
+     return false;
+   }
+   ClusterConfiguration configuration = ClusterConfiguration.from(config.getUserConfig());
+   boolean allSet = _accessor.setProperty(_keyBuilder.clusterConfig(), configuration);
+   for (Map.Entry<ConstraintType, ClusterConstraints> entry : config.getConstraintMap()
+       .entrySet()) {
+     // non-short-circuit '&=' so that remaining constraints are still written after a failure
+     allSet &=
+         _accessor.setProperty(_keyBuilder.constraint(entry.getKey().toString()),
+             entry.getValue());
+   }
+   return allSet;
+ }
+
+ /**
+ * drop a cluster
+ * @return true if the cluster was dropped, false if there was an error
+ */
+ public boolean dropCluster() {
+ LOG.info("Dropping cluster: " + _clusterId);
+ List<String> liveInstanceNames = _accessor.getChildNames(_keyBuilder.liveInstances());
+ if (liveInstanceNames.size() > 0) {
+ LOG.error("Can't drop cluster: " + _clusterId + " because there are running participant: "
+ + liveInstanceNames + ", shutdown participants first.");
+ return false;
+ }
+
+ LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+ if (leader != null) {
+ LOG.error("Can't drop cluster: " + _clusterId + ", because leader: " + leader.getId()
+ + " are running, shutdown leader first.");
+ return false;
+ }
+
+ return _accessor.removeProperty(_keyBuilder.cluster());
+ }
+
+ /**
+  * Read the entire cluster: resources, participants, the leader controller, constraints, pause
+  * status, cluster user config, and state model definitions. Messages and current states are
+  * read only for participants that have a live instance.
+  * @return cluster snapshot
+  */
+ public Cluster readCluster() {
+   // instance-id --> instance-config
+   Map<String, InstanceConfig> instanceConfigMap =
+       _accessor.getChildValuesMap(_keyBuilder.instanceConfigs());
+
+   // resource-id --> ideal-state
+   Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
+
+   // instance-id --> live-instance
+   Map<String, LiveInstance> liveInstanceMap =
+       _accessor.getChildValuesMap(_keyBuilder.liveInstances());
+
+   // participant-id --> (message-id --> message)
+   Map<String, Map<String, Message>> messageMap = new HashMap<String, Map<String, Message>>();
+   for (String liveName : liveInstanceMap.keySet()) {
+     Map<String, Message> pendingMessages =
+         _accessor.getChildValuesMap(_keyBuilder.messages(liveName));
+     messageMap.put(liveName, pendingMessages);
+   }
+
+   // participant-id --> (resource-id --> current-state), looked up under the live session
+   Map<String, Map<String, CurrentState>> currentStateMap =
+       new HashMap<String, Map<String, CurrentState>>();
+   for (Map.Entry<String, LiveInstance> liveEntry : liveInstanceMap.entrySet()) {
+     String liveName = liveEntry.getKey();
+     SessionId liveSession = liveEntry.getValue().getSessionId();
+     Map<String, CurrentState> sessionStates =
+         _accessor.getChildValuesMap(_keyBuilder.currentStates(liveName,
+             liveSession.stringify()));
+     currentStateMap.put(liveName, sessionStates);
+   }
+
+   LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+
+   // constraint-type --> constraints
+   Map<String, ClusterConstraints> constraintMap =
+       _accessor.getChildValuesMap(_keyBuilder.constraints());
+
+   // resource-id --> external view
+   Map<String, ExternalView> externalViewMap =
+       _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
+   // resource-id --> resource configuration
+   Map<String, ResourceConfiguration> resourceConfigMap =
+       _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+
+   // resource-id --> resource assignment
+   Map<String, ResourceAssignment> resourceAssignmentMap =
+       _accessor.getChildValuesMap(_keyBuilder.resourceAssignments());
+
+   // resources: one per ideal state
+   Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
+   for (Map.Entry<String, IdealState> idealEntry : idealStateMap.entrySet()) {
+     String resourceName = idealEntry.getKey();
+     ResourceId resourceId = ResourceId.from(resourceName);
+     Resource resource =
+         ResourceAccessor.createResource(resourceId, resourceConfigMap.get(resourceName),
+             idealEntry.getValue(), externalViewMap.get(resourceName),
+             resourceAssignmentMap.get(resourceName));
+     resourceMap.put(resourceId, resource);
+   }
+
+   // participants: one per instance config; the live parts are null for non-live ones
+   Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
+   for (Map.Entry<String, InstanceConfig> configEntry : instanceConfigMap.entrySet()) {
+     String participantName = configEntry.getKey();
+     InstanceConfig instanceConfig = configEntry.getValue();
+     UserConfig participantUserConfig = UserConfig.from(instanceConfig);
+     ParticipantId participantId = ParticipantId.from(participantName);
+     Participant participant =
+         ParticipantAccessor.createParticipant(participantId, instanceConfig,
+             participantUserConfig, liveInstanceMap.get(participantName),
+             messageMap.get(participantName), currentStateMap.get(participantName));
+     participantMap.put(participantId, participant);
+   }
+
+   // controllers: only the leader, if there is one
+   Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
+   ControllerId leaderId = null;
+   if (leader != null) {
+     leaderId = ControllerId.from(leader.getId());
+     controllerMap.put(leaderId, new Controller(leaderId, leader, true));
+   }
+
+   // constraints re-keyed by their enum type
+   Map<ConstraintType, ClusterConstraints> clusterConstraintMap =
+       new HashMap<ConstraintType, ClusterConstraints>();
+   for (Map.Entry<String, ClusterConstraints> constraintEntry : constraintMap.entrySet()) {
+     clusterConstraintMap.put(ConstraintType.valueOf(constraintEntry.getKey()),
+         constraintEntry.getValue());
+   }
+
+   // the cluster counts as paused when the pause property exists
+   PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
+   boolean isPaused = pauseSignal != null;
+
+   // cluster user config; an empty cluster-scoped one when nothing is persisted
+   ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+   UserConfig userConfig =
+       (clusterUserConfig != null) ? UserConfig.from(clusterUserConfig) : new UserConfig(
+           Scope.cluster(_clusterId));
+
+   // state model definitions
+   Map<StateModelDefId, StateModelDefinition> stateModelMap =
+       new StateModelDefinitionAccessor(_accessor).readStateModelDefinitions();
+
+   // assemble the snapshot
+   return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
+       clusterConstraintMap, stateModelMap, userConfig, isPaused);
+ }
+
+ /**
+ * pause controller of cluster
+ */
+ public void pauseCluster() {
+ _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
+ }
+
+ /**
+ * resume controller of cluster
+ */
+ public void resumeCluster() {
+ _accessor.removeProperty(_keyBuilder.pause());
+ }
+
+ /**
+ * add a resource to cluster
+ * @param resource
+ * @return true if resource added, false if there was an error
+ */
+ public boolean addResourceToCluster(ResourceConfig resource) {
+ if (resource == null || resource.getRebalancerConfig() == null) {
+ LOG.error("Resource not fully defined with a rebalancer context");
+ return false;
+ }
+
+ if (!isClusterStructureValid()) {
+ LOG.error("Cluster: " + _clusterId + " structure is not valid");
+ return false;
+ }
+ RebalancerContext context =
+ resource.getRebalancerConfig().getRebalancerContext(RebalancerContext.class);
+ StateModelDefId stateModelDefId = context.getStateModelDefId();
+ if (_accessor.getProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify())) == null) {
+ LOG.error("State model: " + stateModelDefId + " not found in cluster: " + _clusterId);
+ return false;
+ }
+
+ ResourceId resourceId = resource.getId();
+ if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) != null) {
+ LOG.error("Skip adding resource: " + resourceId
+ + ", because resource ideal state already exists in cluster: " + _clusterId);
+ return false;
+ }
+ if (_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())) != null) {
+ LOG.error("Skip adding resource: " + resourceId
+ + ", because resource config already exists in cluster: " + _clusterId);
+ return false;
+ }
+
+ // Add resource user config
+ if (resource.getUserConfig() != null) {
+ ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
+ configuration.setType(resource.getType());
+ configuration.addNamespacedConfig(resource.getUserConfig());
+ configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
+ configuration.setBucketSize(resource.getBucketSize());
+ configuration.setBatchMessageMode(resource.getBatchMessageMode());
+ _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ }
+
+ // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ IdealState idealState =
+ ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
+ resource.getBatchMessageMode());
+ if (idealState != null) {
+ _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+ }
+ return true;
+ }
+
+ /**
+ * drop a resource from cluster
+ * @param resourceId
+ * @return true if removal succeeded, false otherwise
+ */
+ public boolean dropResourceFromCluster(ResourceId resourceId) {
+ if (_accessor.getProperty(_keyBuilder.idealState(resourceId.stringify())) == null) {
+ LOG.error("Skip removing resource: " + resourceId
+ + ", because resource ideal state already removed from cluster: " + _clusterId);
+ return false;
+ }
+ _accessor.removeProperty(_keyBuilder.idealState(resourceId.stringify()));
+ _accessor.removeProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ return true;
+ }
+
+ /**
+  * Check that every required path of the cluster exists. When the data accessor exposes no
+  * base accessor the check cannot be made and the structure is reported as valid.
+  * @return true if valid or false otherwise
+  */
+ public boolean isClusterStructureValid() {
+   List<String> requiredPaths = getRequiredPaths();
+   BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+   if (baseAccessor == null) {
+     return true;
+   }
+   boolean[] present = baseAccessor.exists(requiredPaths, 0);
+   for (int i = 0; i < present.length; i++) {
+     if (!present[i]) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Create an empty persistent node at every required path so that the cluster structure is
+  * valid. A create that reports failure is only noted at debug level.
+  */
+ private void initClusterStructure() {
+   BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+   for (String requiredPath : getRequiredPaths()) {
+     if (baseAccessor.create(requiredPath, null, AccessOption.PERSISTENT)) {
+       continue;
+     }
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(requiredPath + " already exists");
+     }
+   }
+ }
+
+ /**
+  * List the paths of all properties that must exist for the cluster structure to be valid
+  * @return list of paths as strings, in a fixed order starting with the cluster root
+  */
+ private List<String> getRequiredPaths() {
+   PropertyKey[] requiredKeys =
+       {
+           _keyBuilder.cluster(), _keyBuilder.idealStates(), _keyBuilder.clusterConfigs(),
+           _keyBuilder.instanceConfigs(), _keyBuilder.resourceConfigs(),
+           _keyBuilder.propertyStore(), _keyBuilder.liveInstances(), _keyBuilder.instances(),
+           _keyBuilder.externalViews(), _keyBuilder.controller(), _keyBuilder.stateModelDefs(),
+           _keyBuilder.controllerMessages(), _keyBuilder.controllerTaskErrors(),
+           _keyBuilder.controllerTaskStatuses(), _keyBuilder.controllerLeaderHistory()
+       };
+   List<String> requiredPaths = new ArrayList<String>();
+   for (PropertyKey requiredKey : requiredKeys) {
+     requiredPaths.add(requiredKey.getPath());
+   }
+   return requiredPaths;
+ }
+
+ /**
+  * Add a participant to the cluster. Creates the participant's empty messages, current states,
+  * errors, and status updates nodes, then persists an instance config built from the given
+  * participant configuration.
+  * @param participant configuration of the participant to add
+  * @return true if participant added, false if the participant is null, the cluster structure
+  *         is invalid, or an instance config already exists for the participant
+  */
+ public boolean addParticipantToCluster(ParticipantConfig participant) {
+   if (participant == null) {
+     LOG.error("Participant not initialized");
+     return false;
+   }
+   if (!isClusterStructureValid()) {
+     LOG.error("Cluster: " + _clusterId + " structure is not valid");
+     return false;
+   }
+
+   ParticipantId participantId = participant.getId();
+   String participantName = participantId.stringify();
+   if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) != null) {
+     LOG.error("Config for participant: " + participantId + " already exists in cluster: "
+         + _clusterId);
+     return false;
+   }
+
+   // add empty root ZNodes; the messages node is created here and nowhere else, the second
+   // create that used to follow the instance config was redundant
+   List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
+   createKeys.add(_keyBuilder.messages(participantName));
+   createKeys.add(_keyBuilder.currentStates(participantName));
+   createKeys.add(_keyBuilder.participantErrors(participantName));
+   createKeys.add(_keyBuilder.statusUpdates(participantName));
+   for (PropertyKey key : createKeys) {
+     _accessor.createProperty(key, null);
+   }
+
+   // add the config
+   InstanceConfig instanceConfig = new InstanceConfig(participantId);
+   instanceConfig.setHostName(participant.getHostName());
+   instanceConfig.setPort(Integer.toString(participant.getPort()));
+   instanceConfig.setInstanceEnabled(participant.isEnabled());
+   UserConfig userConfig = participant.getUserConfig();
+   instanceConfig.addNamespacedConfig(userConfig);
+   Set<String> tags = participant.getTags();
+   for (String tag : tags) {
+     instanceConfig.addTag(tag);
+   }
+   Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
+   for (PartitionId partitionId : disabledPartitions) {
+     instanceConfig.setInstanceEnabledForPartition(partitionId, false);
+   }
+   _accessor.createProperty(_keyBuilder.instanceConfig(participantName), instanceConfig);
+   return true;
+ }
+
+ /**
+ * drop a participant from cluster
+ * @param participantId
+ * @return true if participant dropped, false if there was an error
+ */
+ public boolean dropParticipantFromCluster(ParticipantId participantId) {
+ if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+ + _clusterId);
+ return false;
+ }
+
+ if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+ LOG.error("Participant: " + participantId + " structure does NOT exist in cluster: "
+ + _clusterId);
+ return false;
+ }
+
+ // delete participant config path
+ _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+ // delete participant path
+ _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+ return true;
+ }
+
+ /**
+  * Persist a state model definition through a StateModelDefinitionAccessor. Refused when the
+  * cluster structure is not valid.
+  * @param stateModelDef fully initialized state model definition
+  * @return true if the model is persisted, false otherwise
+  */
+ public boolean addStateModelDefinitionToCluster(StateModelDefinition stateModelDef) {
+   if (isClusterStructureValid()) {
+     return new StateModelDefinitionAccessor(_accessor).setStateModelDefinition(stateModelDef);
+   }
+   LOG.error("Cluster: " + _clusterId + " structure is not valid");
+   return false;
+ }
+
+ /**
+ * Remove a state model definition if it exists
+ * @param stateModelDefId state model definition id
+ * @return true if removed, false if it did not exist
+ */
+ public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
+ return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
new file mode 100644
index 0000000..609e458
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
@@ -0,0 +1,49 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Controller;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.model.LiveInstance;
+
+/**
+ * Read-only accessor for the controller of a cluster
+ */
+public class ControllerAccessor {
+ private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
+
+ /**
+  * Instantiate a controller accessor
+  * @param accessor data accessor; its key builder is used to address the leader property
+  */
+ public ControllerAccessor(HelixDataAccessor accessor) {
+   _keyBuilder = accessor.keyBuilder();
+   _accessor = accessor;
+ }
+
+ /**
+  * Read the controller leader property and wrap it in a Controller snapshot
+  * @return Controller snapshot flagged as leader, or null if no leader property exists
+  */
+ public Controller readLeader() {
+   LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+   if (leader == null) {
+     return null;
+   }
+   return new Controller(ControllerId.from(leader.getId()), leader, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
new file mode 100644
index 0000000..c1a9250
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -0,0 +1,435 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.RunningInstance;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+public class ParticipantAccessor {
+ private static final Logger LOG = Logger.getLogger(ParticipantAccessor.class);
+
+ private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
+ private final ClusterId _clusterId;
+
+ /**
+  * Instantiate a participant accessor
+  * @param clusterId id of the cluster the participants belong to; used in log messages
+  * @param accessor data accessor; its key builder is used to build all property keys
+  */
+ public ParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+   _accessor = accessor;
+   _keyBuilder = accessor.keyBuilder();
+   _clusterId = clusterId;
+ }
+
+ /**
+  * Enable or disable a participant. If the participant has no instance config an error is
+  * logged and nothing is changed; otherwise a new InstanceConfig carrying only the enabled
+  * flag is passed to the accessor's updateProperty.
+  * @param participantId id of the participant
+  * @param isEnabled true to enable, false to disable
+  */
+ void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+   String participantName = participantId.stringify();
+   boolean configExists =
+       _accessor.getProperty(_keyBuilder.instanceConfig(participantName)) != null;
+   if (!configExists) {
+     LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+         + _clusterId);
+     return;
+   }
+
+   InstanceConfig enabledFlagOnly = new InstanceConfig(participantName);
+   enabledFlagOnly.setInstanceEnabled(isEnabled);
+   _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), enabledFlagOnly);
+ }
+
+ /**
+  * Disable a participant; delegates to enableParticipant with the flag set to false
+  * @param participantId id of the participant to disable
+  */
+ public void disableParticipant(ParticipantId participantId) {
+   final boolean enabled = false;
+   enableParticipant(participantId, enabled);
+ }
+
+ /**
+  * Enable a participant; delegates to enableParticipant with the flag set to true
+  * @param participantId id of the participant to enable
+  */
+ public void enableParticipant(ParticipantId participantId) {
+   final boolean enabled = true;
+   enableParticipant(participantId, enabled);
+ }
+
+ /**
+  * Create messages for a participant. One message key is built per entry of the map and all
+  * messages are handed to the accessor's createChildren in a single call.
+  * @param participantId id of the participant receiving the messages
+  * @param msgMap map of message-id to message
+  */
+ public void insertMessagesToParticipant(ParticipantId participantId,
+     Map<MessageId, Message> msgMap) {
+   List<PropertyKey> keys = new ArrayList<PropertyKey>();
+   List<Message> messages = new ArrayList<Message>();
+   for (Map.Entry<MessageId, Message> entry : msgMap.entrySet()) {
+     keys.add(_keyBuilder.message(participantId.stringify(), entry.getKey().stringify()));
+     messages.add(entry.getValue());
+   }
+
+   _accessor.createChildren(keys, messages);
+ }
+
+ /**
+  * Set messages of a participant. One message key is built per entry of the map and all
+  * messages are handed to the accessor's setChildren in a single call.
+  * @param participantId id of the participant owning the messages
+  * @param msgMap map of message-id to message
+  */
+ public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+   String participantName = participantId.stringify();
+   List<PropertyKey> keys = new ArrayList<PropertyKey>();
+   List<Message> messages = new ArrayList<Message>();
+   for (Map.Entry<MessageId, Message> entry : msgMap.entrySet()) {
+     keys.add(_keyBuilder.message(participantName, entry.getKey().stringify()));
+     messages.add(entry.getValue());
+   }
+   _accessor.setChildren(keys, messages);
+ }
+
+ /**
+  * Delete messages from a participant. All message keys are built first; the messages are
+  * then removed one by one.
+  * @param participantId id of the participant owning the messages
+  * @param msgIdSet ids of the messages to delete
+  */
+ public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+   String participantName = participantId.stringify();
+   List<PropertyKey> keysToRemove = new ArrayList<PropertyKey>();
+   for (MessageId messageId : msgIdSet) {
+     PropertyKey messageKey = _keyBuilder.message(participantName, messageId.stringify());
+     keysToRemove.add(messageKey);
+   }
+
+   // TODO impl batch remove
+   for (int i = 0; i < keysToRemove.size(); i++) {
+     _accessor.removeProperty(keysToRemove.get(i));
+   }
+ }
+
+ /**
+ * enable/disable partitions on a participant
+ * @param enabled
+ * @param participantId
+ * @param resourceId
+ * @param partitionIdSet
+ */
+ void enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+ final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
+ String participantName = participantId.stringify();
+ String resourceName = resourceId.stringify();
+
+ // check instanceConfig exists
+ PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
+ if (_accessor.getProperty(instanceConfigKey) == null) {
+ LOG.error("Config for participant: " + participantId + " does NOT exist in cluster: "
+ + _clusterId);
+ return;
+ }
+
+ // check resource exist. warn if not
+ IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
+ if (idealState == null) {
+ LOG.warn("Disable partitions: " + partitionIdSet + " but Cluster: " + _clusterId
+ + ", resource: " + resourceId
+ + " does NOT exists. probably disable it during ERROR->DROPPED transtition");
+
+ } else {
+ // check partitions exist. warn if not
+ for (PartitionId partitionId : partitionIdSet) {
+ if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState
+ .getPreferenceList(partitionId) == null)
+ || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState
+ .getParticipantStateMap(partitionId) == null)) {
+ LOG.warn("Cluster: " + _clusterId + ", resource: " + resourceId + ", partition: "
+ + partitionId + ", partition does NOT exist in ideal state");
+ }
+ }
+ }
+
+ // TODO merge list logic should go to znrecord updater
+ // update participantConfig
+ // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
+ BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+ final List<String> partitionNames = new ArrayList<String>();
+ for (PartitionId partitionId : partitionIdSet) {
+ partitionNames.add(partitionId.stringify());
+ }
+
+ baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + _clusterId + ", instance: " + participantId
+ + ", participant config is null");
+ }
+
+ // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
+ List<String> list =
+ currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ Set<String> disabledPartitions = new HashSet<String>();
+ if (list != null) {
+ disabledPartitions.addAll(list);
+ }
+
+ if (enabled) {
+ disabledPartitions.removeAll(partitionNames);
+ } else {
+ disabledPartitions.addAll(partitionNames);
+ }
+
+ list = new ArrayList<String>(disabledPartitions);
+ Collections.sort(list);
+ currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+ return currentData;
+ }
+ }, AccessOption.PERSISTENT);
+ }
+
+ /**
+  * Mark a set of partitions as disabled on one participant.
+  * <p>
+  * Forwards to the four-argument enable/disable routine with the enabled flag set to false.
+  * @param participantId the participant whose partitions are disabled
+  * @param resourceId the resource passed along with the partitions
+  * @param disablePartitionIdSet ids of the partitions to disable
+  */
+ public void disablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+     Set<PartitionId> disablePartitionIdSet) {
+   final boolean enabled = false;
+   enablePartitionsForParticipant(enabled, participantId, resourceId, disablePartitionIdSet);
+ }
+
+ /**
+  * Mark a set of partitions as enabled on one participant.
+  * <p>
+  * Forwards to the four-argument enable/disable routine with the enabled flag set to true.
+  * @param participantId the participant whose partitions are enabled
+  * @param resourceId the resource passed along with the partitions
+  * @param enablePartitionIdSet ids of the partitions to enable
+  */
+ public void enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+     Set<PartitionId> enablePartitionIdSet) {
+   final boolean enabled = true;
+   enablePartitionsForParticipant(enabled, participantId, resourceId, enablePartitionIdSet);
+ }
+
+ /**
+ * Reset partitions on a participant.
+ * <p>
+ * Not implemented yet: the method body is empty, so a call currently has no effect.
+ * @param participantId id of the participant (currently unused)
+ * @param resourceId id of the resource (currently unused)
+ * @param resetPartitionIdSet ids of the partitions to reset (currently unused)
+ */
+ public void resetPartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+ Set<PartitionId> resetPartitionIdSet) {
+ // TODO impl this (no-op until implemented)
+ }
+
+ /**
+  * Apply a delta to the stored configuration of a participant and persist the result.
+  * <p>
+  * The participant is read first; when it cannot be read an error is logged and nothing is
+  * written.
+  * @param participantId the participant to update
+  * @param participantDelta changes to merge into the current participant configuration
+  * @return the merged ParticipantConfig, or null if the participant could not be read
+  */
+ public ParticipantConfig updateParticipant(ParticipantId participantId,
+     ParticipantConfig.Delta participantDelta) {
+   final Participant current = readParticipant(participantId);
+   if (current != null) {
+     final ParticipantConfig merged = participantDelta.mergeInto(current.getConfig());
+     setParticipant(merged);
+     return merged;
+   }
+   LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+   return null;
+ }
+
+ /**
+  * Set the configuration of a participant by converting the logical configuration into an
+  * InstanceConfig and writing it under the participant's instance-config key.
+  * <p>
+  * Host name, port, tags, disabled partitions, the enabled flag and the user config are copied
+  * from the given configuration.
+  * @param participantConfig participant configuration
+  * @return true if the config was written, false if the argument was null or the write was
+  *         reported as unsuccessful by the data accessor
+  */
+ public boolean setParticipant(ParticipantConfig participantConfig) {
+   if (participantConfig == null) {
+     LOG.error("Participant config not initialized");
+     return false;
+   }
+   InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId());
+   instanceConfig.setHostName(participantConfig.getHostName());
+   instanceConfig.setPort(Integer.toString(participantConfig.getPort()));
+   for (String tag : participantConfig.getTags()) {
+     instanceConfig.addTag(tag);
+   }
+   for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
+     instanceConfig.setInstanceEnabledForPartition(partitionId, false);
+   }
+   instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
+   instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
+   PropertyKey instanceConfigKey =
+       _keyBuilder.instanceConfig(participantConfig.getId().stringify());
+   // propagate the outcome of the write instead of unconditionally reporting success
+   return _accessor.setProperty(instanceConfigKey, instanceConfig);
+ }
+
+ /**
+ * create a participant based on physical model
+ * @param participantId
+ * @param instanceConfig
+ * @param userConfig
+ * @param liveInstance
+ * @param instanceMsgMap map of message-id to message
+ * @param instanceCurStateMap map of resource-id to current-state
+ * @return participant
+ */
+ static Participant createParticipant(ParticipantId participantId, InstanceConfig instanceConfig,
+ UserConfig userConfig, LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
+ Map<String, CurrentState> instanceCurStateMap) {
+
+ String hostName = instanceConfig.getHostName();
+
+ int port = -1;
+ try {
+ port = Integer.parseInt(instanceConfig.getPort());
+ } catch (IllegalArgumentException e) {
+ // keep as -1
+ }
+ if (port < 0 || port > 65535) {
+ port = -1;
+ }
+ boolean isEnabled = instanceConfig.getInstanceEnabled();
+
+ List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
+ Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
+ if (disabledPartitions != null) {
+ disabledPartitionIdSet = new HashSet<PartitionId>();
+ for (String partitionId : disabledPartitions) {
+ disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
+ PartitionId.stripResourceId(partitionId)));
+ }
+ }
+
+ Set<String> tags = new HashSet<String>(instanceConfig.getTags());
+
+ RunningInstance runningInstance = null;
+ if (liveInstance != null) {
+ runningInstance =
+ new RunningInstance(liveInstance.getSessionId(), liveInstance.getHelixVersion(),
+ liveInstance.getProcessId());
+ }
+
+ Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+ if (instanceMsgMap != null) {
+ for (String msgId : instanceMsgMap.keySet()) {
+ Message message = instanceMsgMap.get(msgId);
+ msgMap.put(MessageId.from(msgId), message);
+ }
+ }
+
+ Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
+ if (instanceCurStateMap != null) {
+
+ for (String resourceName : instanceCurStateMap.keySet()) {
+ curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName));
+ }
+ }
+
+ return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+ runningInstance, curStateMap, msgMap, userConfig);
+ }
+
+ /**
+  * Read everything stored about a participant and assemble it into a snapshot.
+  * <p>
+  * Messages and current states are only read when a live instance exists for the participant;
+  * otherwise empty maps are used.
+  * @param participantId id of the participant to read
+  * @return participant, or null if no instance config could be read for it
+  */
+ public Participant readParticipant(ParticipantId participantId) {
+   // read physical model
+   final String participantName = participantId.stringify();
+   // NOTE(review): other methods in this class address the participant config through
+   // instanceConfig(...); confirm that instance(...) resolves to the same node
+   final InstanceConfig instanceConfig =
+       _accessor.getProperty(_keyBuilder.instance(participantName));
+   if (instanceConfig == null) {
+     LOG.error("Participant " + participantId + " is not present on the cluster");
+     return null;
+   }
+
+   final UserConfig userConfig = UserConfig.from(instanceConfig);
+   final LiveInstance liveInstance =
+       _accessor.getProperty(_keyBuilder.liveInstance(participantName));
+
+   final Map<String, Message> instanceMsgMap;
+   final Map<String, CurrentState> instanceCurStateMap;
+   if (liveInstance == null) {
+     instanceMsgMap = Collections.emptyMap();
+     instanceCurStateMap = Collections.emptyMap();
+   } else {
+     final SessionId sessionId = liveInstance.getSessionId();
+     instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+     final PropertyKey curStatesKey =
+         _keyBuilder.currentStates(participantName, sessionId.stringify());
+     instanceCurStateMap = _accessor.getChildValuesMap(curStatesKey);
+   }
+
+   return createParticipant(participantId, instanceConfig, userConfig, liveInstance,
+       instanceMsgMap, instanceCurStateMap);
+ }
+
+ /**
+  * Update the current state that a participant session holds for a resource.
+  * <p>
+  * The change is handed to the data accessor's updateProperty call on the current-state key.
+  * @param resourceId resource id
+  * @param participantId participant id
+  * @param sessionId session id
+  * @param curStateUpdate current state change delta
+  */
+ public void updateCurrentState(ResourceId resourceId, ParticipantId participantId,
+     SessionId sessionId, CurrentState curStateUpdate) {
+   final PropertyKey curStateKey =
+       _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
+           resourceId.stringify());
+   _accessor.updateProperty(curStateKey, curStateUpdate);
+ }
+
+ /**
+  * Remove the current state that a participant session holds for a resource.
+  * @param resourceId resource id
+  * @param participantId participant id
+  * @param sessionId session id
+  */
+ public void dropCurrentState(ResourceId resourceId, ParticipantId participantId,
+     SessionId sessionId) {
+   final PropertyKey curStateKey =
+       _keyBuilder.currentState(participantId.stringify(), sessionId.stringify(),
+           resourceId.stringify());
+   _accessor.removeProperty(curStateKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
new file mode 100644
index 0000000..cd55684
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -0,0 +1,265 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixConstants.StateModelToken;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.ResourceConfig.ResourceType;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.CustomRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfiguration;
+import org.apache.log4j.Logger;
+
+ public class ResourceAccessor {
+   private static final Logger LOG = Logger.getLogger(ResourceAccessor.class);
+   private final HelixDataAccessor _accessor;
+   private final PropertyKey.Builder _keyBuilder;
+
+   /**
+    * Create a resource accessor on top of a Helix data accessor
+    * @param accessor data accessor used for every read and write; its key builder is reused
+    */
+   public ResourceAccessor(HelixDataAccessor accessor) {
+     _accessor = accessor;
+     _keyBuilder = accessor.keyBuilder();
+   }
+
+   /**
+    * Read a single snapshot of a resource
+    * <p>
+    * The resource is considered present when either a resource configuration or an ideal state
+    * can be read for it; external view and resource assignment are read afterwards.
+    * @param resourceId the resource id to read
+    * @return Resource or null if not present
+    */
+   public Resource readResource(ResourceId resourceId) {
+     ResourceConfiguration config =
+         _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+     IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceId.stringify()));
+
+     if (config == null && idealState == null) {
+       LOG.error("Resource " + resourceId + " not present on the cluster");
+       return null;
+     }
+
+     ExternalView externalView =
+         _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+     ResourceAssignment resourceAssignment =
+         _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+     return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+   }
+
+   /**
+    * Update a resource configuration
+    * <p>
+    * Reads the current resource, merges the delta into its configuration and persists the result.
+    * @param resourceId the resource id to update
+    * @param resourceDelta changes to the resource
+    * @return ResourceConfig, or null if the resource is not persisted
+    */
+   public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+     Resource resource = readResource(resourceId);
+     if (resource == null) {
+       LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+       return null;
+     }
+     ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+     setResource(config);
+     return config;
+   }
+
+   /**
+    * save resource assignment
+    * @param resourceId id of the resource the assignment belongs to
+    * @param resourceAssignment assignment to store
+    */
+   public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+     _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+         resourceAssignment);
+   }
+
+   /**
+    * get resource assignment
+    * @param resourceId id of the resource
+    * @return resource assignment or null
+    */
+   public ResourceAssignment getResourceAssignment(ResourceId resourceId) {
+     return _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+   }
+
+   /**
+    * Set a physical resource configuration, which may include user-defined configuration, as well as
+    * rebalancer configuration
+    * <p>
+    * When an ideal state can be derived from the rebalancer configuration, it is written as well.
+    * @param resourceId id of the resource
+    * @param configuration physical configuration to store
+    */
+   void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+     _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+     // also set an ideal state if the resource supports it
+     RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
+     IdealState idealState =
+         rebalancerConfigToIdealState(rebalancerConfig, configuration.getBucketSize(),
+             configuration.getBatchMessageMode());
+     if (idealState != null) {
+       _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+     }
+   }
+
+   /**
+    * Persist an existing resource's logical configuration
+    * <p>
+    * User config, rebalancer config, bucket size and batch message mode are copied into a
+    * physical ResourceConfiguration, which is then stored via setConfiguration.
+    * @param resourceConfig logical resource configuration
+    * @return true if resource is set, false if the config or its rebalancer config is null
+    */
+   public boolean setResource(ResourceConfig resourceConfig) {
+     if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) {
+       LOG.error("Resource not fully defined with a rebalancer context");
+       return false;
+     }
+     ResourceId resourceId = resourceConfig.getId();
+     ResourceConfiguration config = new ResourceConfiguration(resourceId);
+     config.addNamespacedConfig(resourceConfig.getUserConfig());
+     config.addNamespacedConfig(resourceConfig.getRebalancerConfig().toNamespacedConfig());
+     config.setBucketSize(resourceConfig.getBucketSize());
+     config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+     setConfiguration(resourceId, config);
+     return true;
+   }
+
+   /**
+    * Get a resource configuration, which may include user-defined configuration, as well as
+    * rebalancer configuration
+    * <p>
+    * Previously this method was declared void and dropped the value it read; it now hands the
+    * configuration back to the caller as documented.
+    * @param resourceId id of the resource
+    * @return configuration as returned by the data accessor for the resource-config key
+    */
+   public ResourceConfiguration getConfiguration(ResourceId resourceId) {
+     return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+   }
+
+   /**
+    * set external view of a resource
+    * @param resourceId id of the resource
+    * @param extView external view to store
+    */
+   public void setExternalView(ResourceId resourceId, ExternalView extView) {
+     _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
+   }
+
+   /**
+    * drop external view of a resource
+    * @param resourceId id of the resource
+    */
+   public void dropExternalView(ResourceId resourceId) {
+     _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
+   }
+
+   /**
+    * Get an ideal state from a rebalancer config if the resource is partitioned
+    * <p>
+    * Preference lists are copied for SEMI_AUTO mode and participant state maps for CUSTOMIZED
+    * mode; other modes carry only the common fields.
+    * @param config RebalancerConfig instance
+    * @param bucketSize bucket size to use
+    * @param batchMessageMode true if batch messaging allowed, false otherwise
+    * @return IdealState, or null if the config holds no partitioned rebalancer context
+    */
+   static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize,
+       boolean batchMessageMode) {
+     PartitionedRebalancerContext partitionedContext =
+         config.getRebalancerContext(PartitionedRebalancerContext.class);
+     if (partitionedContext != null) {
+       IdealState idealState = new IdealState(partitionedContext.getResourceId());
+       idealState.setRebalanceMode(partitionedContext.getRebalanceMode());
+       idealState.setRebalancerRef(partitionedContext.getRebalancerRef());
+       String replicas = null;
+       if (partitionedContext.anyLiveParticipant()) {
+         replicas = StateModelToken.ANY_LIVEINSTANCE.toString();
+       } else {
+         replicas = Integer.toString(partitionedContext.getReplicaCount());
+       }
+       idealState.setReplicas(replicas);
+       idealState.setNumPartitions(partitionedContext.getPartitionSet().size());
+       idealState.setInstanceGroupTag(partitionedContext.getParticipantGroupTag());
+       idealState.setMaxPartitionsPerInstance(partitionedContext.getMaxPartitionsPerParticipant());
+       idealState.setStateModelDefId(partitionedContext.getStateModelDefId());
+       idealState.setStateModelFactoryId(partitionedContext.getStateModelFactoryId());
+       idealState.setBucketSize(bucketSize);
+       idealState.setBatchMessageMode(batchMessageMode);
+       if (partitionedContext.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
+         // NOTE(review): assumes a SEMI_AUTO context can always be read as
+         // SemiAutoRebalancerContext; confirm it is never null here
+         SemiAutoRebalancerContext semiAutoContext =
+             config.getRebalancerContext(SemiAutoRebalancerContext.class);
+         for (PartitionId partitionId : semiAutoContext.getPartitionSet()) {
+           idealState.setPreferenceList(partitionId, semiAutoContext.getPreferenceList(partitionId));
+         }
+       } else if (partitionedContext.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
+         // NOTE(review): same assumption for CustomRebalancerContext in CUSTOMIZED mode
+         CustomRebalancerContext customContext =
+             config.getRebalancerContext(CustomRebalancerContext.class);
+         for (PartitionId partitionId : customContext.getPartitionSet()) {
+           idealState.setParticipantStateMap(partitionId,
+               customContext.getPreferenceMap(partitionId));
+         }
+       }
+       return idealState;
+     }
+     return null;
+   }
+
+   /**
+    * Create a resource snapshot instance from the physical model
+    * <p>
+    * When an ideal state is given, the rebalancer context, bucket size and batch message mode are
+    * taken from it; otherwise they come from the resource configuration, and if that is missing
+    * too a partitioned context in NONE mode with default bucket size 0 and batch mode off is used.
+    * @param resourceId the resource id
+    * @param resourceConfiguration physical resource configuration, may be null
+    * @param idealState ideal state of the resource, may be null
+    * @param externalView external view of the resource
+    * @param resourceAssignment current resource assignment
+    * @return Resource
+    */
+   static Resource createResource(ResourceId resourceId,
+       ResourceConfiguration resourceConfiguration, IdealState idealState,
+       ExternalView externalView, ResourceAssignment resourceAssignment) {
+     UserConfig userConfig;
+     ResourceType type = ResourceType.DATA;
+     if (resourceConfiguration != null) {
+       userConfig = UserConfig.from(resourceConfiguration);
+       type = resourceConfiguration.getType();
+     } else {
+       userConfig = new UserConfig(Scope.resource(resourceId));
+     }
+     int bucketSize = 0;
+     boolean batchMessageMode = false;
+     RebalancerContext rebalancerContext;
+     if (idealState != null) {
+       rebalancerContext = PartitionedRebalancerContext.from(idealState);
+       bucketSize = idealState.getBucketSize();
+       batchMessageMode = idealState.getBatchMessageMode();
+     } else {
+       if (resourceConfiguration != null) {
+         bucketSize = resourceConfiguration.getBucketSize();
+         batchMessageMode = resourceConfiguration.getBatchMessageMode();
+         RebalancerConfig rebalancerConfig = new RebalancerConfig(resourceConfiguration);
+         rebalancerContext = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+       } else {
+         rebalancerContext = new PartitionedRebalancerContext(RebalanceMode.NONE);
+       }
+     }
+     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
+         rebalancerContext, userConfig, bucketSize, batchMessageMode);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
new file mode 100644
index 0000000..3816507
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/StateModelDefinitionAccessor.java
@@ -0,0 +1,70 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.collect.ImmutableMap;
+
+ public class StateModelDefinitionAccessor {
+   private final HelixDataAccessor _accessor;
+   private final PropertyKey.Builder _keyBuilder;
+
+   /**
+    * Create an accessor for state model definitions
+    * @param accessor data accessor used for reads and writes; its key builder is reused
+    */
+   public StateModelDefinitionAccessor(HelixDataAccessor accessor) {
+     this._accessor = accessor;
+     this._keyBuilder = accessor.keyBuilder();
+   }
+
+   /**
+    * Read every state model definition stored for the cluster and key it by typed id
+    * @return immutable map of state model ids to state model definition objects
+    */
+   public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+     final Map<String, StateModelDefinition> definitionsByName =
+         _accessor.getChildValuesMap(_keyBuilder.stateModelDefs());
+
+     // re-key the definitions from plain names to StateModelDefId
+     final Map<StateModelDefId, StateModelDefinition> definitionsById =
+         new HashMap<StateModelDefId, StateModelDefinition>();
+     for (Map.Entry<String, StateModelDefinition> definition : definitionsByName.entrySet()) {
+       definitionsById.put(StateModelDefId.from(definition.getKey()), definition.getValue());
+     }
+
+     return ImmutableMap.copyOf(definitionsById);
+   }
+
+   /**
+    * Store a state model definition under the key derived from its id
+    * @param stateModelDef fully initialized state model definition
+    * @return the result reported by the data accessor: true if the model is persisted, false
+    *         otherwise
+    */
+   public boolean setStateModelDefinition(StateModelDefinition stateModelDef) {
+     final PropertyKey definitionKey = _keyBuilder.stateModelDef(stateModelDef.getId());
+     return _accessor.setProperty(definitionKey, stateModelDef);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/c070a765/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
new file mode 100644
index 0000000..79b4f61
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -0,0 +1,696 @@
+package org.apache.helix.api.config;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ConstraintId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.Transition;
+import org.apache.helix.model.builder.ConstraintItemBuilder;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Configuration properties of a cluster
+ */
+public class ClusterConfig {
+ private static final Logger LOG = Logger.getLogger(ClusterConfig.class);
+
+ private final ClusterId _id;
+ private final Map<ResourceId, ResourceConfig> _resourceMap;
+ private final Map<ParticipantId, ParticipantConfig> _participantMap;
+ private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
+ private final UserConfig _userConfig;
+ private final boolean _isPaused;
+
+ /**
+  * Create a cluster configuration; instances are normally obtained through
+  * ClusterConfig.Builder. Every map argument is copied into an immutable map.
+  * @param id cluster id
+  * @param resourceMap map of resource id to resource config
+  * @param participantMap map of participant id to participant config
+  * @param constraintMap map of constraint type to all constraints of that type
+  * @param stateModelMap map of state model id to state model definition
+  * @param userConfig user-defined cluster properties
+  * @param isPaused true if paused, false if active
+  */
+ private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
+     Map<ParticipantId, ParticipantConfig> participantMap,
+     Map<ConstraintType, ClusterConstraints> constraintMap,
+     Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+     boolean isPaused) {
+   // plain values are stored as given
+   this._id = id;
+   this._userConfig = userConfig;
+   this._isPaused = isPaused;
+
+   // maps are snapshotted so later changes to the arguments are not reflected here
+   this._resourceMap = ImmutableMap.copyOf(resourceMap);
+   this._participantMap = ImmutableMap.copyOf(participantMap);
+   this._constraintMap = ImmutableMap.copyOf(constraintMap);
+   this._stateModelMap = ImmutableMap.copyOf(stateModelMap);
+ }
+
+ /**
+  * Get the id this configuration was created with
+  * @return cluster id
+  */
+ public ClusterId getId() {
+   return this._id;
+ }
+
+ /**
+  * Get the resource configurations of the cluster
+  * @return an immutable map of resource id to resource config, or empty map if none
+  */
+ public Map<ResourceId, ResourceConfig> getResourceMap() {
+   return this._resourceMap;
+ }
+
+ /**
+  * Get every constraint on the cluster, grouped by constraint type
+  * @return immutable map of constraint type to constraints
+  */
+ public Map<ConstraintType, ClusterConstraints> getConstraintMap() {
+   return this._constraintMap;
+ }
+
+ /**
+  * Get the maximum number of participants that can be in a state
+  * <p>
+  * Only cluster and resource scopes are supported; any other scope is logged as an error and
+  * treated as unspecified. If no state constraints are registered for the cluster, the bound is
+  * unspecified as well. Among the matching constraints, an "R" or "N" value wins immediately;
+  * otherwise the smallest numeric value is used, and non-numeric values are logged and ignored.
+  * @param scope the scope for the bound
+  * @param stateModelDefId the state model of the state
+  * @param state the constrained state
+  * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
+  *         number of replicas, or "N" for number of participants
+  */
+ public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+     State state) {
+   // set up attributes to match based on the scope
+   ClusterConstraints stateConstraints = getConstraintMap().get(ConstraintType.STATE_CONSTRAINT);
+   Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+   matchAttributes.put(ConstraintAttribute.STATE, state.toString());
+   matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+   switch (scope.getType()) {
+   case CLUSTER:
+     // cluster is implicit
+     break;
+   case RESOURCE:
+     matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+     break;
+   default:
+     LOG.error("Unsupported scope for state constraint: " + scope);
+     return "-1";
+   }
+   if (stateConstraints == null) {
+     // the constraint map has no entry for state constraints, so there is nothing to match
+     // against; report the bound as unspecified instead of failing with a NullPointerException
+     return "-1";
+   }
+   Set<ConstraintItem> matches = stateConstraints.match(matchAttributes);
+   int value = -1;
+   for (ConstraintItem item : matches) {
+     // match: if an R or N is found, always choose that one
+     // otherwise, take the minimum of the counts specified in the constraints
+     String constraintValue = item.getConstraintValue();
+     if (constraintValue != null) {
+       if (constraintValue.equals(ConstraintValue.N.toString())
+           || constraintValue.equals(ConstraintValue.R.toString())) {
+         return constraintValue;
+       } else {
+         try {
+           int current = Integer.parseInt(constraintValue);
+           if (value == -1 || current < value) {
+             value = current;
+           }
+         } catch (NumberFormatException e) {
+           LOG.error("Invalid state upper bound: " + constraintValue);
+         }
+       }
+     }
+   }
+   return Integer.toString(value);
+ }
+
+ /**
+  * Get the limit of simultaneous execution of a transition
+  * <p>
+  * Cluster, resource and participant scopes are supported; any other scope is logged as an
+  * error and treated as unlimited. If no message constraints are registered for the cluster,
+  * the transition is unlimited as well. Among the matching constraints the smallest numeric
+  * value is used; non-numeric values are logged and ignored.
+  * @param scope the scope under which the transition is constrained
+  * @param stateModelDefId the state model of which the transition is a part
+  * @param transition the constrained transition
+  * @return the limit, or Integer.MAX_VALUE if there is no limit
+  */
+ public int getTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+     Transition transition) {
+   // set up attributes to match based on the scope
+   ClusterConstraints transitionConstraints =
+       getConstraintMap().get(ConstraintType.MESSAGE_CONSTRAINT);
+   Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
+   matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
+   matchAttributes.put(ConstraintAttribute.MESSAGE_TYPE, MessageType.STATE_TRANSITION.toString());
+   matchAttributes.put(ConstraintAttribute.TRANSITION, transition.toString());
+   switch (scope.getType()) {
+   case CLUSTER:
+     // cluster is implicit
+     break;
+   case RESOURCE:
+     matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
+     break;
+   case PARTICIPANT:
+     matchAttributes.put(ConstraintAttribute.INSTANCE, scope.getScopedId().stringify());
+     break;
+   default:
+     LOG.error("Unsupported scope for transition constraints: " + scope);
+     return Integer.MAX_VALUE;
+   }
+   if (transitionConstraints == null) {
+     // the constraint map has no entry for message constraints, so nothing limits the
+     // transition; report "no limit" instead of failing with a NullPointerException
+     return Integer.MAX_VALUE;
+   }
+   Set<ConstraintItem> matches = transitionConstraints.match(matchAttributes);
+   int value = Integer.MAX_VALUE;
+   for (ConstraintItem item : matches) {
+     String constraintValue = item.getConstraintValue();
+     if (constraintValue != null) {
+       try {
+         int current = Integer.parseInt(constraintValue);
+         if (current < value) {
+           value = current;
+         }
+       } catch (NumberFormatException e) {
+         LOG.error("Invalid in-flight transition cap: " + constraintValue);
+       }
+     }
+   }
+   return value;
+ }
+
+ /**
+  * Get the participant configurations of the cluster
+  * @return an immutable map of participant id to participant config, or empty map if none
+  */
+ public Map<ParticipantId, ParticipantConfig> getParticipantMap() {
+   return this._participantMap;
+ }
+
+ /**
+  * Get every state model definition known to this cluster configuration
+  * @return immutable map of state model definition id to state model definition
+  */
+ public Map<StateModelDefId, StateModelDefinition> getStateModelMap() {
+   return this._stateModelMap;
+ }
+
+ /**
+  * Get the user-defined configuration properties that were supplied for this cluster
+  * @return UserConfig properties
+  */
+ public UserConfig getUserConfig() {
+   return this._userConfig;
+ }
+
+ /**
+  * Report whether the cluster is marked as paused in this configuration
+  * @return true if paused, false otherwise
+  */
+ public boolean isPaused() {
+   return this._isPaused;
+ }
+
+ /**
+ * Update context for a ClusterConfig
+ */
+ public static class Delta {
+ private enum Fields {
+ USER_CONFIG
+ }
+
+ private Set<Fields> _updateFields;
+ private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
+ private Builder _builder;
+
+ /**
+  * Instantiate the delta for a cluster config
+  * <p>
+  * Starts with no updated fields, an empty removed-constraint set for every constraint type,
+  * and a fresh builder for the given cluster.
+  * @param clusterId the cluster to update
+  */
+ public Delta(ClusterId clusterId) {
+   _updateFields = Sets.newHashSet();
+   _removedConstraints = Maps.newHashMap();
+   // make sure every constraint type has a set that removals can be recorded in
+   for (ConstraintType constraintType : ConstraintType.values()) {
+     _removedConstraints.put(constraintType, Sets.<ConstraintId> newHashSet());
+   }
+   _builder = new Builder(clusterId);
+ }
+
+ /**
+  * Add a state upper bound constraint with a fixed numeric bound
+  * <p>
+  * The number is converted to its decimal string form and passed to the string-based overload.
+  * @param scope scope under which the constraint is valid
+  * @param stateModelDefId identifier of the state model that owns the state
+  * @param state the state to constrain
+  * @param upperBound maximum number of replicas per partition in the state
+  * @return Delta
+  */
+ public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+     State state, int upperBound) {
+   final String boundAsString = Integer.toString(upperBound);
+   return addStateUpperBoundConstraint(scope, stateModelDefId, state, boundAsString);
+ }
+
+ /**
+ * Add a state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+ * number, or the currently supported special bound values:<br />
+ * "R" - Refers to the number of replicas specified during resource
+ * creation. This allows having different replication factor for each
+ * resource without having to create a different state machine. <br />
+ * "N" - Refers to all nodes in the cluster. Useful for resources that need
+ * to exist on all nodes. This way one can add/remove nodes without having
+ * the change the bounds.
+ * @return Delta
+ */
+ public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state, String dynamicUpperBound) {
+ _builder.addStateUpperBoundConstraint(scope, stateModelDefId, state, dynamicUpperBound);
+ return this;
+ }
+
+ /**
+ * Remove state upper bound constraint
+ * @param scope scope under which the constraint is valid
+ * @param stateModelDefId identifier of the state model that owns the state
+ * @param state the state to constrain
+ * @return Delta
+ */
+ public Delta removeStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ State state) {
+ _removedConstraints.get(ConstraintType.STATE_CONSTRAINT).add(
+ ConstraintId.from(scope, stateModelDefId, state));
+ return this;
+ }
+
+ /**
+ * Add a constraint on the maximum number of in-flight transitions of a certain type
+ * @param scope scope of the constraint
+ * @param stateModelDefId identifies the state model containing the transition
+ * @param transition the transition to constrain
+ * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+ * @return Delta
+ */
+ public Delta addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition, int maxInFlightTransitions) {
+ _builder.addTransitionConstraint(scope, stateModelDefId, transition, maxInFlightTransitions);
+ return this;
+ }
+
+ /**
+ * Remove a constraint on the maximum number of in-flight transitions of a certain type
+ * @param scope scope of the constraint
+ * @param stateModelDefId identifies the state model containing the transition
+ * @param transition the transition to constrain
+ * @return Delta
+ */
+ public Delta removeTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+ Transition transition) {
+ _removedConstraints.get(ConstraintType.MESSAGE_CONSTRAINT).add(
+ ConstraintId.from(scope, stateModelDefId, transition));
+ return this;
+ }
+
+ /**
+ * Add a single constraint item
+ * @param type type of the constraint item
+ * @param constraintId unique constraint id
+ * @param item instantiated ConstraintItem
+ * @return Delta
+ */
+ public Delta addConstraintItem(ConstraintType type, ConstraintId constraintId,
+ ConstraintItem item) {
+ _builder.addConstraint(type, constraintId, item);
+ return this;
+ }
+
+ /**
+ * Remove a single constraint item
+ * @param type type of the constraint item
+ * @param constraintId unique constraint id
+ * @return Delta
+ */
+ public Delta removeConstraintItem(ConstraintType type, ConstraintId constraintId) {
+ _removedConstraints.get(type).add(constraintId);
+ return this;
+ }
+
+ /*
+ * Set the user configuration
+ * @param userConfig user-specified properties
+ * @return Builder
+ */
+ public Delta setUserConfig(UserConfig userConfig) {
+ _builder.userConfig(userConfig);
+ _updateFields.add(Fields.USER_CONFIG);
+ return this;
+ }
+
+ /**
+ * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
+ * @param orig the original ClusterConfig
+ * @return updated ClusterConfig
+ */
+ public ClusterConfig mergeInto(ClusterConfig orig) {
+ // copy in original and updated fields
+ ClusterConfig deltaConfig = _builder.build();
+ Builder builder =
+ new Builder(orig.getId()).addResources(orig.getResourceMap().values())
+ .addParticipants(orig.getParticipantMap().values())
+ .addStateModelDefinitions(orig.getStateModelMap().values())
+ .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused());
+ for (Fields field : _updateFields) {
+ switch (field) {
+ case USER_CONFIG:
+ builder.userConfig(deltaConfig.getUserConfig());
+ break;
+ }
+ }
+ // add constraint deltas
+ for (ConstraintType type : ConstraintType.values()) {
+ ClusterConstraints constraints;
+ if (orig.getConstraintMap().containsKey(type)) {
+ constraints = orig.getConstraintMap().get(type);
+ } else {
+ constraints = new ClusterConstraints(type);
+ }
+ // add new constraints
+ if (deltaConfig.getConstraintMap().containsKey(type)) {
+ ClusterConstraints deltaConstraints = deltaConfig.getConstraintMap().get(type);
+ for (ConstraintId constraintId : deltaConstraints.getConstraintItems().keySet()) {
+ ConstraintItem constraintItem = deltaConstraints.getConstraintItem(constraintId);
+ constraints.addConstraintItem(constraintId, constraintItem);
+ }
+ }
+ // remove constraints
+ for (ConstraintId constraintId : _removedConstraints.get(type)) {
+ constraints.removeConstraintItem(constraintId);
+ }
+ builder.addConstraint(constraints);
+ }
+ return builder.build();
+ }
+ }
+
+ /**
+  * Assembles a cluster configuration
+  */
+ public static class Builder {
+   private final ClusterId _id;
+   private final Map<ResourceId, ResourceConfig> _resourceMap;
+   private final Map<ParticipantId, ParticipantConfig> _participantMap;
+   private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+   private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
+   private UserConfig _userConfig;
+   private boolean _isPaused;
+
+   /**
+    * Initialize builder for a cluster. The builder starts with no resources, participants,
+    * constraints or state models, is not paused, and has a fresh cluster-scoped UserConfig.
+    * @param id cluster id
+    */
+   public Builder(ClusterId id) {
+     _id = id;
+     _resourceMap = new HashMap<ResourceId, ResourceConfig>();
+     _participantMap = new HashMap<ParticipantId, ParticipantConfig>();
+     _constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
+     _stateModelMap = new HashMap<StateModelDefId, StateModelDefinition>();
+     _isPaused = false;
+     _userConfig = new UserConfig(Scope.cluster(id));
+   }
+
+   /**
+    * Add a resource to the cluster, replacing any resource already added under the same id
+    * @param resource resource configuration
+    * @return Builder
+    */
+   public Builder addResource(ResourceConfig resource) {
+     _resourceMap.put(resource.getId(), resource);
+     return this;
+   }
+
+   /**
+    * Add multiple resources to the cluster
+    * @param resources resource configurations
+    * @return Builder
+    */
+   public Builder addResources(Collection<ResourceConfig> resources) {
+     for (ResourceConfig resource : resources) {
+       addResource(resource);
+     }
+     return this;
+   }
+
+   /**
+    * Add a participant to the cluster, replacing any participant already added under the same id
+    * @param participant participant configuration
+    * @return Builder
+    */
+   public Builder addParticipant(ParticipantConfig participant) {
+     _participantMap.put(participant.getId(), participant);
+     return this;
+   }
+
+   /**
+    * Add multiple participants to the cluster
+    * @param participants participant configurations
+    * @return Builder
+    */
+   public Builder addParticipants(Collection<ParticipantConfig> participants) {
+     for (ParticipantConfig participant : participants) {
+       addParticipant(participant);
+     }
+     return this;
+   }
+
+   /**
+    * Add a constraint to the cluster. The items are copied one by one into the builder's own
+    * ClusterConstraints for that type; the passed instance itself is not stored.
+    * @param constraint cluster constraint of a specific type
+    * @return Builder
+    */
+   public Builder addConstraint(ClusterConstraints constraint) {
+     ClusterConstraints existConstraints = getConstraintsInstance(constraint.getType());
+     for (ConstraintId constraintId : constraint.getConstraintItems().keySet()) {
+       existConstraints
+           .addConstraintItem(constraintId, constraint.getConstraintItem(constraintId));
+     }
+     return this;
+   }
+
+   /**
+    * Add a single constraint item
+    * @param type type of the constraint
+    * @param constraintId unique constraint identifier
+    * @param item instantiated ConstraintItem
+    * @return Builder
+    */
+   public Builder addConstraint(ConstraintType type, ConstraintId constraintId, ConstraintItem item) {
+     ClusterConstraints existConstraints = getConstraintsInstance(type);
+     existConstraints.addConstraintItem(constraintId, item);
+     return this;
+   }
+
+   /**
+    * Add multiple constraints to the cluster
+    * @param constraints cluster constraints of multiple distinct types
+    * @return Builder
+    */
+   public Builder addConstraints(Collection<ClusterConstraints> constraints) {
+     for (ClusterConstraints constraint : constraints) {
+       addConstraint(constraint);
+     }
+     return this;
+   }
+
+   /**
+    * Add a constraint on the maximum number of in-flight transitions of a certain type. Only
+    * cluster, resource and participant scopes are supported; for any other scope an error is
+    * logged and the builder is left unchanged.
+    * @param scope scope of the constraint
+    * @param stateModelDefId identifies the state model containing the transition
+    * @param transition the transition to constrain
+    * @param maxInFlightTransitions number of allowed in-flight transitions in the scope
+    * @return Builder
+    */
+   public Builder addTransitionConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+       Transition transition, int maxInFlightTransitions) {
+     Map<String, String> attributes = Maps.newHashMap();
+     attributes.put(ConstraintAttribute.MESSAGE_TYPE.toString(),
+         MessageType.STATE_TRANSITION.toString());
+     attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(),
+         Integer.toString(maxInFlightTransitions));
+     attributes.put(ConstraintAttribute.TRANSITION.toString(), transition.toString());
+     attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+     switch (scope.getType()) {
+     case CLUSTER:
+       // cluster is implicit
+       break;
+     case RESOURCE:
+       attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+       break;
+     case PARTICIPANT:
+       attributes.put(ConstraintAttribute.INSTANCE.toString(), scope.getScopedId().stringify());
+       break;
+     default:
+       LOG.error("Unsupported scope for adding a transition constraint: " + scope);
+       return this;
+     }
+     ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+     ClusterConstraints constraints = getConstraintsInstance(ConstraintType.MESSAGE_CONSTRAINT);
+     constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, transition), item);
+     return this;
+   }
+
+   /**
+    * Add a state upper bound constraint
+    * @param scope scope under which the constraint is valid
+    * @param stateModelDefId identifier of the state model that owns the state
+    * @param state the state to constrain
+    * @param upperBound maximum number of replicas per partition in the state
+    * @return Builder
+    */
+   public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+       State state, int upperBound) {
+     return addStateUpperBoundConstraint(scope, stateModelDefId, state,
+         Integer.toString(upperBound));
+   }
+
+   /**
+    * Add a state upper bound constraint. Only cluster and resource scopes are supported; for any
+    * other scope an error is logged and the builder is left unchanged.
+    * @param scope scope under which the constraint is valid
+    * @param stateModelDefId identifier of the state model that owns the state
+    * @param state the state to constrain
+    * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
+    *          number, or the currently supported special bound values:<br />
+    *          "R" - Refers to the number of replicas specified during resource
+    *          creation. This allows having different replication factor for each
+    *          resource without having to create a different state machine. <br />
+    *          "N" - Refers to all nodes in the cluster. Useful for resources that need
+    *          to exist on all nodes. This way one can add/remove nodes without having
+    *          to change the bounds.
+    * @return Builder
+    */
+   public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
+       State state, String dynamicUpperBound) {
+     Map<String, String> attributes = Maps.newHashMap();
+     attributes.put(ConstraintAttribute.STATE.toString(), state.toString());
+     attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
+     attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), dynamicUpperBound);
+     switch (scope.getType()) {
+     case CLUSTER:
+       // cluster is implicit
+       break;
+     case RESOURCE:
+       attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
+       break;
+     default:
+       LOG.error("Unsupported scope for adding a state constraint: " + scope);
+       return this;
+     }
+     ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
+     ClusterConstraints constraints = getConstraintsInstance(ConstraintType.STATE_CONSTRAINT);
+     constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, state), item);
+     return this;
+   }
+
+   /**
+    * Add a state model definition to the cluster. Every state of the definition whose count is
+    * present and not "-1" is also registered as a cluster-scoped state upper bound constraint.
+    * @param stateModelDef state model definition of the cluster
+    * @return Builder
+    */
+   public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
+     _stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
+     // add state constraints from the state model definition
+     for (State state : stateModelDef.getStatesPriorityList()) {
+       String upperBound = stateModelDef.getNumParticipantsPerState(state);
+       // "-1" is skipped (it is not registered as a bound); the null check guards against a
+       // state that has no count at all, which would otherwise fail with a NullPointerException
+       // NOTE(review): presumably a missing count should be treated like "-1" -- confirm
+       if (upperBound != null && !upperBound.equals("-1")) {
+         addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
+             state, upperBound);
+       }
+     }
+     return this;
+   }
+
+   /**
+    * Add multiple state model definitions
+    * @param stateModelDefs collection of state model definitions for the cluster
+    * @return Builder
+    */
+   public Builder addStateModelDefinitions(Collection<StateModelDefinition> stateModelDefs) {
+     for (StateModelDefinition stateModelDef : stateModelDefs) {
+       addStateModelDefinition(stateModelDef);
+     }
+     return this;
+   }
+
+   /**
+    * Set the paused status of the cluster
+    * @param isPaused true if paused, false otherwise
+    * @return Builder
+    */
+   public Builder pausedStatus(boolean isPaused) {
+     _isPaused = isPaused;
+     return this;
+   }
+
+   /**
+    * Set the user configuration
+    * @param userConfig user-specified properties
+    * @return Builder
+    */
+   public Builder userConfig(UserConfig userConfig) {
+     _userConfig = userConfig;
+     return this;
+   }
+
+   /**
+    * Create the cluster configuration from everything added to this builder so far
+    * @return ClusterConfig
+    */
+   public ClusterConfig build() {
+     return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
+         _userConfig, _isPaused);
+   }
+
+   /**
+    * Get a valid instance of ClusterConstraints for a type, creating and registering an empty
+    * one if the builder does not hold constraints of that type yet
+    * @param type the type
+    * @return ClusterConstraints
+    */
+   private ClusterConstraints getConstraintsInstance(ConstraintType type) {
+     ClusterConstraints constraints = _constraintMap.get(type);
+     if (constraints == null) {
+       constraints = new ClusterConstraints(type);
+       _constraintMap.put(type, constraints);
+     }
+     return constraints;
+   }
+ }
+}