You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[23/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java
new file mode 100644
index 0000000..a7d116c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForStorageNode.java
@@ -0,0 +1,788 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+/**
+ * IdealStateCalculatorForStorageNode tries to optimally allocate master/slave partitions among
+ * espresso storage nodes.
+ *
+ * Given a batch of storage nodes, the number of partitions and the replication factor, the algorithm first
+ * computes an initial ideal state. When new batches of storage nodes are added, the algorithm calculates a new
+ * ideal state such that the total number of partition movements is minimized.
+ *
+ */
+public class IdealStateCalculatorForStorageNode
+{
+ // Key of the result map entry that convertToZNRecord reads as the
+ // "instance name -> list of master partition ids" map.
+ static final String _MasterAssignmentMap = "MasterAssignmentMap";
+ // Key of the result map entry that convertToZNRecord reads as the
+ // "master instance -> (slave instance -> list of partition ids)" map.
+ static final String _SlaveAssignmentMap = "SlaveAssignmentMap";
+ // Same text as the "partitions" key used in the result maps; the visible code in this class uses the
+ // string literal directly rather than this constant.
+ static final String _partitions = "partitions";
+ // Same text as the "replicas" key used in the result maps; the visible code in this class uses the
+ // string literal directly rather than this constant.
+ static final String _replicas = "replicas";
+
+  /**
+   * Calculates the ideal state of a resource over a single batch of storage instances.
+   *
+   * The instance list is sorted in place first. Then:
+   * <ul>
+   * <li>if there are fewer than {@code replicas + 1} instances, a {@link HelixException} is thrown;</li>
+   * <li>if there are fewer partitions than instances, the assignment is delegated to
+   * {@link IdealStateCalculatorByShuffling} and a priority list (master first, the remaining instances in a
+   * seeded shuffled order) is added for every partition;</li>
+   * <li>otherwise the assignment comes from {@link #calculateInitialIdealState(List, int, int)} and is converted
+   * with {@link #convertToZNRecord(Map, String, String, String)}.</li>
+   * </ul>
+   *
+   * @param instanceNames
+   *          list of storage node instances; sorted in place by this method
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          the number of replicas (slave partitions) per master partition
+   * @param resourceName
+   *          name of the resource, used as the id of the returned record
+   * @param masterStateValue
+   *          master state value: e.g. "MASTER" or "LEADER"
+   * @param slaveStateValue
+   *          slave state value: e.g. "SLAVE" or "STANDBY"
+   * @return a ZNRecord that contains the ideal state info
+   */
+  public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
+      String masterStateValue, String slaveStateValue)
+  {
+    Collections.sort(instanceNames);
+    final int instanceCount = instanceNames.size();
+
+    if (instanceCount < replicas + 1)
+    {
+      throw new HelixException("Number of instances must not be less than replicas + 1. "
+          + "instanceNr:" + instanceCount
+          + ", replicas:" + replicas);
+    }
+
+    if (partitions >= instanceCount)
+    {
+      // Regular case: every instance can own at least one master partition.
+      Map<String, Object> assignment = calculateInitialIdealState(instanceNames, partitions, replicas);
+      return convertToZNRecord(assignment, resourceName, masterStateValue, slaveStateValue);
+    }
+
+    // Fewer partitions than instances: use the shuffling calculator with a fixed seed.
+    ZNRecord shuffledState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas,
+        resourceName, 12345, masterStateValue, slaveStateValue);
+
+    // The seed grows by one per partition so that each partition gets its own, reproducible order.
+    int shuffleSeed = 0;
+    for (String partitionName : shuffledState.getMapFields().keySet())
+    {
+      String master = "";
+      List<String> nonMasters = new ArrayList<String>();
+      for (Map.Entry<String, String> placement : shuffledState.getMapField(partitionName).entrySet())
+      {
+        // Only the first instance found in master state is treated as the master.
+        if (placement.getValue().equalsIgnoreCase(masterStateValue) && "".equals(master))
+        {
+          master = placement.getKey();
+        }
+        else
+        {
+          nonMasters.add(placement.getKey());
+        }
+      }
+      Collections.shuffle(nonMasters, new Random(shuffleSeed));
+      shuffleSeed++;
+      nonMasters.add(0, master);
+      shuffledState.setListField(partitionName, nonMasters);
+    }
+    return shuffledState;
+  }
+
+  /**
+   * Calculates the ideal state for instances that were added in several batches.
+   *
+   * The first batch is laid out with {@link #calculateInitialIdealState(List, int, int)}; every later batch is
+   * folded in, in list order, with {@link #calculateNextIdealState(List, Map)}. The final assignment is converted
+   * with {@link #convertToZNRecord(Map, String, String, String)}.
+   *
+   * @param instanceBatches
+   *          the instance batches in the order they were added; element 0 is the initial batch
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          the number of replicas (slave partitions) per master partition
+   * @param resourceName
+   *          name of the resource
+   * @param masterStateValue
+   *          master state value
+   * @param slaveStateValue
+   *          slave state value
+   * @return a ZNRecord that contains the ideal state info
+   */
+  public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
+      String masterStateValue, String slaveStateValue)
+  {
+    List<String> initialBatch = instanceBatches.get(0);
+    Map<String, Object> assignment = calculateInitialIdealState(initialBatch, partitions, replicas);
+
+    // Fold each subsequent batch into the running assignment.
+    for (List<String> laterBatch : instanceBatches.subList(1, instanceBatches.size()))
+    {
+      assignment = calculateNextIdealState(laterBatch, assignment);
+    }
+
+    return convertToZNRecord(assignment, resourceName, masterStateValue, slaveStateValue);
+  }
+
+  /**
+   * Converts the internal assignment result (stored as a {@code Map<String, Object>}) into a ZNRecord.
+   *
+   * The record gets, in this order: the NUM_PARTITIONS simple field, one map field per partition
+   * ({@code resourceName_partitionId} mapping instance name to state value), one list field per partition (the
+   * master first, followed by the other instances in a randomly shuffled order), and the REPLICAS simple field.
+   *
+   * @param result
+   *          the assignment map; must hold the master assignment map, the slave assignment map and the
+   *          "partitions" and "replicas" entries
+   * @param resourceName
+   *          name of the resource; used as the record id and as the prefix of partition names
+   * @param masterStateValue
+   *          state value written for master replicas
+   * @param slaveStateValue
+   *          state value written for slave replicas
+   * @return the ZNRecord holding the ideal state
+   */
+  public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
+      String masterStateValue, String slaveStateValue)
+  {
+    Map<String, List<Integer>> mastersByInstance
+        = (Map<String, List<Integer>>) result.get(_MasterAssignmentMap);
+    Map<String, Map<String, List<Integer>>> slavesByMasterInstance
+        = (Map<String, Map<String, List<Integer>>>) result.get(_SlaveAssignmentMap);
+    int partitions = (Integer) result.get("partitions");
+
+    ZNRecord record = new ZNRecord(resourceName);
+    record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+
+    // Master replicas: create the per-partition map on first use.
+    for (Map.Entry<String, List<Integer>> masterEntry : mastersByInstance.entrySet())
+    {
+      for (Integer partitionId : masterEntry.getValue())
+      {
+        String partitionName = resourceName + "_" + partitionId;
+        if (!record.getMapFields().containsKey(partitionName))
+        {
+          record.setMapField(partitionName, new TreeMap<String, String>());
+        }
+        record.getMapField(partitionName).put(masterEntry.getKey(), masterStateValue);
+      }
+    }
+
+    // Slave replicas: the per-partition map is expected to exist already from the master pass.
+    for (Map<String, List<Integer>> slavePlacement : slavesByMasterInstance.values())
+    {
+      for (Map.Entry<String, List<Integer>> slaveEntry : slavePlacement.entrySet())
+      {
+        for (Integer partitionId : slaveEntry.getValue())
+        {
+          record.getMapField(resourceName + "_" + partitionId).put(slaveEntry.getKey(), slaveStateValue);
+        }
+      }
+    }
+
+    // Priority list per partition: the master goes first, the slaves follow in shuffled order.
+    for (String partitionName : record.getMapFields().keySet())
+    {
+      String master = "";
+      List<String> priorityList = new ArrayList<String>();
+      for (Map.Entry<String, String> placement : record.getMapField(partitionName).entrySet())
+      {
+        // Only the first instance found in master state is treated as the master.
+        if (placement.getValue().equalsIgnoreCase(masterStateValue) && "".equals(master))
+        {
+          master = placement.getKey();
+        }
+        else
+        {
+          priorityList.add(placement.getKey());
+        }
+      }
+      Collections.shuffle(priorityList);
+      priorityList.add(0, master);
+      record.setListField(partitionName, priorityList);
+    }
+
+    assert(result.containsKey("replicas"));
+    record.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas").toString());
+    return record;
+  }
+  /**
+   * Calculate the initial ideal state given a batch of storage instances, the replication factor and
+   * the number of partitions.
+   *
+   * 1. Calculate the master assignment by random shuffling
+   * 2. For each storage instance, calculate the 1st slave assignment map, by another random shuffling
+   * 3. For each storage instance, calculate the i-th slave assignment map from the 1st one
+   * 4. Combine the i-th slave assignment maps together and even them out
+   *
+   * All shuffles use fixed seeds (or a seed derived from the instance name), so the same input always yields
+   * the same assignment. Every instance gets an entry in the master assignment map, even when it owns no
+   * master partition (which happens when there are fewer partitions than instances).
+   *
+   * @param instanceNames
+   *          list of storage node instances
+   * @param partitions
+   *          number of partitions
+   * @param replicas
+   *          the number of replicas (slave partitions) per master partition; expected to be at most
+   *          {@code instanceNames.size() - 1}
+   * @return a map that contains the ideal state info: "MasterAssignmentMap" (instance to master partition ids),
+   *         "SlaveAssignmentMap" (master instance to slave instance to partition ids), "replicas" and
+   *         "partitions"
+   */
+  public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
+  {
+    Random r = new Random(54321);
+    assert(replicas <= instanceNames.size() - 1);
+
+    List<Integer> masterPartitionAssignment = new ArrayList<Integer>();
+    for(int i = 0; i < partitions; i++)
+    {
+      masterPartitionAssignment.add(i);
+    }
+    // shuffle the partition id array
+    Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
+
+    // 1. Generate the random master partition assignment
+    // instanceName -> List of master partitions on that instance
+    Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
+    // Give every instance an (initially empty) list up front. Without this, an instance that receives no
+    // master partition (partitions < instances) has no entry and the slave calculation below hits a null list.
+    for(String instanceName : instanceNames)
+    {
+      nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
+    }
+    // Deal the shuffled partitions out round-robin.
+    for(int i = 0; i < masterPartitionAssignment.size(); i++)
+    {
+      String instanceName = instanceNames.get(i % instanceNames.size());
+      nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
+    }
+
+    // instanceName -> slave assignment for its master partitions
+    // slave assignment: instanceName -> list of slave partitions on it
+    List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList
+        = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
+
+    Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap
+        = new TreeMap<String, Map<String, List<Integer>>>();
+    Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap
+        = new TreeMap<String, Map<String, List<Integer>>>();
+
+    if(replicas > 0)
+    {
+      // 2. For each node, calculate the evenly distributed slave as the first slave assignment
+      // We will figure out the 2nd ...replicas-th slave assignment based on the first level slave assignment
+      for(int i = 0; i < instanceNames.size(); i++)
+      {
+        List<String> slaveInstances = new ArrayList<String>();
+        List<Integer> slaveAssignment = new ArrayList<Integer>();
+        Map<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
+
+        // Every other instance is a slave candidate and starts with an empty list.
+        for(int j = 0; j < instanceNames.size(); j++)
+        {
+          if(j != i)
+          {
+            slaveInstances.add(instanceNames.get(j));
+            slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
+          }
+        }
+        // Master partitions hosted on this instance
+        List<Integer> masterAssignment = nodeMasterAssignmentMap.get(instanceNames.get(i));
+
+        // do a random shuffling as in step 1, so that the first-level slave are distributed among rest instances
+        for(int j = 0; j < masterAssignment.size(); j++)
+        {
+          slaveAssignment.add(j);
+        }
+        Collections.shuffle(slaveAssignment, new Random(r.nextInt()));
+
+        // The candidate order is seeded by the master instance name, so it is stable per instance.
+        Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode()));
+
+        // Get the slave assignment map of node instanceName
+        for(int j = 0; j < masterAssignment.size(); j++)
+        {
+          String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
+          slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
+        }
+        firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
+      }
+      nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
+
+      // 3. From the first slave assignment map, calculate the rest slave assignment maps
+      for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
+      {
+        Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
+            = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
+        nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
+      }
+
+      // 4. Combine the calculated 1...replicas-th slave assignment map together
+      for(String instanceName : nodeMasterAssignmentMap.keySet())
+      {
+        Map<String, List<Integer>> combinedSlaveAssignmentMap = new TreeMap<String, List<Integer>>();
+
+        for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
+        {
+          Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
+
+          for(String slaveInstance : slaveAssignmentMap.keySet())
+          {
+            if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
+            {
+              combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
+            }
+            combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
+          }
+        }
+        // No new instances here; the call only evens out the combined per-slave lists.
+        migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>());
+        combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
+      }
+    }
+
+    Map<String, Object> result = new TreeMap<String, Object>();
+    result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
+    result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
+    result.put("replicas", Integer.valueOf(replicas));
+    result.put("partitions", Integer.valueOf(partitions));
+    return result;
+  }
+ /**
+ * In the case there are more than 1 slave, we use the following algorithm to calculate the n-th slave
+ * assignment map based on the first level slave assignment map.
+ *
+ * @param firstInstanceSlaveAssignmentMap the first slave assignment map for all instances
+ * @param order of the slave
+ * @return the n-th slave assignment map for all the instances
+ * */
+ static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
+ {
+ Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
+
+ for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+ {
+ Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
+ result.put(currentInstance, resultAssignmentMap);
+ }
+
+ for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
+ {
+ Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
+ Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
+ int offset = replicaOrder - 1;
+ for(String instance : previousSlaveAssignmentMap.keySet())
+ {
+ List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
+ // Obtain an array of other instances
+ for(String otherInstance : previousSlaveAssignmentMap.keySet())
+ {
+ otherInstances.add(otherInstance);
+ }
+ Collections.sort(otherInstances);
+ int instanceIndex = -1;
+ for(int index = 0;index < otherInstances.size(); index++)
+ {
+ if(otherInstances.get(index).equalsIgnoreCase(instance))
+ {
+ instanceIndex = index;
+ }
+ }
+ assert(instanceIndex >= 0);
+ if(instanceIndex == otherInstances.size() - 1)
+ {
+ instanceIndex --;
+ }
+ // Since we need to evenly distribute the slaves on "instance" to other partitions, we
+ // need to remove "instance" from the array.
+ otherInstances.remove(instance);
+
+ // distribute previous slave assignment to other instances.
+ List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
+ for(int i = 0; i < previousAssignmentList.size(); i++)
+ {
+
+ // Evenly distribute the previousAssignmentList to the remaining other instances
+ int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
+ String newInstance = otherInstances.get(newInstanceIndex);
+ if(!resultAssignmentMap.containsKey(newInstance))
+ {
+ resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
+ }
+ resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
+ }
+ }
+ }
+ return result;
+ }
+
+  /**
+   * Given the current ideal state and the list of new instances to be added, calculate the new ideal state.
+   *
+   * 1. Calculate how many master partitions should be moved to the new batch of instances
+   * 2. Decide the number of master partitions px to be taken from each previous node
+   * 3. For each previous node,
+   *    3.1 randomly choose px master partitions and move them to a temporary list
+   *    3.2 remove those partitions from the node's slave assignment map, recording which slave hosted them
+   *    3.3 add the new instances to the node's slave assignment map and even it out
+   * 4. Shuffle the collected master partitions and deal them out to the new instances
+   * 5. For each new instance, assemble its slave assignment map from the recorded slave locations and even it
+   *    out over the other new instances
+   *
+   * The method works in place: {@code newInstances} is sorted, and the maps stored inside
+   * {@code previousIdealState} are updated and returned as the result.
+   *
+   * @param newInstances
+   *          list of newly added storage node instances; sorted in place
+   * @param previousIdealState
+   *          the previous ideal state, as produced by {@code calculateInitialIdealState} or by an earlier call
+   *          of this method; modified in place
+   * @return the same map instance as {@code previousIdealState}, now holding the updated ideal state info
+   */
+  public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
+  {
+    // Obtain the master / slave assignment info maps
+    Collections.sort(newInstances);
+    Map<String, List<Integer>> previousMasterAssignmentMap
+        = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
+    Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
+        = (Map<String, Map<String, List<Integer>>>) (previousIdealState.get("SlaveAssignmentMap"));
+
+    // Snapshot of the instance names before any new instance is added to the master map.
+    List<String> oldInstances = new ArrayList<String>(previousMasterAssignmentMap.keySet());
+
+    int previousInstanceNum = previousMasterAssignmentMap.size();
+    int partitions = (Integer) (previousIdealState.get("partitions"));
+
+    // TODO: take weight into account when calculate this
+    int totalMasterParitionsToMove
+        = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
+    int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum;
+    int remain = totalMasterParitionsToMove % previousInstanceNum;
+
+    // Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1) partitions.
+    // And we should first choose those (numMastersFromEachNode + 1) moves from the instances that has more
+    // master partitions
+    List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
+
+    // For corresponding moved slave partitions, keep track of their original location; the new node does not
+    // need to migrate all of them.
+    Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
+
+    // Make sure that the instances that holds more master partitions are put in front
+    List<String> bigList = new ArrayList<String>();
+    List<String> smallList = new ArrayList<String>();
+    for(String oldInstance : previousMasterAssignmentMap.keySet())
+    {
+      List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+      if(masterAssignmentList.size() > numMastersFromEachNode)
+      {
+        bigList.add(oldInstance);
+      }
+      else
+      {
+        smallList.add(oldInstance);
+      }
+    }
+    // "sort" the list, such that the nodes that has more master partitions moves more partitions to the
+    // new added batch of instances.
+    bigList.addAll(smallList);
+
+    for(String oldInstance : bigList)
+    {
+      List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
+      int numToChoose = numMastersFromEachNode;
+      if(remain > 0)
+      {
+        numToChoose = numMastersFromEachNode + 1;
+        remain--;
+      }
+      // randomly remove numToChoose of master partitions to the new added nodes
+      List<Integer> masterPartionsMoved = new ArrayList<Integer>();
+      randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose);
+
+      masterPartitionListToMove.addAll(masterPartionsMoved);
+      Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
+      removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved, slavePartitionsToMoveMap);
+
+      // Make sure that for old instances, the slave placement map is evenly distributed over old and new
+      // instances. The returned move count is not needed here.
+      migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
+    }
+
+    // calculate the master / slave assignment for the new added nodes
+
+    // We already have the list of master partitions that will migrate to new batch of instances,
+    // shuffle the partitions and assign them to new instances
+    Collections.shuffle(masterPartitionListToMove, new Random(12345));
+    for(int i = 0; i < newInstances.size(); i++)
+    {
+      String newInstance = newInstances.get(i);
+
+      // Round-robin: new instance i takes elements i, i + n, i + 2n, ... of the shuffled list.
+      List<Integer> masterPartitionList = new ArrayList<Integer>();
+      for(int j = i; j < masterPartitionListToMove.size(); j += newInstances.size())
+      {
+        masterPartitionList.add(masterPartitionListToMove.get(j));
+      }
+
+      Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
+      for(String oldInstance : oldInstances)
+      {
+        slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
+      }
+      // Build the slave assignment map for the new instance, based on the saved information
+      // about those slave partition locations in slavePartitionsToMoveMap
+      for(Integer x : masterPartitionList)
+      {
+        for(String oldInstance : slavePartitionsToMoveMap.keySet())
+        {
+          List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
+          if(slaves.contains(x))
+          {
+            slavePartitionMap.get(oldInstance).add(x);
+          }
+        }
+      }
+      // add entry for other new instances into the slavePartitionMap
+      List<String> otherNewInstances = new ArrayList<String>();
+      for(String instance : newInstances)
+      {
+        if(!instance.equalsIgnoreCase(newInstance))
+        {
+          otherNewInstances.add(instance);
+        }
+      }
+      // Make sure that slave partitions are evenly distributed
+      migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
+
+      // Update the result in the result map. We can reuse the input previousIdealState map as
+      // the result.
+      previousMasterAssignmentMap.put(newInstance, masterPartitionList);
+      nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
+    }
+    return previousIdealState;
+  }
+
+ public ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
+ String resourceName, String masterStateValue, String slaveStateValue)
+ {
+ return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
+ }
+  /**
+   * Removes the slave entries of master partitions that are about to leave a storage instance.
+   *
+   * For every slave instance in {@code slaveAssignmentMap}, each partition id listed in
+   * {@code masterPartionsMoved} that the slave currently hosts is taken out of the slave's list (first
+   * occurrence only) and appended to that slave's list in {@code removedAssignmentMap}.
+   *
+   * @param slaveAssignmentMap the local instance slave assignment map; modified in place
+   * @param masterPartionsMoved the list of master partition ids that will be migrated away
+   * @param removedAssignmentMap collects the removed slave assignment info per slave instance, so that it can
+   *          be used by the newly added storage nodes; modified in place
+   */
+  static void removeFromSlaveAssignmentMap( Map<String, List<Integer>>slaveAssignmentMap, List<Integer> masterPartionsMoved, Map<String, List<Integer>> removedAssignmentMap)
+  {
+    for (Map.Entry<String, List<Integer>> slaveEntry : slaveAssignmentMap.entrySet())
+    {
+      String slaveName = slaveEntry.getKey();
+      List<Integer> hostedPartitions = slaveEntry.getValue();
+
+      for (Integer movedId : masterPartionsMoved)
+      {
+        int position = hostedPartitions.indexOf(movedId);
+        if (position < 0)
+        {
+          // This slave does not host the moved partition.
+          continue;
+        }
+        // Remove by position so that the element, not the index "movedId", is dropped.
+        hostedPartitions.remove(position);
+
+        if (!removedAssignmentMap.containsKey(slaveName))
+        {
+          removedAssignmentMap.put(slaveName, new ArrayList<Integer>());
+        }
+        removedAssignmentMap.get(slaveName).add(movedId);
+      }
+    }
+  }
+
+  /**
+   * Since some new storage instances are added, each existing storage instance should migrate some
+   * slave partitions to the newly added instances.
+   *
+   * Every new instance is first registered in the map with an empty list (replacing any list already stored
+   * under that name). The algorithm then keeps moving one partition from the instance that hosts the most slave
+   * partitions to the instance that hosts the fewest, until max - min <= 1. A partition is never moved to an
+   * instance that already holds it; if the largest list has no partition that the smallest list lacks, balancing
+   * stops at that point instead of failing.
+   *
+   * In this way all instances host almost the same number of slave partitions, and slave partitions are evenly
+   * distributed.
+   *
+   * @param nodeSlaveAssignmentMap the slave assignment map (slave instance name to hosted partition ids) of one
+   *          master instance; modified in place
+   * @param newInstances names of the instances to add to the map as empty slaves; may be empty, in which case
+   *          the existing lists are only evened out
+   * @return the number of single-partition moves whose destination was one of {@code newInstances}
+   */
+  static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
+  {
+    for(String newInstance : newInstances)
+    {
+      nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
+    }
+
+    int moves = 0;
+    while(true)
+    {
+      // Locate the first smallest and the first largest list in map iteration order.
+      List<Integer> maxAssignment = null;
+      List<Integer> minAssignment = null;
+      String minInstance = "";
+      for(Map.Entry<String, List<Integer>> entry : nodeSlaveAssignmentMap.entrySet())
+      {
+        List<Integer> slaveAssignment = entry.getValue();
+        if(minAssignment == null || slaveAssignment.size() < minAssignment.size())
+        {
+          minAssignment = slaveAssignment;
+          minInstance = entry.getKey();
+        }
+        if(maxAssignment == null || slaveAssignment.size() > maxAssignment.size())
+        {
+          maxAssignment = slaveAssignment;
+        }
+      }
+
+      // Nothing to balance (empty map) or already balanced.
+      if(maxAssignment == null || maxAssignment.size() - minAssignment.size() <= 1)
+      {
+        return moves;
+      }
+
+      // find a partition that is not contained in the minAssignment list
+      int indexToMove = -1;
+      for(int i = 0; i < maxAssignment.size(); i++)
+      {
+        if(!minAssignment.contains(maxAssignment.get(i)))
+        {
+          indexToMove = i;
+          break;
+        }
+      }
+      if(indexToMove < 0)
+      {
+        // Every partition of the largest list is already on the smallest one; moving any of them would
+        // duplicate a replica on the same instance, so stop here.
+        return moves;
+      }
+
+      // remove(int) drops the element at that position and hands it back.
+      minAssignment.add(maxAssignment.remove(indexToMove));
+
+      if(newInstances.contains(minInstance))
+      {
+        moves++;
+      }
+    }
+  }
+
+  /**
+   * Randomly moves a number of elements from the original list into the selected list.
+   * The algorithm is used to select master partitions to be migrated when new instances are added.
+   *
+   * The random generator is seeded with the size of {@code originalList}, so the positions picked are the same
+   * for every list of a given size.
+   *
+   * @param originalList the original list; the selected elements are removed from it
+   * @param selectedList the list that receives the selected elements, in the order they were picked
+   * @param num number of elements to be selected; must not exceed {@code originalList.size()}
+   */
+  static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num)
+  {
+    assert(originalList.size() >= num);
+    int numRemains = originalList.size();
+    Random r = new Random(numRemains);
+    for(int j = 0; j < num; j++)
+    {
+      int randIndex = r.nextInt(numRemains);
+      // remove(int) takes the element at that position out of the list and returns it.
+      selectedList.add(originalList.remove(randIndex));
+      numRemains--;
+    }
+  }
+
+  /**
+   * Small manual driver: computes the initial ideal state for ten local instances
+   * ("localhost:1230" to "localhost:1239"), 144 partitions and 3 replicas. The result is not printed or used.
+   */
+  public static void main(String args[])
+  {
+    final int instanceCount = 10;
+    List<String> hosts = new ArrayList<String>();
+    int index = 0;
+    while (index < instanceCount)
+    {
+      hosts.add("localhost:123" + index);
+      index++;
+    }
+
+    int partitions = 48*3;
+    int replicas = 3;
+    Map<String, Object> initialState
+        = IdealStateCalculatorForStorageNode.calculateInitialIdealState(hosts, partitions, replicas);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java b/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java
new file mode 100644
index 0000000..b022d78
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/JmxDumper.java
@@ -0,0 +1,471 @@
+package org.apache.helix.tools;
+
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerDelegate;
+import javax.management.MBeanServerNotification;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.relation.MBeanServerNotificationFilter;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.Logger;
+
+/**
+ * Command-line tool that connects to a remote JVM over JMX/RMI and, at a fixed period,
+ * reads a configured list of attributes from every tracked MBean, invokes a configured
+ * set of no-argument operations on it, and writes one text line per bean and sampling
+ * round to an output file.
+ *
+ * MBeans matching the name pattern are collected at start-up; MBeans registered or
+ * unregistered later in the configured domain are tracked through MBean server
+ * notifications. Only beans whose class name equals the configured class name are
+ * actually sampled.
+ */
+public class JmxDumper implements NotificationListener
+{
+  // Long names of the command-line options
+  public static final String help = "help";
+  public static final String domain = "domain";
+  public static final String fields = "fields";
+  public static final String pattern = "pattern";
+  public static final String operations = "operations";
+  public static final String period = "period";
+  public static final String className = "className";
+  public static final String outputFile = "outputFile";
+  public static final String jmxUrl = "jmxUrl";
+  public static final String sampleCount = "sampleCount";
+
+  private static final Logger _logger = Logger.getLogger(JmxDumper.class);
+  String _domain;
+  MBeanServerConnection _mbeanServer;
+  // Kept so that the remote connection can be closed when the dumper shuts down
+  JMXConnector _jmxConnector;
+
+  String _beanClassName;
+  String _namePattern;
+  int _samplePeriod;
+
+  // Used as a concurrent set: the timer thread iterates it while notification callbacks
+  // add and remove entries
+  Map<ObjectName,ObjectName> _mbeanNames = new ConcurrentHashMap<ObjectName,ObjectName>();
+  Timer _timer;
+
+  String _outputFileName;
+
+  List<String> _outputFields = new ArrayList<String>();
+  Set<String> _operations = new HashSet<String>();
+  PrintWriter _outputFile;
+  int _samples = 0;
+  // -1 never equals the sample counter, i.e. sampling continues until the process is stopped
+  int _targetSamples = -1;
+  String _jmxUrl;
+  // Set (under the monitor of this object) once the target number of samples was taken
+  boolean _done = false;
+
+  /**
+   * Connects to the JMX service, registers for MBean (un)registration notifications,
+   * collects the MBeans that currently match, opens the output file and starts the
+   * periodic sampling timer.
+   *
+   * @param jmxService host:port of the RMI registry of the target JVM
+   * @param domain JMX domain whose (un)registration notifications are followed
+   * @param beanClassName only MBeans of exactly this class name are sampled
+   * @param namePattern ObjectName pattern used for the initial MBean query
+   * @param samplePeriod delay before the first sample and between samples, in milliseconds
+   * @param fields attribute names to write, in output order; may be null for none
+   * @param operations names of no-argument operations to invoke after each read; may be null
+   * @param outputfile path of the file the samples are written to
+   * @param sampleCount number of sampling rounds after which waiters are notified
+   * @throws Exception if the connection, the MBean query or opening the output file fails
+   */
+  public JmxDumper(String jmxService,
+      String domain,
+      String beanClassName,
+      String namePattern,
+      int samplePeriod,
+      List<String> fields,
+      List<String> operations,
+      String outputfile,
+      int sampleCount
+      ) throws Exception
+  {
+    _jmxUrl = jmxService;
+    _domain = domain;
+    _beanClassName = beanClassName;
+    _samplePeriod = samplePeriod;
+    if (fields != null)
+    {
+      _outputFields.addAll(fields);
+    }
+    if (operations != null)
+    {
+      _operations.addAll(operations);
+    }
+    _outputFileName = outputfile;
+    _namePattern = namePattern;
+    _targetSamples = sampleCount;
+
+    JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + _jmxUrl + "/jmxrmi");
+    _jmxConnector = JMXConnectorFactory.connect(url, null);
+    try
+    {
+      _mbeanServer = _jmxConnector.getMBeanServerConnection();
+      MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
+      filter.enableAllObjectNames();
+      _mbeanServer.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this, filter, null);
+      init();
+      _timer = new Timer(true);
+      _timer.scheduleAtFixedRate(new SampleTask(), _samplePeriod, _samplePeriod);
+    }
+    catch (Exception e)
+    {
+      // Do not leak the connection (or a half-opened file) when construction fails
+      shutdown();
+      throw e;
+    }
+  }
+
+  /**
+   * One sampling round: writes a line for every tracked MBean of the expected class.
+   */
+  class SampleTask extends TimerTask
+  {
+    @Override
+    public void run()
+    {
+      List<ObjectName> errorMBeans = new ArrayList<ObjectName>();
+      // HH is the 24-hour field; the 12-hour field "hh" without an AM/PM marker would make
+      // morning and afternoon timestamps indistinguishable.
+      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS");
+      _logger.info("Sampling " + _mbeanNames.size() + " beans");
+      for (ObjectName beanName : _mbeanNames.keySet())
+      {
+        MBeanInfo info;
+        try
+        {
+          info = _mbeanServer.getMBeanInfo(beanName);
+        }
+        catch (Exception e)
+        {
+          _logger.error(e.getMessage() + " removing it", e);
+          errorMBeans.add(beanName);
+          continue;
+        }
+        if (!info.getClassName().equals(_beanClassName))
+        {
+          _logger.warn("Skip: className " + info.getClassName() + " expected : " + _beanClassName);
+          continue;
+        }
+
+        StringBuilder line = new StringBuilder();
+        line.append(dateFormat.format(new Date())).append(" ");
+        line.append(beanName.toString()).append(" ");
+        appendAttributeValues(beanName, info, line);
+        invokeOperations(beanName, info);
+        _outputFile.println(line.toString());
+      }
+      for (ObjectName deadBean : errorMBeans)
+      {
+        _mbeanNames.remove(deadBean);
+      }
+      // Flush every round: with an unlimited sample count the process is normally
+      // stopped from outside, and buffered samples would be lost otherwise.
+      _outputFile.flush();
+
+      _samples++;
+      if (_samples == _targetSamples)
+      {
+        // No further rounds: the waiting thread closes the output file next
+        cancel();
+        synchronized (JmxDumper.this)
+        {
+          _logger.info(_samples + " samples done, exiting...");
+          _done = true;
+          JmxDumper.this.notifyAll();
+        }
+      }
+    }
+
+    /**
+     * Appends the value of every configured output field, followed by a space, to the
+     * line. "null" is written for fields the bean does not expose or that cannot be read.
+     */
+    private void appendAttributeValues(ObjectName beanName, MBeanInfo info, StringBuilder line)
+    {
+      Set<String> attributeNames = new HashSet<String>();
+      for (MBeanAttributeInfo attribute : info.getAttributes())
+      {
+        attributeNames.add(attribute.getName());
+      }
+
+      for (String outputField : _outputFields)
+      {
+        if (!attributeNames.contains(outputField))
+        {
+          _logger.warn(outputField + " not found");
+          line.append("null ");
+          continue;
+        }
+        try
+        {
+          Object mbeanAttributeValue = _mbeanServer.getAttribute(beanName, outputField);
+          // String.valueOf prints "null" for a null attribute value instead of throwing
+          line.append(String.valueOf(mbeanAttributeValue)).append(" ");
+        }
+        catch (Exception e)
+        {
+          _logger.error("Error:", e);
+          line.append("null ");
+        }
+      }
+    }
+
+    /**
+     * Invokes, without arguments, every configured operation that the bean exposes.
+     * Failures are logged and do not stop the remaining operations.
+     */
+    private void invokeOperations(ObjectName beanName, MBeanInfo info)
+    {
+      Set<String> operationNames = new HashSet<String>();
+      for (MBeanOperationInfo operation : info.getOperations())
+      {
+        operationNames.add(operation.getName());
+      }
+
+      for (String ope : _operations)
+      {
+        if (operationNames.contains(ope))
+        {
+          try
+          {
+            _mbeanServer.invoke(beanName, ope, new Object[0], new String[0]);
+          }
+          catch (Exception e)
+          {
+            _logger.error("Error:", e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Queries the MBeans currently matching the name pattern, remembers those of the
+   * expected class, and opens the output file.
+   *
+   * @throws Exception if the query fails or the output file cannot be opened
+   */
+  void init() throws Exception
+  {
+    try
+    {
+      Set<ObjectInstance> existingInstances = _mbeanServer.queryMBeans(new ObjectName(_namePattern), null);
+      _logger.info("Total " + existingInstances.size() + " mbeans matched " + _namePattern);
+      for (ObjectInstance instance : existingInstances)
+      {
+        if (instance.getClassName().equals(_beanClassName))
+        {
+          _mbeanNames.put(instance.getObjectName(), instance.getObjectName());
+          _logger.info("Sampling " + instance.getObjectName());
+        }
+      }
+      FileWriter fos = new FileWriter(_outputFileName);
+      System.out.println(_outputFileName);
+      _outputFile = new PrintWriter(fos);
+    }
+    catch (Exception e)
+    {
+      _logger.error("fail to get all existing mbeans in " + _domain, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Starts or stops tracking an MBean of the configured domain when the MBean server
+   * reports its registration or unregistration. Other notifications are ignored.
+   */
+  @Override
+  public void handleNotification(Notification notification, Object handback)
+  {
+    if (!(notification instanceof MBeanServerNotification))
+    {
+      // Guard the cast below against any other notification type
+      return;
+    }
+    MBeanServerNotification mbs = (MBeanServerNotification) notification;
+    if (MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(mbs.getType()))
+    {
+      _logger.info("Adding mbean " + mbs.getMBeanName());
+      if (mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+      {
+        addMBean(mbs.getMBeanName());
+      }
+    }
+    else if (MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(mbs.getType()))
+    {
+      _logger.info("Removing mbean " + mbs.getMBeanName());
+      if (mbs.getMBeanName().getDomain().equalsIgnoreCase(_domain))
+      {
+        removeMBean(mbs.getMBeanName());
+      }
+    }
+  }
+
+  private void addMBean(ObjectName beanName)
+  {
+    _mbeanNames.put(beanName, beanName);
+  }
+
+  private void removeMBean(ObjectName beanName)
+  {
+    _mbeanNames.remove(beanName);
+  }
+
+  /**
+   * Parses the command line, starts a dumper and blocks until it has taken the
+   * requested number of samples. With the default sample count of -1 the call blocks
+   * until the thread is interrupted or the process is stopped. Exits the JVM with
+   * status 1 when the arguments are invalid.
+   *
+   * @param cliArgs the raw command-line arguments
+   * @return 0 after the requested number of samples was written
+   * @throws Exception if the dumper cannot be created or the wait is interrupted
+   */
+  public static int processCommandLineArgs(String[] cliArgs) throws Exception
+  {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try
+    {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    }
+    catch (ParseException pe)
+    {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    if (!checkOptionArgsNumber(cmd.getOptions()))
+    {
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    String jmxUrlStr = cmd.getOptionValue(jmxUrl);
+    int periodVal = Integer.parseInt(cmd.getOptionValue(period));
+    String domainStr = cmd.getOptionValue(domain);
+    String classNameStr = cmd.getOptionValue(className);
+    String patternStr = cmd.getOptionValue(pattern);
+    String fieldsStr = cmd.getOptionValue(fields);
+    String operationsStr = cmd.getOptionValue(operations);
+    String resultFile = cmd.getOptionValue(outputFile);
+    int targetSamples = Integer.parseInt(cmd.getOptionValue(sampleCount, "-1"));
+
+    // --fields is optional: without it only timestamp and bean name are written
+    List<String> fieldList = new ArrayList<String>();
+    if (fieldsStr != null)
+    {
+      fieldList.addAll(Arrays.asList(fieldsStr.split(",")));
+    }
+    List<String> operationList = Arrays.asList(operationsStr.split(","));
+
+    JmxDumper dumper = null;
+    try
+    {
+      dumper = new JmxDumper(jmxUrlStr, domainStr, classNameStr, patternStr, periodVal,
+          fieldList, operationList, resultFile, targetSamples);
+      synchronized (dumper)
+      {
+        // The flag covers both a spurious wakeup and a notification that was sent
+        // before this thread started waiting.
+        while (!dumper._done)
+        {
+          dumper.wait();
+        }
+      }
+    }
+    finally
+    {
+      if (dumper != null)
+      {
+        dumper.shutdown();
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Stops sampling, flushes and closes the output file and closes the JMX connection.
+   * Safe to call on a partially constructed instance.
+   */
+  private void shutdown()
+  {
+    if (_timer != null)
+    {
+      _timer.cancel();
+    }
+    flushFile();
+    if (_jmxConnector != null)
+    {
+      try
+      {
+        _jmxConnector.close();
+      }
+      catch (Exception e)
+      {
+        _logger.warn("Failed to close JMX connection to " + _jmxUrl, e);
+      }
+    }
+  }
+
+  private void flushFile()
+  {
+    if (_outputFile != null)
+    {
+      _outputFile.flush();
+      _outputFile.close();
+    }
+  }
+
+  /**
+   * Checks that every parsed option carries exactly as many values as it declares.
+   *
+   * @return false (after printing a message to stderr) for the first mismatch, else true
+   */
+  private static boolean checkOptionArgsNumber(Option[] options)
+  {
+    for (Option option : options)
+    {
+      int argNb = option.getArgs();
+      String[] args = option.getValues();
+      boolean mismatch;
+      if (argNb == 0)
+      {
+        mismatch = (args != null && args.length > 0);
+      }
+      else
+      {
+        mismatch = (args == null || args.length != argNb);
+      }
+      if (mismatch)
+      {
+        System.err.println(option.getArgName() + " shall have " + argNb
+            + " arguments (was " + Arrays.toString(args) + ")");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Builds a long-name-only option that takes exactly one value.
+   */
+  @SuppressWarnings("static-access")
+  private static Option newSingleValueOption(String longOpt, String description, boolean required)
+  {
+    Option option = OptionBuilder.withLongOpt(longOpt)
+        .withDescription(description)
+        .create();
+    option.setArgs(1);
+    option.setRequired(required);
+    return option;
+  }
+
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions()
+  {
+    Option helpOption =
+        OptionBuilder.withLongOpt(help)
+            .withDescription("Prints command-line options info")
+            .create();
+
+    // period and outputFile are declared required: the dumper cannot run without them,
+    // and a parse error with usage text is clearer than the NumberFormatException or
+    // NullPointerException that a missing value caused before.
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(newSingleValueOption(domain, "Domain of the JMX bean", true));
+    options.addOption(newSingleValueOption(fields, "Fields of the JMX bean to sample", false));
+    options.addOption(newSingleValueOption(operations, "Operation to invoke", true));
+    options.addOption(newSingleValueOption(className, "Classname of the MBean", true));
+    options.addOption(newSingleValueOption(outputFile, "outputFileName", true));
+    options.addOption(newSingleValueOption(jmxUrl, "jmx port to connect to", true));
+    options.addOption(newSingleValueOption(pattern, "pattern of the MBean", true));
+    options.addOption(newSingleValueOption(period, "Sampling period in MS", true));
+    options.addOption(newSingleValueOption(sampleCount, "# of samples to take", false));
+    return options;
+  }
+
+
+  public static void printUsage(Options cliOptions)
+  {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + JmxDumper.class.getName(), cliOptions);
+  }
+
+  public static void main(String[] args) throws Exception
+  {
+    int ret = processCommandLineArgs(args);
+    System.exit(ret);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java b/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java
new file mode 100644
index 0000000..1693fbf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/LocalZKServer.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+
+/**
+ * Starts a ZooKeeper server on the local machine, listening on a given port.
+ *
+ * @author kgopalak
+ *
+ */
+public class LocalZKServer
+{
+  /**
+   * Starts a ZkServer with the given directories and port and then blocks the calling
+   * thread.
+   *
+   * @param port port the server listens on
+   * @param dataDir directory passed to ZkServer as data directory
+   * @param logDir directory passed to ZkServer as log directory
+   * @throws Exception if the server fails to start or the blocked thread is interrupted
+   */
+  public void start(int port, String dataDir, String logDir) throws Exception
+  {
+    // The server starts without creating any default znodes
+    IDefaultNameSpace emptyNameSpace = new IDefaultNameSpace()
+    {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient)
+      {
+      }
+    };
+
+    ZkServer zkServer = new ZkServer(dataDir, logDir, emptyNameSpace, port);
+    zkServer.start();
+
+    // A thread joining itself does not return unless interrupted, which keeps the
+    // caller (and with it the process) alive while the server runs.
+    Thread.currentThread().join();
+  }
+
+  /**
+   * Usage: LocalZKServer [port [dataDir [logDir]]]. The port defaults to 2199 and the
+   * directories to fresh sub-directories below java.io.tmpdir. When only dataDir is
+   * given it is used as log directory as well.
+   */
+  public static void main(String[] args) throws Exception
+  {
+    String rootDir = System.getProperty("java.io.tmpdir") + "/zk-helix/"
+        + System.currentTimeMillis();
+
+    int port = (args.length > 0) ? Integer.parseInt(args[0]) : 2199;
+    String dataDir = (args.length > 1) ? args[1] : rootDir + "/dataDir";
+
+    String logDir;
+    if (args.length > 2)
+    {
+      logDir = args[2];
+    }
+    else if (args.length > 1)
+    {
+      logDir = args[1];
+    }
+    else
+    {
+      logDir = rootDir + "/logDir";
+    }
+
+    System.out.println("Starting Zookeeper locally at port:" + port
+        + " dataDir:" + dataDir + " logDir:" + logDir);
+    new LocalZKServer().start(port, dataDir, logDir);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
new file mode 100644
index 0000000..9e1bf09
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
@@ -0,0 +1,120 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.UUID;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.util.HelixUtil;
+
+
+/**
+ * Command-line helper that writes a message znode for a live instance of a cluster
+ * directly into ZooKeeper.
+ */
+public class MessagePoster
+{
+  /**
+   * Writes the message under the message path of the instance, replacing any existing
+   * znode with the same message id. The target session id and target name of the
+   * message are overwritten with the values read from the live-instance record of the
+   * instance.
+   *
+   * @param zkServer ZooKeeper connect string
+   * @param message the message to post; its target session id and name are modified
+   * @param clusterName name of the cluster
+   * @param instanceName name of the instance the message is addressed to
+   */
+  public void post(String zkServer,
+      Message message,
+      String clusterName,
+      String instanceName)
+  {
+    ZkClient client = new ZkClient(zkServer);
+    try
+    {
+      client.setZkSerializer(new ZNRecordSerializer());
+      String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/" + message.getId();
+      client.delete(path);
+      ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName, instanceName));
+      message.setTgtSessionId(record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString()).toString());
+      message.setTgtName(record.getId());
+      client.createPersistent(path, message.getRecord());
+    }
+    finally
+    {
+      // Every call opens its own ZooKeeper connection; release it even on failure
+      client.close();
+    }
+  }
+
+  /**
+   * Posts a message of type "FaultInjection" with a random message id.
+   *
+   * @param payloadString stored in the simple field "faultType" when not null
+   * @param partition set as partition name of the message when not null
+   */
+  public void postFaultInjectionMessage(String zkServer,
+      String clusterName,
+      String instanceName,
+      String payloadString,
+      String partition)
+  {
+    Message message = new Message("FaultInjection", UUID.randomUUID().toString());
+    if (payloadString != null)
+    {
+      message.getRecord().setSimpleField("faultType", payloadString);
+    }
+    if (partition != null)
+    {
+      message.setPartitionName(partition);
+    }
+
+    post(zkServer, message, clusterName, instanceName);
+  }
+
+  /**
+   * Posts a fixed Slave-to-Master state transition test message (id "TestMessageId-2",
+   * source "cm-instance-0") for partition "EspressoDB.partition-0.&lt;instanceName&gt;".
+   */
+  public void postTestMessage(String zkServer, String clusterName, String instanceName)
+  {
+    String msgSrc = "cm-instance-0";
+    String msgId = "TestMessageId-2";
+
+    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+    message.setMsgId(msgId);
+    message.setSrcName(msgSrc);
+    message.setTgtName(instanceName);
+    message.setMsgState(MessageState.NEW);
+    message.setFromState("Slave");
+    message.setToState("Master");
+    message.setPartitionName("EspressoDB.partition-0." + instanceName);
+
+    post(zkServer, message, clusterName, instanceName);
+  }
+
+  public static void main(String[] args)
+  {
+    if (args.length < 4 || args.length > 6)
+    {
+      System.err.println("Usage: java " + MessagePoster.class.getName()
+          + " zkServer cluster instance msgType [payloadString] [partition]");
+      System.err.println("msgType can be one of test, fault");
+      System.err.println("payloadString is sent along with the fault msgType");
+      System.exit(1);
+    }
+    String zkServer = args[0];
+    String cluster = args[1];
+    String instance = args[2];
+    String msgType = args[3];
+    String payloadString = (args.length >= 5 ? args[4] : null);
+    String partition = (args.length == 6 ? args[5] : null);
+
+    MessagePoster messagePoster = new MessagePoster();
+    if (msgType.equals("test"))
+    {
+      messagePoster.postTestMessage(zkServer, cluster, instance);
+    }
+    else if (msgType.equals("fault"))
+    {
+      messagePoster.postFaultInjectionMessage(zkServer,
+          cluster,
+          instance,
+          payloadString,
+          partition);
+      System.out.println("Posted " + msgType);
+    }
+    else
+    {
+      System.err.println("Message was not posted. Unknown msgType:" + msgType);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java b/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java
new file mode 100644
index 0000000..a5d5472
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/PropertiesReader.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Loads a properties file from the classpath (through the context class loader of the
+ * calling thread) and gives access to its values. Missing files and missing keys are
+ * reported with an IllegalArgumentException.
+ */
+public class PropertiesReader
+{
+  private static final Logger LOG = Logger
+      .getLogger(PropertiesReader.class.getName());
+
+  private final Properties _properties = new Properties();
+
+  /**
+   * @param propertyFileName classpath resource name of the properties file
+   * @throws IllegalArgumentException if the resource does not exist or cannot be read
+   */
+  public PropertiesReader(String propertyFileName)
+  {
+    String errMsg = "could not open properties file:" + propertyFileName;
+    InputStream stream = null;
+    try
+    {
+      stream = Thread.currentThread().getContextClassLoader()
+          .getResourceAsStream(propertyFileName);
+      if (stream != null)
+      {
+        _properties.load(stream);
+      }
+    }
+    catch (Exception e)
+    {
+      throw new IllegalArgumentException(errMsg, e);
+    }
+    finally
+    {
+      if (stream != null)
+      {
+        try
+        {
+          stream.close();
+        }
+        catch (Exception ignored)
+        {
+          // The properties are already loaded (or the load failure is being reported);
+          // a failure to close the stream adds nothing useful for the caller.
+        }
+      }
+    }
+
+    // getResourceAsStream signals a missing resource with null; report that directly
+    // instead of through a NullPointerException raised inside Properties.load
+    if (stream == null)
+    {
+      throw new IllegalArgumentException(errMsg);
+    }
+  }
+
+  /**
+   * @param key the property name
+   * @return the value of the property, never null
+   * @throws IllegalArgumentException if no value is defined for the key
+   */
+  public String getProperty(String key)
+  {
+    String value = _properties.getProperty(key);
+    if (value == null)
+    {
+      throw new IllegalArgumentException("no property exist for key:" + key);
+    }
+
+    return value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java b/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java
new file mode 100644
index 0000000..202763e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/RUSHrHash.java
@@ -0,0 +1,352 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.*;
+import java.util.ArrayList;
+import java.util.zip.CRC32;
+
+/**
+ * Locates the nodes that hold the replicas of an object, using an implementation of
+ * the RUSHr algorithm as described by R J Honicky and Ethan Miller.
+ *
+ * The configuration passed to the constructor is a map with two entries:
+ * "replicationDegree" (Integer) and "subClusters" (HashMap[]). Every sub-cluster map
+ * holds "weight" (Integer) and "nodes" (HashMap[], one map per node).
+ *
+ * A lookup re-seeds the shared random generator and temporarily permutes the
+ * per-cluster node lists, so one instance must not be used by several threads at the
+ * same time without external synchronization.
+ */
+public class RUSHrHash
+{
+  /**
+   * how many replicas (nodes) to locate for an object
+   */
+  protected int replicationDegree = 1;
+
+  /**
+   * one hash map per sub cluster, indexed like the sub clusters of the configuration.
+   * Each map holds "count" (Integer, number of nodes in the sub cluster) and "list"
+   * (Integer[], the node indices relative to the sub cluster). Populated at
+   * construction time only.
+   */
+  protected HashMap[] clusters;
+
+  /**
+   * the "subClusters" array of the configuration, one element per sub cluster
+   */
+  protected HashMap[] clusterConfig;
+
+  /**
+   * total number of sub-clusters in the configuration; populated at construction time
+   * only
+   */
+  protected int totalClusters = 0;
+
+  /**
+   * the total number of nodes in all of the sub clusters; populated at construction
+   * time only
+   */
+  protected int totalNodes = 0;
+
+  /**
+   * the weighted total number of nodes: the sum over all sub clusters of node count
+   * times weight; populated at construction time only
+   */
+  protected int totalNodesW = 0;
+
+  /**
+   * the node maps of all sub clusters, concatenated in sub-cluster order
+   */
+  protected HashMap[] nodes = null;
+
+  /**
+   * value added to the seed of the random number generator before nodes are chosen
+   */
+  protected final int SEED_PARAM = 1560;
+
+  /**
+   * random number generator, re-seeded for every lookup step
+   */
+  Random ran = new Random();
+
+  /**
+   * maximum value drawn from the generator in drawWHG
+   */
+  float ranMax = (float) Math.pow(2.0, 16.0);
+
+  /**
+   * Analyzes the passed configuration and derives this.clusters, this.totalClusters,
+   * this.totalNodes, this.totalNodesW and this.nodes from it.
+   *
+   * @param conf
+   *          the configuration map, see the class description
+   *
+   * @throws Exception
+   *           if the configuration has no sub clusters or no Integer replication degree
+   */
+  public RUSHrHash(HashMap<String, Object> conf) throws Exception
+  {
+    // Validate before anything is dereferenced so that a bad configuration is reported
+    // with a descriptive message rather than a NullPointerException
+    HashMap[] subClusters = (conf == null) ? null : (HashMap[]) conf.get("subClusters");
+    if (subClusters == null || subClusters.length <= 0)
+    {
+      throw new Exception(
+          "data config to the RUSHr locator does not contain a valid clusters property");
+    }
+    Object degree = conf.get("replicationDegree");
+    if (!(degree instanceof Integer))
+    {
+      throw new Exception(
+          "data config to the RUSHr locator does not contain a valid replicationDegree property");
+    }
+
+    clusterConfig = subClusters;
+    replicationDegree = (Integer) degree;
+    totalClusters = subClusters.length;
+    clusters = new HashMap[totalClusters];
+
+    ArrayList<HashMap> tempNodes = new ArrayList<HashMap>();
+    for (int i = 0; i < totalClusters; i++)
+    {
+      HashMap subCluster = subClusters[i];
+      HashMap[] nodeData = (HashMap[]) subCluster.get("nodes");
+
+      int nodeCt = nodeData.length;
+      Integer[] clusterDataList = new Integer[nodeCt];
+      for (int n = 0; n < nodeCt; n++)
+      {
+        tempNodes.add(nodeData[n]);
+        clusterDataList[n] = n;
+      }
+      totalNodes += nodeCt;
+      totalNodesW += nodeCt * (Integer) subCluster.get("weight");
+
+      HashMap<String, Object> clusterData = new HashMap<String, Object>();
+      clusterData.put("count", nodeCt);
+      clusterData.put("list", clusterDataList);
+      clusters[i] = clusterData;
+    }
+    nodes = new HashMap[totalNodes];
+    tempNodes.toArray(nodes);
+  }
+
+  /**
+   * Locates the nodes for a numeric key. The sub clusters are visited from the last
+   * one to the first; for each one a weighted draw decides how many of the still
+   * missing replicas are placed in it.
+   *
+   * @param objKey
+   *          the key; used to seed the random number generator
+   * @throws Exception
+   *           if there are no nodes or clusters, or if the replicas cannot all be
+   *           placed before the sub clusters are exhausted
+   * @return the node maps chosen for the key, replicationDegree of them
+   */
+  public ArrayList<HashMap> findNode(long objKey) throws Exception
+  {
+
+    HashMap[] c = this.clusters;
+    int sumRemainingNodes = this.totalNodes;
+    int sumRemainingNodesW = this.totalNodesW;
+    int repDeg = this.replicationDegree;
+    int totClu = this.totalClusters;
+    int totNod = this.totalNodes;
+    HashMap[] clusConfig = this.clusterConfig;
+
+    // throw an exception if the data is no good
+    if ((totNod <= 0) || (totClu <= 0))
+    {
+      throw new Exception(
+          "the total nodes or total clusters is negative or 0. bad joo joos!");
+    }
+
+    // get the starting cluster
+    int currentCluster = totClu - 1;
+
+    /**
+     * this loop is an implementation of the RUSHr algorithm for fast placement
+     * and location of objects in a distributed storage system
+     *
+     * j = current cluster m = disks in current cluster n = remaining nodes
+     */
+    ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
+    while (true)
+    {
+
+      // prevent an infinite loop, in case there is a bug
+      if (currentCluster < 0)
+      {
+        throw new Exception(
+            "the cluster index became negative while we were looking for the following id: "
+                + objKey
+                + ". This should never happen with any key. There is a bug or maybe your joo joos are BAD!");
+      }
+
+      HashMap clusterData = clusConfig[currentCluster];
+      int weight = (Integer) clusterData.get("weight");
+
+      int disksInCurrentCluster = (Integer) c[currentCluster].get("count");
+      sumRemainingNodes -= disksInCurrentCluster;
+
+      int disksInCurrentClusterW = disksInCurrentCluster * weight;
+      sumRemainingNodesW -= disksInCurrentClusterW;
+
+      // seed the generator from the key and the cluster index
+      long seed = objKey + currentCluster;
+      ran.setSeed(seed);
+      // t = replicas that must be placed here because the remaining clusters do not
+      // have enough nodes for them
+      int t = (repDeg - sumRemainingNodes) > 0 ? (repDeg - sumRemainingNodes)
+          : 0;
+
+      int u = t
+          + drawWHG(repDeg - t, disksInCurrentClusterW - t,
+              disksInCurrentClusterW + sumRemainingNodesW - t, weight);
+      if (u > 0)
+      {
+        if (u > disksInCurrentCluster)
+        {
+          u = disksInCurrentCluster;
+        }
+        ran.setSeed(objKey + currentCluster + SEED_PARAM);
+        choose(u, currentCluster, sumRemainingNodes, nodeData);
+        // undo the permutation made by choose so that the next lookup starts clean
+        reset(u, currentCluster);
+        repDeg -= u;
+      }
+      if (repDeg == 0)
+      {
+        break;
+      }
+      currentCluster--;
+    }
+    return nodeData;
+  }
+
+  /**
+   * Locates the nodes for a string key. The key is reduced to a number (bits 16..30 of
+   * the CRC32 of its UTF-8 bytes) which is then passed to {@link #findNode(long)}.
+   *
+   * @param objKey
+   *          the string identifier of the object
+   *
+   * @return the node maps chosen for the key
+   * @throws Exception
+   *           see {@link #findNode(long)}
+   *
+   */
+  public ArrayList<HashMap> findNode(String objKey) throws Exception
+  {
+    // turn a string identifier into an integer for the random seed. The charset is
+    // fixed: with the platform default, JVMs configured differently would place the
+    // same non-ASCII key on different nodes.
+    CRC32 crc32 = new CRC32();
+    byte[] bytes = objKey.getBytes("UTF-8");
+    crc32.update(bytes);
+    long crc32Value = crc32.getValue();
+    long objKeyLong = (crc32Value >> 16) & 0x7fff;
+    return findNode(objKeyLong);
+  }
+
+  /**
+   * Restores the node list of a cluster to the identity order (list[i] == i) after
+   * choose() moved nodesToRetrieve entries to its end.
+   *
+   * @param nodesToRetrieve
+   *          the number of nodes the preceding choose() call selected
+   * @param currentCluster
+   *          index of the cluster whose list is restored
+   */
+  public void reset(int nodesToRetrieve, int currentCluster)
+  {
+    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
+    int count = (Integer) clusters[currentCluster].get("count");
+
+    int firstChosenIdx = count - nodesToRetrieve;
+    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
+    {
+      int listIdx = firstChosenIdx + nodeIdx;
+      int val = list[listIdx];
+      if (val < firstChosenIdx)
+      {
+        list[val] = val;
+      }
+      list[listIdx] = listIdx;
+    }
+  }
+
+  /**
+   * Draws nodesToRetrieve distinct nodes from a cluster with the current state of the
+   * random generator and appends their node maps to nodeData. Each drawn entry is
+   * swapped to the end of the not yet drawn part of the cluster's list.
+   *
+   * @param nodesToRetrieve
+   *          number of nodes to draw
+   * @param currentCluster
+   *          index of the cluster to draw from
+   * @param remainingNodes
+   *          offset of the cluster's first node within the global node array
+   * @param nodeData
+   *          receives the chosen node maps
+   */
+  public void choose(int nodesToRetrieve, int currentCluster,
+      int remainingNodes, ArrayList<HashMap> nodeData)
+  {
+    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
+    int count = (Integer) clusters[currentCluster].get("count");
+
+    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
+    {
+      int maxIdx = count - nodeIdx - 1;
+      int randNode = ran.nextInt(maxIdx + 1);
+      // swap
+      Integer chosen = list[randNode];
+      list[randNode] = list[maxIdx];
+      list[maxIdx] = chosen;
+      // add the remaining nodes so we can find the node data when we are done
+      nodeData.add(nodes[remainingNodes + chosen]);
+    }
+  }
+
+  /**
+   * Same as {@link #findNode(String)}.
+   *
+   * @param objKey
+   *          the string identifier of the object
+   * @return the node maps chosen for the key
+   * @throws Exception
+   *           see {@link #findNode(long)}
+   */
+  public ArrayList<HashMap> findNodes(String objKey) throws Exception
+  {
+    return findNode(objKey);
+  }
+
+  public int getReplicationDegree()
+  {
+    return replicationDegree;
+  }
+
+  public int getTotalNodes()
+  {
+    return totalNodes;
+  }
+
+  /**
+   * Performs up to "replicas" successive weighted draws and counts how many of them
+   * fall into the current cluster. Each draw hits with probability
+   * disksInCurrentCluster / totalDisks; a hit reduces disksInCurrentCluster by the
+   * weight and every draw reduces totalDisks by the weight. Draws are skipped once
+   * totalDisks is 0.
+   *
+   * @return the number of draws that hit the current cluster
+   */
+  public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks,
+      int weight)
+  {
+    int found = 0;
+
+    for (int i = 0; i < replicas; i++)
+    {
+      if (totalDisks != 0)
+      {
+        int ranInt = ran.nextInt((int) (ranMax + 1));
+        float z = ((float) ranInt / ranMax);
+        float prob = ((float) disksInCurrentCluster / (float) totalDisks);
+        if (z <= prob)
+        {
+          found++;
+          disksInCurrentCluster -= weight;
+        }
+        totalDisks -= weight;
+      }
+    }
+    return found;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
new file mode 100644
index 0000000..9c07c13
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -0,0 +1,347 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+
+
+public class StateModelConfigGenerator
+{
+
+ public static void main(String[] args)
+ {
+ ZNRecordSerializer serializer = new ZNRecordSerializer();
+ StateModelConfigGenerator generator = new StateModelConfigGenerator();
+ System.out.println(new String(serializer.serialize(generator.generateConfigForMasterSlave())));
+ }
+
+ /**
+ * count -1 dont care any numeric value > 0 will be tried to be satisfied based on
+ * priority N all nodes in the cluster will be assigned to this state if possible R all
+ * remaining nodes in the preference list will be assigned to this state, applies only
+ * to last state
+ */
+
+ public ZNRecord generateConfigForStorageSchemata()
+ {
+ ZNRecord record = new ZNRecord("STORAGE_DEFAULT_SM_SCHEMATA");
+ record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+ "OFFLINE");
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add("MASTER");
+ statePriorityList.add("OFFLINE");
+ statePriorityList.add("DROPPED");
+ statePriorityList.add("ERROR");
+ record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList)
+ {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ if (state.equals("MASTER"))
+ {
+ metadata.put("count", "N");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("OFFLINE"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("DROPPED"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("ERROR"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ }
+ for (String state : statePriorityList)
+ {
+ String key = state + ".next";
+ if (state.equals("MASTER"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("OFFLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("MASTER", "MASTER");
+ metadata.put("DROPPED", "DROPPED");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("ERROR"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("OFFLINE", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ }
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add("MASTER-OFFLINE");
+ stateTransitionPriorityList.add("OFFLINE-MASTER");
+ record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+ return record;
+ }
+
+ public ZNRecord generateConfigForMasterSlave()
+ {
+ ZNRecord record = new ZNRecord("MasterSlave");
+ record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+ "OFFLINE");
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add("MASTER");
+ statePriorityList.add("SLAVE");
+ statePriorityList.add("OFFLINE");
+ statePriorityList.add("DROPPED");
+ statePriorityList.add("ERROR");
+ record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList)
+ {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ if (state.equals("MASTER"))
+ {
+ metadata.put("count", "1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("SLAVE"))
+ {
+ metadata.put("count", "R");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("OFFLINE"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("DROPPED"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("ERROR"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ }
+ for (String state : statePriorityList)
+ {
+ String key = state + ".next";
+ if (state.equals("MASTER"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("SLAVE", "SLAVE");
+ metadata.put("OFFLINE", "SLAVE");
+ metadata.put("DROPPED", "SLAVE");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("SLAVE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("MASTER", "MASTER");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("OFFLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("SLAVE", "SLAVE");
+ metadata.put("MASTER", "SLAVE");
+ metadata.put("DROPPED", "DROPPED");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("ERROR"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("OFFLINE", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ }
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add("MASTER-SLAVE");
+ stateTransitionPriorityList.add("SLAVE-MASTER");
+ stateTransitionPriorityList.add("OFFLINE-SLAVE");
+ stateTransitionPriorityList.add("SLAVE-OFFLINE");
+ stateTransitionPriorityList.add("OFFLINE-DROPPED");
+ record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+ return record;
+ // ZNRecordSerializer serializer = new ZNRecordSerializer();
+ // System.out.println(new String(serializer.serialize(record)));
+ }
+
+ public ZNRecord generateConfigForLeaderStandby()
+ {
+ ZNRecord record = new ZNRecord("LeaderStandby");
+ record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+ "OFFLINE");
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add("LEADER");
+ statePriorityList.add("STANDBY");
+ statePriorityList.add("OFFLINE");
+ statePriorityList.add("DROPPED");
+ record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList)
+ {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ if (state.equals("LEADER"))
+ {
+ metadata.put("count", "1");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("STANDBY"))
+ {
+ metadata.put("count", "R");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("OFFLINE"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("DROPPED"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+
+ }
+
+ for (String state : statePriorityList)
+ {
+ String key = state + ".next";
+ if (state.equals("LEADER"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("STANDBY", "STANDBY");
+ metadata.put("OFFLINE", "STANDBY");
+ metadata.put("DROPPED", "STANDBY");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("STANDBY"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("LEADER", "LEADER");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("OFFLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("STANDBY", "STANDBY");
+ metadata.put("LEADER", "STANDBY");
+ metadata.put("DROPPED", "DROPPED");
+ record.setMapField(key, metadata);
+ }
+
+ }
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add("LEADER-STANDBY");
+ stateTransitionPriorityList.add("STANDBY-LEADER");
+ stateTransitionPriorityList.add("OFFLINE-STANDBY");
+ stateTransitionPriorityList.add("STANDBY-OFFLINE");
+ stateTransitionPriorityList.add("OFFLINE-DROPPED");
+
+ record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+ return record;
+ // ZNRecordSerializer serializer = new ZNRecordSerializer();
+ // System.out.println(new String(serializer.serialize(record)));
+ }
+
+ public ZNRecord generateConfigForOnlineOffline()
+ {
+ ZNRecord record = new ZNRecord("OnlineOffline");
+ record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
+ "OFFLINE");
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add("ONLINE");
+ statePriorityList.add("OFFLINE");
+ statePriorityList.add("DROPPED");
+ record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList)
+ {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ if (state.equals("ONLINE"))
+ {
+ metadata.put("count", "R");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("OFFLINE"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("DROPPED"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ }
+
+ for (String state : statePriorityList)
+ {
+ String key = state + ".next";
+ if (state.equals("ONLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ if (state.equals("OFFLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "ONLINE");
+ metadata.put("DROPPED", "DROPPED");
+ record.setMapField(key, metadata);
+ }
+ }
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add("OFFLINE-ONLINE");
+ stateTransitionPriorityList.add("ONLINE-OFFLINE");
+ stateTransitionPriorityList.add("OFFLINE-DROPPED");
+
+ record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+ return record;
+ // ZNRecordSerializer serializer = new ZNRecordSerializer();
+ // System.out.println(new String(serializer.serialize(record)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java b/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java
new file mode 100644
index 0000000..28ca786
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestCommand.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import org.apache.helix.HelixManager;
+
+/**
+ * A single test step: a command type, the trigger attached to it, and the
+ * argument for either a znode operation or a node operation.
+ *
+ * The class is a plain holder with public mutable fields; it performs no
+ * validation that the argument matches the command type.
+ */
+public class TestCommand
+{
+  /**
+   * Kind of command. {@link #toString()} pairs MODIFY and VERIFY with
+   * {@link #_znodeOpArg}, and START and STOP with {@link #_nodeOpArg}.
+   */
+  public enum CommandType
+  {
+    MODIFY,
+    VERIFY,
+    START,
+    STOP
+  }
+
+  /**
+   * Argument of a node operation: a manager and a thread.
+   */
+  public static class NodeOpArg
+  {
+    public HelixManager _manager;
+    public Thread _thread;
+
+    /**
+     * @param manager stored in {@link #_manager}
+     * @param thread stored in {@link #_thread}
+     */
+    public NodeOpArg(HelixManager manager, Thread thread)
+    {
+      _manager = manager;
+      _thread = thread;
+    }
+  }
+
+  public TestTrigger _trigger;
+  public CommandType _commandType;
+  // set only by the ZnodeOpArg constructors; null otherwise
+  public ZnodeOpArg _znodeOpArg;
+  // set only by the NodeOpArg constructor; null otherwise
+  public NodeOpArg _nodeOpArg;
+
+  // presumably epoch milliseconds (toString prints their difference as "ms")
+  // -- confirm with the code that sets them
+  public long _startTimestamp;
+  public long _finishTimestamp;
+
+  /**
+   * Creates a znode command with a default-constructed {@link TestTrigger}.
+   *
+   * @param type command type
+   * @param arg znode operation argument
+   */
+  public TestCommand(CommandType type, ZnodeOpArg arg)
+  {
+    this(type, new TestTrigger(), arg);
+  }
+
+  /**
+   * Creates a znode command.
+   *
+   * @param type command type
+   * @param trigger trigger attached to the command
+   * @param arg znode operation argument
+   */
+  public TestCommand(CommandType type, TestTrigger trigger, ZnodeOpArg arg)
+  {
+    _commandType = type;
+    _trigger = trigger;
+    _znodeOpArg = arg;
+  }
+
+  /**
+   * Creates a node command.
+   *
+   * @param type command type
+   * @param trigger trigger attached to the command
+   * @param arg node operation argument
+   */
+  public TestCommand(CommandType type, TestTrigger trigger, NodeOpArg arg)
+  {
+    _commandType = type;
+    _trigger = trigger;
+    _nodeOpArg = arg;
+  }
+
+  /**
+   * Renders the command as
+   * {@code <identity> [FINISH@f-START@s=<f-s>ms ]<TYPE>|<trigger>|<arg>}, where
+   * {@code <identity>} is the default Object string with everything up to the
+   * last '.' removed, the timing section appears only when
+   * {@link #_finishTimestamp} is positive, and {@code <arg>} is the znode
+   * argument for MODIFY/VERIFY or the node argument for START/STOP.
+   *
+   * Unset (null) trigger or argument fields are rendered as "null" instead of
+   * raising a NullPointerException, so the method is safe to call on commands
+   * whose argument does not match their type.
+   */
+  @Override
+  public String toString()
+  {
+    String identity = super.toString();
+    StringBuilder ret = new StringBuilder();
+    ret.append(identity.substring(identity.lastIndexOf(".") + 1)).append(" ");
+
+    if (_finishTimestamp > 0)
+    {
+      ret.append("FINISH@").append(_finishTimestamp)
+         .append("-START@").append(_startTimestamp)
+         .append("=").append(_finishTimestamp - _startTimestamp).append("ms ");
+    }
+
+    // append(Object) goes through String.valueOf, which tolerates null fields
+    if (_commandType == CommandType.MODIFY || _commandType == CommandType.VERIFY)
+    {
+      ret.append(_commandType).append("|").append(_trigger).append("|").append(_znodeOpArg);
+    }
+    else if (_commandType == CommandType.START || _commandType == CommandType.STOP)
+    {
+      ret.append(_commandType).append("|").append(_trigger).append("|").append(_nodeOpArg);
+    }
+
+    return ret.toString();
+  }
+}