You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/03 18:25:23 UTC
git commit: [HELIX-215] Adding new recipe on how to write a custom
rebalancer
Updated Branches:
refs/heads/master 1d3c32ed2 -> 19c684174
[HELIX-215] Adding new recipe on how to write a custom rebalancer
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/19c68417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/19c68417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/19c68417
Branch: refs/heads/master
Commit: 19c684174e7d9f6bb84a7feab255a505c6f6ad2c
Parents: 1d3c32e
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Tue Sep 3 09:24:56 2013 -0700
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Tue Sep 3 09:24:56 2013 -0700
----------------------------------------------------------------------
helix-core/pom.xml | 9 +
.../stages/BestPossibleStateCalcStage.java | 8 +-
.../java/org/apache/helix/model/IdealState.java | 14 +-
.../apache/helix/tools/YAMLClusterSetup.java | 287 +++++++++++++++++++
pom.xml | 10 +
recipes/pom.xml | 1 +
recipes/user-defined-rebalancer/README.md | 254 ++++++++++++++++
recipes/user-defined-rebalancer/pom.xml | 139 +++++++++
.../src/main/config/log4j.properties | 31 ++
.../helix/userdefinedrebalancer/Lock.java | 48 ++++
.../userdefinedrebalancer/LockFactory.java | 34 +++
.../userdefinedrebalancer/LockManagerDemo.java | 192 +++++++++++++
.../LockManagerRebalancer.java | 84 ++++++
.../userdefinedrebalancer/LockProcess.java | 79 +++++
.../src/main/resources/lock-manager-config.yaml | 69 +++++
.../src/test/conf/testng.xml | 27 ++
16 files changed, 1279 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index af04d85..22d1b2c 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -150,6 +150,11 @@ under the License.
<artifactId>guava</artifactId>
<version>r09</version>
</dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.12</version>
+ </dependency>
</dependencies>
<build>
<resources>
@@ -213,6 +218,10 @@ under the License.
<mainClass>org.apache.helix.tools.JmxDumper</mainClass>
<name>JmxDumper</name>
</program>
+ <program>
+ <mainClass>org.apache.helix.tools.YAMLClusterSetup</mainClass>
+ <name>yaml-cluster-setup</name>
+ </program>
</programs>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index e812e16..11955f5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -118,9 +118,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
rebalancer.init(manager);
ResourceAssignment partitionStateAssignment =
rebalancer.computeResourceMapping(resource, idealState, currentStateOutput, cache);
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
- output.setState(resourceName, partition, newStateMap);
+ if (partitionStateAssignment != null) {
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+ output.setState(resourceName, partition, newStateMap);
+ }
}
}
return output;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index e14940a..90a2dff 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -461,13 +461,19 @@ public class IdealState extends HelixProperty {
return _record.getSimpleField(IdealStateProperty.INSTANCE_GROUP_TAG.toString());
}
+ /**
+ * Update the ideal state mapping from a ResourceAssignment
+ * @param assignment ResourceAssignment result from the rebalancer
+ */
public void updateFromAssignment(ResourceAssignment assignment) {
_record.getMapFields().clear();
_record.getListFields().clear();
- for (Partition partition : assignment.getMappedPartitions()) {
- Map<String, String> replicaMap = assignment.getReplicaMap(partition);
- setInstanceStateMap(partition.getPartitionName(), replicaMap);
- setPreferenceList(partition.getPartitionName(), new ArrayList<String>(replicaMap.keySet()));
+ if (assignment != null) {
+ for (Partition partition : assignment.getMappedPartitions()) {
+ Map<String, String> replicaMap = assignment.getReplicaMap(partition);
+ setInstanceStateMap(partition.getPartitionName(), replicaMap);
+ setPreferenceList(partition.getPartitionName(), new ArrayList<String>(replicaMap.keySet()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
new file mode 100644
index 0000000..c7233ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/YAMLClusterSetup.java
@@ -0,0 +1,287 @@
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ParticipantConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.ConstraintsConfig;
+import org.apache.helix.tools.YAMLClusterSetup.YAMLClusterConfig.ResourceConfig.StateModelConfig;
+import org.apache.log4j.Logger;
+import org.yaml.snakeyaml.Yaml;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Supports HelixAdmin operations specified by a YAML configuration file defining a cluster,
+ * resources, participants, etc.
+ * See the user-rebalanced-lock-manager recipe for an annotated example file.
+ */
+public class YAMLClusterSetup {
+ private static final Logger LOG = Logger.getLogger(YAMLClusterSetup.class);
+
+ private final String _zkAddress;
+
+ /**
+ * Start the YAML parser for a given zookeeper instance
+ * @param zkAddress
+ */
+ public YAMLClusterSetup(String zkAddress) {
+ _zkAddress = zkAddress;
+ }
+
+ /**
+ * Set up the cluster by parsing a YAML file.
+ * @param input InputStream representing the file
+ * @return ClusterConfig Java wrapper of the configuration file
+ */
+ public YAMLClusterConfig setupCluster(InputStream input) {
+ // parse the YAML
+ Yaml yaml = new Yaml();
+ YAMLClusterConfig cfg = yaml.loadAs(input, YAMLClusterConfig.class);
+
+ // create the cluster
+ HelixAdmin helixAdmin = new ZKHelixAdmin(_zkAddress);
+ if (cfg.clusterName == null) {
+ throw new HelixException("Cluster name is required!");
+ }
+ helixAdmin.addCluster(cfg.clusterName);
+
+ // add each participant
+ if (cfg.participants != null) {
+ for (ParticipantConfig participant : cfg.participants) {
+ helixAdmin.addInstance(cfg.clusterName, getInstanceCfg(participant));
+ }
+ }
+
+ // add each resource
+ if (cfg.resources != null) {
+ for (ResourceConfig resource : cfg.resources) {
+ if (resource.name == null) {
+ throw new HelixException("Resources must be named!");
+ }
+ if (resource.stateModel == null || resource.stateModel.name == null) {
+ throw new HelixException("Resource must specify a named state model!");
+ }
+ // if states is null, assume using a built-in or already-added state model
+ if (resource.stateModel.states != null) {
+ StateModelDefinition stateModelDef =
+ getStateModelDef(resource.stateModel, resource.constraints);
+ helixAdmin.addStateModelDef(cfg.clusterName, resource.stateModel.name, stateModelDef);
+ }
+ int partitions = 1;
+ int replicas = 1;
+ if (resource.partitions != null) {
+ if (resource.partitions.containsKey("count")) {
+ partitions = resource.partitions.get("count");
+ }
+ if (resource.partitions.containsKey("replicas")) {
+ replicas = resource.partitions.get("replicas");
+ }
+ }
+
+ if (resource.rebalancer == null || !resource.rebalancer.containsKey("mode")) {
+ throw new HelixException("Rebalance mode is required!");
+ }
+ helixAdmin.addResource(cfg.clusterName, resource.name, partitions,
+ resource.stateModel.name, resource.rebalancer.get("mode"));
+ // user-defined rebalancer
+ if (resource.rebalancer.containsKey("class")
+ && resource.rebalancer.get("mode").equals(RebalanceMode.USER_DEFINED.toString())) {
+ IdealState idealState = helixAdmin.getResourceIdealState(cfg.clusterName, resource.name);
+ idealState.setRebalancerClassName(resource.rebalancer.get("class"));
+ helixAdmin.setResourceIdealState(cfg.clusterName, resource.name, idealState);
+ }
+ helixAdmin.rebalance(cfg.clusterName, resource.name, replicas);
+ }
+ }
+ return cfg;
+ }
+
+ private static InstanceConfig getInstanceCfg(ParticipantConfig participant) {
+ if (participant == null || participant.name == null || participant.host == null
+ || participant.port == null) {
+ throw new HelixException("Participant must have a specified name, host, and port!");
+ }
+ InstanceConfig instanceCfg = new InstanceConfig(participant.name);
+ instanceCfg.setHostName(participant.host);
+ instanceCfg.setPort(participant.port.toString());
+ return instanceCfg;
+ }
+
+ private static StateModelDefinition getStateModelDef(StateModelConfig stateModel,
+ ConstraintsConfig constraints) {
+ // Use a builder to define the state model
+ StateModelDefinition.Builder builder = new StateModelDefinition.Builder(stateModel.name);
+ if (stateModel.states == null || stateModel.states.size() == 0) {
+ throw new HelixException("List of states are required in a state model!");
+ }
+ Set<String> stateSet = new HashSet<String>(stateModel.states);
+ if (stateModel.initialState == null) {
+ throw new HelixException("Initial state is required in a state model!");
+ } else if (!stateSet.contains(stateModel.initialState)) {
+ throw new HelixException("Initial state is not a valid state");
+ }
+ builder.initialState(stateModel.initialState);
+
+ // Build a helper for state priorities
+ Map<String, Integer> statePriorities = new HashMap<String, Integer>();
+ if (constraints != null && constraints.state != null && constraints.state.priorityList != null) {
+ int statePriority = 0;
+ for (String state : constraints.state.priorityList) {
+ if (!stateSet.contains(state)) {
+ throw new HelixException("State " + state
+ + " in the state priority list is not in the state list!");
+ }
+ statePriorities.put(state, statePriority);
+ statePriority++;
+ }
+ }
+
+ // Add states, set state priorities
+ for (String state : stateModel.states) {
+ if (statePriorities.containsKey(state)) {
+ builder.addState(state, statePriorities.get(state));
+ } else {
+ builder.addState(state);
+ }
+ }
+
+ // Set state counts
+ for (Map<String, String> counts : constraints.state.counts) {
+ String state = counts.get("name");
+ if (!stateSet.contains(state)) {
+ throw new HelixException("State " + state + " has a count, but not in the state list!");
+ }
+ builder.dynamicUpperBound(state, counts.get("count"));
+ }
+
+ // Build a helper for transition priorities
+ Map<String, Integer> transitionPriorities = new HashMap<String, Integer>();
+ if (constraints != null && constraints.transition != null
+ && constraints.transition.priorityList != null) {
+ int transitionPriority = 0;
+ for (String transition : constraints.transition.priorityList) {
+ transitionPriorities.put(transition, transitionPriority);
+ transitionPriority++;
+ }
+ }
+
+ // Add the transitions
+ if (stateModel.transitions == null || stateModel.transitions.size() == 0) {
+ throw new HelixException("Transitions are required!");
+ }
+ for (Map<String, String> transitions : stateModel.transitions) {
+ String name = transitions.get("name");
+ String from = transitions.get("from");
+ String to = transitions.get("to");
+ if (name == null || from == null || to == null) {
+ throw new HelixException("All transitions must have a name, a from state, and a to state");
+ }
+ if (transitionPriorities.containsKey(name)) {
+ builder.addTransition(from, to, transitionPriorities.get(name));
+ } else {
+ builder.addTransition(from, to);
+ }
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Java wrapper for the YAML input file
+ */
+ public static class YAMLClusterConfig {
+ public String clusterName;
+ public List<ResourceConfig> resources;
+ public List<ParticipantConfig> participants;
+
+ public static class ResourceConfig {
+ public String name;
+ public Map<String, String> rebalancer;
+ public Map<String, Integer> partitions;
+ public StateModelConfig stateModel;
+ public ConstraintsConfig constraints;
+
+ public static class StateModelConfig {
+ public String name;
+ public List<String> states;
+ public List<Map<String, String>> transitions;
+ public String initialState;
+ }
+
+ public static class ConstraintsConfig {
+ public StateConstraintsConfig state;
+ public TransitionConstraintsConfig transition;
+
+ public static class StateConstraintsConfig {
+ public List<Map<String, String>> counts;
+ public List<String> priorityList;
+ }
+
+ public static class TransitionConstraintsConfig {
+ public List<String> priorityList;
+ }
+ }
+ }
+
+ public static class ParticipantConfig {
+ public String name;
+ public String host;
+ public Integer port;
+ }
+ }
+
+ /**
+ * Start a cluster defined by a YAML file
+ * @param args zkAddr, yamlFile
+ */
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ LOG.error("USAGE: YAMLClusterSetup zkAddr yamlFile");
+ return;
+ }
+ String zkAddress = args[0];
+ String yamlFile = args[1];
+
+ InputStream input;
+ try {
+ input = new FileInputStream(new File(yamlFile));
+ } catch (FileNotFoundException e) {
+ LOG.error("Could not open " + yamlFile);
+ return;
+ }
+ new YAMLClusterSetup(zkAddress).setupCluster(input);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ee6f573..6840410 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,6 +164,11 @@ under the License.
<enabled>false</enabled>
</snapshots>
</repository>
+ <repository>
+ <id>Sonatype-public</id>
+ <name>SnakeYAML repository</name>
+ <url>http://oss.sonatype.org/content/groups/public/</url>
+ </repository>
</repositories>
@@ -285,6 +290,11 @@ under the License.
<artifactId>testng</artifactId>
<version>6.0.1</version>
</dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.12</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/pom.xml b/recipes/pom.xml
index d0a93b1..3667650 100644
--- a/recipes/pom.xml
+++ b/recipes/pom.xml
@@ -33,6 +33,7 @@ under the License.
<module>rabbitmq-consumer-group</module>
<module>rsync-replicated-file-system</module>
<module>distributed-lock-manager</module>
+ <module>user-defined-rebalancer</module>
<module>task-execution</module>
<module>service-discovery</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/README.md
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/README.md b/recipes/user-defined-rebalancer/README.md
new file mode 100644
index 0000000..3dca51c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/README.md
@@ -0,0 +1,254 @@
+<!---
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+Distributed lock manager with a user-defined rebalancer and YAML configuration
+------------------------------------------------------------------------------
+This recipe is a second take on the distributed lock manager example with two key differences
+ * Instead of specifying the cluster using the HelixAdmin Java API, a YAML file indicates the cluster, its resources, and its participants. This is a simplified way to bootstrap cluster creation with a compact, logical hierarchy.
+ * The rebalancing process (i.e. the algorithm that uses the cluster state to determine an assignment of locks to participants) is specified in a class defined by the recipe itself, completely independent of Helix.
+
+For additional background and motivation, see the distributed-lock-manager recipe.
+
+### YAML Cluster Setup
+The YAML configuration below specifies a state model for a lock in which it can be locked and unlocked. At most one participant can hold the lock at any time, and there are 12 locks to distribute across 4 participants.
+
+```
+clusterName: lock-manager-custom-rebalancer # unique name for the cluster
+resources:
+ - name: lock-group # unique resource name
+ rebalancer: # we will provide our own rebalancer
+ mode: USER_DEFINED
+ class: org.apache.helix.userdefinedrebalancer.LockManagerRebalancer
+ partitions:
+ count: 12 # number of locks
+ replicas: 1 # number of simultaneous holders for each lock
+ stateModel:
+ name: lock-unlock # unique model name
+ states: [LOCKED, RELEASED, DROPPED] # the list of possible states
+ transitions: # the list of possible transitions
+ - name: Unlock
+ from: LOCKED
+ to: RELEASED
+ - name: Lock
+ from: RELEASED
+ to: LOCKED
+ - name: DropLock
+ from: LOCKED
+ to: DROPPED
+ - name: DropUnlock
+ from: RELEASED
+ to: DROPPED
+ - name: Undrop
+ from: DROPPED
+ to: RELEASED
+ initialState: RELEASED
+ constraints:
+ state:
+ counts: # maximum number of replicas of a partition that can be in each state
+ - name: LOCKED
+ count: "1"
+ - name: RELEASED
+ count: "-1"
+ - name: DROPPED
+ count: "-1"
+ priorityList: [LOCKED, RELEASED, DROPPED] # states in order of priority
+ transition: # transitions priority to enforce order that transitions occur
+ priorityList: [Unlock, Lock, Undrop, DropUnlock, DropLock]
+participants: # list of nodes that can acquire locks
+ - name: localhost_12001
+ host: localhost
+ port: 12001
+ - name: localhost_12002
+ host: localhost
+ port: 12002
+ - name: localhost_12003
+ host: localhost
+ port: 12003
+```
+
+### User-Defined Rebalancer
+The implementation of the Rebalancer interface is quite simple. It assumes a Lock/Unlock model where the lock state has highest priority. It uses a mod-based approach to fairly assign locks to participants so that no participant holds more than one instance of a lock, and each lock is only assigned to as many participants as can hold the same lock simultaneously. In the configuration above, only one participant can hold a given lock in the locked state.
+
+The result is a ResourceMapping, which maps each lock to its holder and its lock state. In Helix terminology, the lock manager is the resource, a lock is a partition, its holder is a participant, and the lock state is the current state of the lock based on one of the pre-defined states in the state model.
+
+```
+@Override
+public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+ CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ // Initialize an empty mapping of locks to participants
+ ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
+
+ // Get the list of live participants in the cluster
+ List<String> liveParticipants = new ArrayList<String>(clusterData.getLiveInstances().keySet());
+
+ // Get the state model (should be a simple lock/unlock model) and the highest-priority state
+ String stateModelName = currentIdealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+ if (stateModelDef.getStatesPriorityList().size() < 1) {
+ LOG.error("Invalid state model definition. There should be at least one state.");
+ return assignment;
+ }
+ String lockState = stateModelDef.getStatesPriorityList().get(0);
+
+ // Count the number of participants allowed to lock each lock
+ String stateCount = stateModelDef.getNumInstancesPerState(lockState);
+ int lockHolders = 0;
+ try {
+ // a numeric value is a custom-specified number of participants allowed to lock the lock
+ lockHolders = Integer.parseInt(stateCount);
+ } catch (NumberFormatException e) {
+ LOG.error("Invalid state model definition. The lock state does not have a valid count");
+ return assignment;
+ }
+
+ // Fairly assign the lock state to the participants using a simple mod-based sequential
+ // assignment. For instance, if each lock can be held by 3 participants, lock 0 would be held
+ // by participants (0, 1, 2), lock 1 would be held by (1, 2, 3), and so on, wrapping around the
+ // number of participants as necessary.
+ // This assumes a simple lock-unlock model where the only state of interest is which nodes have
+ // acquired each lock.
+ int i = 0;
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> replicaMap = new HashMap<String, String>();
+ for (int j = i; j < i + lockHolders; j++) {
+ int participantIndex = j % liveParticipants.size();
+ String participant = liveParticipants.get(participantIndex);
+ // enforce that a participant can only have one instance of a given lock
+ if (!replicaMap.containsKey(participant)) {
+ replicaMap.put(participant, lockState);
+ }
+ }
+ assignment.addReplicaMap(partition, replicaMap);
+ i++;
+ }
+ return assignment;
+}
+```
+----------------------------------------------------------------------------------------
+
+#### In Action
+
+##### Specifying a Lock StateModel
+In our configuration file, we indicated a special state model with two key states: LOCKED and RELEASED. Thus, we need to provide for the participant a subclass of StateModel that can respond to transitions between those states.
+
+```
+public class Lock extends StateModel {
+ private String lockName;
+
+ public Lock(String lockName) {
+ this.lockName = lockName;
+ }
+
+ @Transition(from = "RELEASED", to = "LOCKED")
+ public void lock(Message m, NotificationContext context) {
+ System.out.println(context.getManager().getInstanceName() + " acquired lock:" + lockName);
+ }
+
+ @Transition(from = "LOCKED", to = "RELEASED")
+ public void release(Message m, NotificationContext context) {
+ System.out.println(context.getManager().getInstanceName() + " releasing lock:" + lockName);
+ }
+}
+```
+
+##### Loading the configuration file
+We include a YAML file parser that will set up the cluster according to the specifications of the file. Here is the code that this example uses to set up the cluster:
+
+```
+YAMLClusterSetup setup = new YAMLClusterSetup(zkAddress);
+InputStream input =
+ Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream("lock-manager-config.yaml");
+YAMLClusterSetup.YAMLClusterConfig config = setup.setupCluster(input);
+```
+At this point, the cluster is set up and the configuration is persisted on Zookeeper. The config variable contains a snapshot of this configuration for further access.
+
+##### Building
+```
+git clone https://git-wip-us.apache.org/repos/asf/incubator-helix.git
+cd incubator-helix
+mvn clean install package -DskipTests
+cd recipes/user-rebalanced-lock-manager/target/user-rebalanced-lock-manager-pkg/bin
+chmod +x *
+./lock-manager-demo
+```
+
+##### Output
+
+```
+./lock-manager-demo
+STARTING localhost_12002
+STARTING localhost_12001
+STARTING localhost_12003
+STARTED localhost_12001
+STARTED localhost_12003
+STARTED localhost_12002
+localhost_12003 acquired lock:lock-group_4
+localhost_12002 acquired lock:lock-group_8
+localhost_12001 acquired lock:lock-group_10
+localhost_12001 acquired lock:lock-group_3
+localhost_12001 acquired lock:lock-group_6
+localhost_12003 acquired lock:lock-group_0
+localhost_12002 acquired lock:lock-group_5
+localhost_12001 acquired lock:lock-group_9
+localhost_12002 acquired lock:lock-group_2
+localhost_12003 acquired lock:lock-group_7
+localhost_12003 acquired lock:lock-group_11
+localhost_12002 acquired lock:lock-group_1
+lockName acquired By
+======================================
+lock-group_0 localhost_12003
+lock-group_1 localhost_12002
+lock-group_10 localhost_12001
+lock-group_11 localhost_12003
+lock-group_2 localhost_12002
+lock-group_3 localhost_12001
+lock-group_4 localhost_12003
+lock-group_5 localhost_12002
+lock-group_6 localhost_12001
+lock-group_7 localhost_12003
+lock-group_8 localhost_12002
+lock-group_9 localhost_12001
+Stopping the first participant
+localhost_12001 Interrupted
+localhost_12002 acquired lock:lock-group_3
+localhost_12003 acquired lock:lock-group_6
+localhost_12003 acquired lock:lock-group_10
+localhost_12002 acquired lock:lock-group_9
+lockName acquired By
+======================================
+lock-group_0 localhost_12003
+lock-group_1 localhost_12002
+lock-group_10 localhost_12003
+lock-group_11 localhost_12003
+lock-group_2 localhost_12002
+lock-group_3 localhost_12002
+lock-group_4 localhost_12003
+lock-group_5 localhost_12002
+lock-group_6 localhost_12003
+lock-group_7 localhost_12003
+lock-group_8 localhost_12002
+lock-group_9 localhost_12002
+```
+
+----------------------------------------------------------------------------------------
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/pom.xml b/recipes/user-defined-rebalancer/pom.xml
new file mode 100644
index 0000000..ebd972c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.helix.recipes</groupId>
+ <artifactId>recipes</artifactId>
+ <version>0.6.2-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>user-defined-rebalancer</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Helix :: Recipes :: user-defined-rebalancer</name>
+
+ <properties>
+ <osgi.import>
+ org.apache.helix*,
+ org.apache.log4j,
+ *
+ </osgi.import>
+ <osgi.export>org.apache.helix.userdefinedrebalancer*;version="${project.version};-noimport:=true</osgi.export>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <!-- Set the target configuration directory to be used in the bin scripts -->
+ <!-- <configurationDirectory>conf</configurationDirectory> -->
+ <!-- Copy the contents from "/src/main/config" to the target configuration
+ directory in the assembled application -->
+ <!-- <copyConfigurationDirectory>true</copyConfigurationDirectory> -->
+ <!-- Include the target configuration directory in the beginning of
+ the classpath declaration in the bin scripts -->
+ <includeConfigurationDirectoryInClasspath>true</includeConfigurationDirectoryInClasspath>
+ <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
+ <!-- Extra JVM arguments that will be included in the bin scripts -->
+ <extraJvmArguments>-Xms512m -Xmx512m</extraJvmArguments>
+ <!-- Generate bin scripts for windows and unix pr default -->
+ <platforms>
+ <platform>windows</platform>
+ <platform>unix</platform>
+ </platforms>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>org.apache.helix.userdefinedrebalancer.LockManagerDemo</mainClass>
+ <name>lock-manager-demo</name>
+ </program>
+ </programs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/config/log4j.properties b/recipes/user-defined-rebalancer/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
new file mode 100644
index 0000000..ceba1ed
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/Lock.java
@@ -0,0 +1,48 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(initialState = "RELEASED", states = {
+ "RELEASED", "LOCKED"
+})
+public class Lock extends StateModel {
+ private String lockName;
+
+ public Lock(String lockName) {
+ this.lockName = lockName;
+ }
+
+ @Transition(from = "RELEASED", to = "LOCKED")
+ public void lock(Message m, NotificationContext context) {
+ System.out.println(context.getManager().getInstanceName() + " acquired lock:" + lockName);
+ }
+
+ @Transition(from = "LOCKED", to = "RELEASED")
+ public void release(Message m, NotificationContext context) {
+ System.out.println(context.getManager().getInstanceName() + " releasing lock:" + lockName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
new file mode 100644
index 0000000..3aec20c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockFactory.java
@@ -0,0 +1,34 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * This factory allows a participant to get the appropriate state model callbacks for the lock
+ * manager state model. This is used exactly once per participant to get a valid instance of a Lock,
+ * and then the same Lock instance is used for all state transition callbacks.
+ */
+public class LockFactory extends StateModelFactory<Lock> {
+ @Override
+ public Lock createNewStateModel(String lockName) {
+ return new Lock(lockName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java
new file mode 100644
index 0000000..727c5b7
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerDemo.java
@@ -0,0 +1,192 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.YAMLClusterSetup;
+import org.apache.log4j.Logger;
+
+public class LockManagerDemo {
+ private static final Logger LOG = Logger.getLogger(LockManagerDemo.class);
+
+ /**
+ * LockManagerDemo clusterName, numInstances, lockGroupName, numLocks
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ final String zkAddress = "localhost:2199";
+
+ // default participant parameters in case the config does not specify them
+ int numInstances = 3;
+ boolean instancesSpecified = false;
+ Thread[] processArray = new Thread[numInstances];
+
+ // HelixManager for setting up the controller
+ HelixManager controllerManager = null;
+
+ // Name of the lock group resource (specified by the config file)
+ String lockGroupName = null;
+ try {
+ startLocalZookeeper(2199);
+ YAMLClusterSetup setup = new YAMLClusterSetup(zkAddress);
+ InputStream input =
+ Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream("lock-manager-config.yaml");
+ final YAMLClusterSetup.YAMLClusterConfig config = setup.setupCluster(input);
+ if (config == null) {
+ LOG.error("Invalid YAML configuration");
+ return;
+ }
+ if (config.resources == null || config.resources.isEmpty()) {
+ LOG.error("Need to specify a resource!");
+ return;
+ }
+
+ // save resource name
+ lockGroupName = config.resources.get(0).name;
+
+ // save participants if specified
+ if (config.participants != null && config.participants.size() > 0) {
+ numInstances = config.participants.size();
+ instancesSpecified = true;
+ processArray = new Thread[numInstances];
+ }
+
+ // run each participant
+ for (int i = 0; i < numInstances; i++) {
+ String participantName;
+ if (instancesSpecified) {
+ participantName = config.participants.get(i).name;
+ } else {
+ participantName = "localhost_" + (12000 + i);
+ }
+ final String instanceName = participantName;
+ processArray[i] = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ LockProcess lockProcess = null;
+
+ try {
+ lockProcess =
+ new LockProcess(config.clusterName, zkAddress, instanceName,
+ config.resources.get(0).stateModel.name);
+ lockProcess.start();
+ Thread.currentThread().join();
+ } catch (InterruptedException e) {
+ System.out.println(instanceName + " Interrupted");
+ if (lockProcess != null) {
+ lockProcess.stop();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ });
+ processArray[i].start();
+ }
+ Thread.sleep(3000);
+
+ // start the controller
+ controllerManager =
+ HelixControllerMain.startHelixController(zkAddress, config.clusterName, "controller",
+ HelixControllerMain.STANDALONE);
+ Thread.sleep(5000);
+
+ // HelixAdmin for querying cluster state
+ HelixAdmin admin = new ZKHelixAdmin(zkAddress);
+
+ printStatus(admin, config.clusterName, lockGroupName);
+
+ // stop one participant
+ System.out.println("Stopping the first participant");
+ processArray[0].interrupt();
+ Thread.sleep(3000);
+ printStatus(admin, config.clusterName, lockGroupName);
+ Thread.currentThread().join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (controllerManager != null) {
+ controllerManager.disconnect();
+ }
+ for (Thread process : processArray) {
+ if (process != null) {
+ process.interrupt();
+ }
+ }
+ }
+ }
+
+ private static void printStatus(HelixAdmin admin, String cluster, String resource) {
+ ExternalView externalView = admin.getResourceExternalView(cluster, resource);
+ TreeSet<String> treeSet = new TreeSet<String>(externalView.getPartitionSet());
+ System.out.println("lockName" + "\t" + "acquired By");
+ System.out.println("======================================");
+ for (String lockName : treeSet) {
+ Map<String, String> stateMap = externalView.getStateMap(lockName);
+ String acquiredBy = null;
+ if (stateMap != null) {
+ for (String instanceName : stateMap.keySet()) {
+ if ("LOCKED".equals(stateMap.get(instanceName))) {
+ acquiredBy = instanceName;
+ break;
+ }
+ }
+ }
+ System.out.println(lockName + "\t" + ((acquiredBy != null) ? acquiredBy : "NONE"));
+ }
+ }
+
+ private static void startLocalZookeeper(int port) throws Exception {
+ ZkServer server = null;
+ String baseDir = "/tmp/IntegrationTest/";
+ final String dataDir = baseDir + "zk/dataDir";
+ final String logDir = baseDir + "/tmp/logDir";
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+
+ }
+ };
+ int zkPort = 2199;
+ server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
+ server.start();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
new file mode 100644
index 0000000..e65113c
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockManagerRebalancer.java
@@ -0,0 +1,84 @@
+package org.apache.helix.userdefinedrebalancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class LockManagerRebalancer implements Rebalancer {
+ private static final Logger LOG = Logger.getLogger(LockManagerRebalancer.class);
+
+ @Override
+ public void init(HelixManager manager) {
+ // do nothing; this rebalancer is independent of the manager
+ }
+
+ /**
+ * This rebalancer is invoked whenever there is a change in the cluster, including when new
+ * participants join or leave, or the configuration of any participant changes. It is written
+ * specifically to handle assignment of locks to nodes under the very simple lock-unlock state
+ * model.
+ */
+ @Override
+ public ResourceAssignment computeResourceMapping(Resource resource, IdealState currentIdealState,
+ CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+ // Initialize an empty mapping of locks to participants
+ ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
+
+ // Get the list of live participants in the cluster
+ List<String> liveParticipants = new ArrayList<String>(clusterData.getLiveInstances().keySet());
+
+ // Get the state model (should be a simple lock/unlock model) and the highest-priority state
+ String stateModelName = currentIdealState.getStateModelDefRef();
+ StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
+ if (stateModelDef.getStatesPriorityList().size() < 1) {
+ LOG.error("Invalid state model definition. There should be at least one state.");
+ return assignment;
+ }
+ String lockState = stateModelDef.getStatesPriorityList().get(0);
+
+ // Count the number of participants allowed to lock each lock
+ String stateCount = stateModelDef.getNumInstancesPerState(lockState);
+ int lockHolders = 0;
+ try {
+ // a numeric value is a custom-specified number of participants allowed to lock the lock
+ lockHolders = Integer.parseInt(stateCount);
+ } catch (NumberFormatException e) {
+ LOG.error("Invalid state model definition. The lock state does not have a valid count");
+ return assignment;
+ }
+
+ // Fairly assign the lock state to the participants using a simple mod-based sequential
+ // assignment. For instance, if each lock can be held by 3 participants, lock 0 would be held
+ // by participants (0, 1, 2), lock 1 would be held by (1, 2, 3), and so on, wrapping around the
+ // number of participants as necessary.
+ // This assumes a simple lock-unlock model where the only state of interest is which nodes have
+ // acquired each lock.
+ int i = 0;
+ for (Partition partition : resource.getPartitions()) {
+ Map<String, String> replicaMap = new HashMap<String, String>();
+ for (int j = i; j < i + lockHolders; j++) {
+ int participantIndex = j % liveParticipants.size();
+ String participant = liveParticipants.get(participantIndex);
+ // enforce that a participant can only have one instance of a given lock
+ if (!replicaMap.containsKey(participant)) {
+ replicaMap.put(participant, lockState);
+ }
+ }
+ assignment.addReplicaMap(partition, replicaMap);
+ i++;
+ }
+ return assignment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
new file mode 100644
index 0000000..ee363b5
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/java/org/apache/helix/userdefinedrebalancer/LockProcess.java
@@ -0,0 +1,79 @@
+package org.apache.helix.userdefinedrebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+
+public class LockProcess {
+ private final String clusterName;
+ private final String zkAddress;
+ private final String instanceName;
+ private final String stateModelName;
+ private HelixManager participantManager;
+
+ LockProcess(String clusterName, String zkAddress, String instanceName, String stateModelName) {
+ this.clusterName = clusterName;
+ this.zkAddress = zkAddress;
+ this.instanceName = instanceName;
+ this.stateModelName = stateModelName;
+
+ }
+
+ public void start() throws Exception {
+ System.out.println("STARTING " + instanceName);
+ configureInstance(instanceName);
+ participantManager =
+ HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT,
+ zkAddress);
+ participantManager.getStateMachineEngine().registerStateModelFactory(stateModelName,
+ new LockFactory());
+ participantManager.connect();
+ System.out.println("STARTED " + instanceName);
+ }
+
+ /**
+ * Configure the instance, the configuration of each node is available to
+ * other nodes.
+ * @param instanceName
+ */
+ private void configureInstance(String instanceName) {
+ ZKHelixAdmin helixAdmin = new ZKHelixAdmin(zkAddress);
+
+ List<String> instancesInCluster = helixAdmin.getInstancesInCluster(clusterName);
+ if (instancesInCluster == null || !instancesInCluster.contains(instanceName)) {
+ InstanceConfig config = new InstanceConfig(instanceName);
+ config.setHostName("localhost");
+ config.setPort("12000");
+ helixAdmin.addInstance(clusterName, config);
+ }
+ }
+
+ public void stop() {
+ if (participantManager != null) {
+ participantManager.disconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml b/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml
new file mode 100644
index 0000000..b312877
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/main/resources/lock-manager-config.yaml
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+clusterName: lock-manager-custom-rebalancer # unique name for the cluster
+resources:
+ - name: lock-group # unique resource name
+ rebalancer: # we will provide our own rebalancer
+ mode: USER_DEFINED
+ class: org.apache.helix.userdefinedrebalancer.LockManagerRebalancer
+ partitions:
+ count: 12 # number of locks
+ replicas: 1 # number of simultaneous holders for each lock
+ stateModel:
+ name: lock-unlock # unique model name
+ states: [LOCKED, RELEASED, DROPPED] # the list of possible states
+ transitions: # the list of possible transitions
+ - name: Unlock
+ from: LOCKED
+ to: RELEASED
+ - name: Lock
+ from: RELEASED
+ to: LOCKED
+ - name: DropLock
+ from: LOCKED
+ to: DROPPED
+ - name: DropUnlock
+ from: RELEASED
+ to: DROPPED
+ - name: Undrop
+ from: DROPPED
+ to: RELEASED
+ initialState: RELEASED
+ constraints:
+ state:
+ counts: # maximum number of replicas of a partition that can be in each state
+ - name: LOCKED
+ count: "1"
+ - name: RELEASED
+ count: "-1"
+ - name: DROPPED
+ count: "-1"
+ priorityList: [LOCKED, RELEASED, DROPPED] # states in order of priority
+ transition: # transitions priority to enforce order that transitions occur
+ priorityList: [Unlock, Lock, Undrop, DropUnlock, DropLock]
+participants: # list of nodes that can acquire locks
+ - name: localhost_12001
+ host: localhost
+ port: 12001
+ - name: localhost_12002
+ host: localhost
+ port: 12002
+ - name: localhost_12003
+ host: localhost
+ port: 12003
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/19c68417/recipes/user-defined-rebalancer/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/recipes/user-defined-rebalancer/src/test/conf/testng.xml b/recipes/user-defined-rebalancer/src/test/conf/testng.xml
new file mode 100644
index 0000000..58f0803
--- /dev/null
+++ b/recipes/user-defined-rebalancer/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="Suite" parallel="none">
+ <test name="Test" preserve-order="false">
+ <packages>
+ <package name="org.apache.helix"/>
+ </packages>
+ </test>
+</suite>