You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[26/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForStorageNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForStorageNode.java b/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForStorageNode.java
deleted file mode 100644
index 103ac93..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForStorageNode.java
+++ /dev/null
@@ -1,787 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-
-/**
- * IdealStateCalculatorForStorageNode tries to optimally allocate master/slave partitions among
- * espresso storage nodes.
- *
- * Given a batch of storage nodes, the number of partitions and the replication factor, the algorithm first
- * computes an initial ideal state. When new batches of storage nodes are added, the algorithm calculates the
- * new ideal state such that the total number of partition movements is minimized.
- *
- */
-public class IdealStateCalculatorForStorageNode
-{
- static final String _MasterAssignmentMap = "MasterAssignmentMap"; // result-map key: instance name -> list of master partition ids
- static final String _SlaveAssignmentMap = "SlaveAssignmentMap"; // result-map key: master instance -> (slave instance -> list of slave partition ids)
- static final String _partitions = "partitions"; // result-map key for the partition count (the methods in this class spell the literal out instead)
- static final String _replicas = "replicas"; // result-map key for the replica count (the methods in this class spell the literal out instead)
-
- /**
-  * Computes the ideal state of a master/slave resource for a single batch of storage instances.
-  *
-  * The instance list is sorted in place, then one of two strategies is used:
-  * <ul>
-  * <li>partitions &gt;= number of instances: the assignment is computed by
-  * {@link #calculateInitialIdealState} and converted with {@link #convertToZNRecord};</li>
-  * <li>partitions &lt; number of instances: the assignment is delegated to
-  * IdealStateCalculatorByShuffling (seed 12345) and a priority list is attached to every partition,
-  * with the master first and the remaining instances shuffled with a per-partition seed.</li>
-  * </ul>
-  *
-  * @param instanceNames
-  *          list of storage node instances; NOTE: sorted in place by this call
-  * @param partitions
-  *          number of partitions
-  * @param replicas
-  *          the number of replicas (slave partitions) per master partition
-  * @param resourceName
-  *          name of the resource; used as the ZNRecord id and as the partition name prefix
-  * @param masterStateValue
-  *          master state value: e.g. "MASTER" or "LEADER"
-  * @param slaveStateValue
-  *          slave state value: e.g. "SLAVE" or "STANDBY"
-  * @return a ZNRecord that contains the ideal state info
-  * @throws HelixException
-  *           if fewer than replicas + 1 instances are supplied
-  */
- public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
-     String masterStateValue, String slaveStateValue)
- {
-   Collections.sort(instanceNames);
-   final int instanceCount = instanceNames.size();
-   if (instanceCount < replicas + 1)
-   {
-     throw new HelixException("Number of instances must not be less than replicas + 1. "
-         + "instanceNr:" + instanceCount
-         + ", replicas:" + replicas);
-   }
-
-   // Common case: at least one partition per instance.
-   if (partitions >= instanceCount)
-   {
-     Map<String, Object> assignment = calculateInitialIdealState(instanceNames, partitions, replicas);
-     return convertToZNRecord(assignment, resourceName, masterStateValue, slaveStateValue);
-   }
-
-   // Fewer partitions than instances: use the shuffling calculator, then build the priority lists here.
-   ZNRecord shuffledState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas, resourceName, 12345, masterStateValue, slaveStateValue);
-   int shuffleSeed = 0;
-   for (String partitionName : shuffledState.getMapFields().keySet())
-   {
-     Map<String, String> stateByInstance = shuffledState.getMapField(partitionName);
-     List<String> priorityList = new ArrayList<String>();
-     String master = "";
-     for (String candidate : stateByInstance.keySet())
-     {
-       // Only the first instance found in master state is treated as the master.
-       boolean isFirstMaster = stateByInstance.get(candidate).equalsIgnoreCase(masterStateValue)
-           && master.equals("");
-       if (isFirstMaster)
-       {
-         master = candidate;
-       }
-       else
-       {
-         priorityList.add(candidate);
-       }
-     }
-     // Seeded per partition (0, 1, 2, ...) so the ordering is repeatable.
-     Collections.shuffle(priorityList, new Random(shuffleSeed));
-     shuffleSeed++;
-     priorityList.add(0, master);
-     shuffledState.setListField(partitionName, priorityList);
-   }
-   return shuffledState;
- }
-
- public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
- String masterStateValue, String slaveStateValue)
- {
- Map<String, Object> result = calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
-
- for(int i = 1; i < instanceBatches.size(); i++)
- {
- result = calculateNextIdealState(instanceBatches.get(i), result);
- }
-
- return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
- }
-
- /**
-  * Converts the internal assignment result (stored as a {@code Map<String, Object>}) into a ZNRecord.
-  *
-  * The record gets, per partition named {@code resourceName + "_" + partitionId}:
-  * <ul>
-  * <li>a map field: instance name -> master or slave state value;</li>
-  * <li>a list field: the master instance first, followed by the remaining instances in shuffled order.</li>
-  * </ul>
-  * The partition count and replica count are stored as simple fields.
-  *
-  * NOTE(review): the shuffle of the non-master instances is not seeded here, so the order of the
-  * slaves in the list fields can differ from call to call.
-  *
-  * @param result
-  *          map holding the entries "MasterAssignmentMap", "SlaveAssignmentMap", "partitions" and "replicas"
-  * @param resourceName
-  *          name of the resource; used as the record id and as the partition name prefix
-  * @param masterStateValue
-  *          state value written for master replicas
-  * @param slaveStateValue
-  *          state value written for slave replicas
-  * @return the populated ZNRecord
-  */
- public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
-     String masterStateValue, String slaveStateValue)
- {
-   Map<String, List<Integer>> mastersByInstance
-       = (Map<String, List<Integer>>) result.get(_MasterAssignmentMap);
-   Map<String, Map<String, List<Integer>>> slavesByMaster
-       = (Map<String, Map<String, List<Integer>>>) result.get(_SlaveAssignmentMap);
-   int partitionCount = (Integer) result.get("partitions");
-
-   ZNRecord record = new ZNRecord(resourceName);
-   record.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitionCount));
-
-   // Master replicas: create the per-partition map on first use.
-   for (Map.Entry<String, List<Integer>> masterEntry : mastersByInstance.entrySet())
-   {
-     String masterInstance = masterEntry.getKey();
-     for (Integer partitionId : masterEntry.getValue())
-     {
-       String partitionName = resourceName + "_" + partitionId;
-       if (!record.getMapFields().containsKey(partitionName))
-       {
-         record.setMapField(partitionName, new TreeMap<String, String>());
-       }
-       record.getMapField(partitionName).put(masterInstance, masterStateValue);
-     }
-   }
-
-   // Slave replicas: the per-partition map is expected to exist already from the master pass.
-   for (Map<String, List<Integer>> slavesOfOneMaster : slavesByMaster.values())
-   {
-     for (Map.Entry<String, List<Integer>> slaveEntry : slavesOfOneMaster.entrySet())
-     {
-       String slaveInstance = slaveEntry.getKey();
-       for (Integer partitionId : slaveEntry.getValue())
-       {
-         String partitionName = resourceName + "_" + partitionId;
-         record.getMapField(partitionName).put(slaveInstance, slaveStateValue);
-       }
-     }
-   }
-
-   // Priority list per partition: master at the front, the other instances after it.
-   for (String partitionName : record.getMapFields().keySet())
-   {
-     Map<String, String> stateByInstance = record.getMapField(partitionName);
-     List<String> priorityList = new ArrayList<String>();
-     String master = "";
-     for (String candidate : stateByInstance.keySet())
-     {
-       boolean isFirstMaster = stateByInstance.get(candidate).equalsIgnoreCase(masterStateValue)
-           && master.equals("");
-       if (isFirstMaster)
-       {
-         master = candidate;
-       }
-       else
-       {
-         priorityList.add(candidate);
-       }
-     }
-     Collections.shuffle(priorityList);
-     priorityList.add(0, master);
-     record.setListField(partitionName, priorityList);
-   }
-
-   assert(result.containsKey("replicas"));
-   record.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas").toString());
-   return record;
- }
- /**
- * Calculate the initial ideal state given a batch of storage instances, the replication factor and
- * number of partitions
- *
- * 1. Calculate the master assignment by random shuffling
- * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
- * 3. for each storage instance, calculate the i-th slave assignment map
- * 4. Combine the i-th slave assignment maps together
- *
- * All random sources used here are seeded (a fixed seed, values drawn from it, or an instance name's
- * hashCode), and the draws from the shared Random happen in a fixed order; reordering statements
- * changes the resulting assignment.
- *
- * NOTE(review): step 2 looks up the master list of every instance; an instance that received no master
- * partition has no entry in the master map, so this appears to require partitions >= number of
- * instances - confirm against callers.
- *
- * @param instanceNames
- * list of storage node instances
- * @param partitions
- * number of partitions
- * @param replicas
- * The number of replicas (slave partitions) per master partition; checked against
- * instanceNames.size() - 1 only by an assert
- * @return a map that contain the idealstate info, with the entries "MasterAssignmentMap",
- * "SlaveAssignmentMap", "replicas" and "partitions"
- */
- public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
- {
- Random r = new Random(54321); // fixed seed; the seeds drawn from r below are therefore the same on every run
- assert(replicas <= instanceNames.size() - 1); // only enforced when assertions are enabled
-
- ArrayList<Integer> masterPartitionAssignment = new ArrayList<Integer>();
- for(int i = 0;i< partitions; i++)
- {
- masterPartitionAssignment.add(i);
- }
- // shuffle the partition id array
- Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
-
- // 1. Generate the random master partition assignment
- // instanceName -> List of master partitions on that instance
- Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
- for(int i = 0; i < masterPartitionAssignment.size(); i++)
- {
- String instanceName = instanceNames.get(i % instanceNames.size()); // round-robin over the instances
- if(!nodeMasterAssignmentMap.containsKey(instanceName))
- {
- nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
- }
- nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
- }
-
- // instanceName -> slave assignment for its master partitions
- // slave assignment: instanceName -> list of slave partitions on it
- List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
-
- Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
- Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
-
- if(replicas > 0)
- {
- // 2. For each node, calculate the evenly distributed slave as the first slave assignment
- // We will figure out the 2nd ...replicas-th slave assignment based on the first level slave assignment
- for(int i = 0; i < instanceNames.size(); i++)
- {
- List<String> slaveInstances = new ArrayList<String>();
- ArrayList<Integer> slaveAssignment = new ArrayList<Integer>();
- TreeMap<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
-
- for(int j = 0;j < instanceNames.size(); j++)
- {
- if(j != i)
- {
- slaveInstances.add(instanceNames.get(j)); // every instance except instance i is a slave candidate
- slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
- }
- }
- // Get the list of master partitions hosted on instance i
- List<Integer> masterAssignment = nodeMasterAssignmentMap.get(instanceNames.get(i));
- // do a random shuffling as in step 1, so that the first-level slave are distributed among rest instances
-
-
- for(int j = 0;j < masterAssignment.size(); j++)
- {
- slaveAssignment.add(j);
- }
- Collections.shuffle(slaveAssignment, new Random(r.nextInt())); // draws the next seed from r: order of these calls matters
-
- Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode())); // seeded by the instance name
-
- // Get the slave assignment map of node instanceName
- for(int j = 0;j < masterAssignment.size(); j++)
- {
- String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
- if(!slaveAssignmentMap.containsKey(slaveInstanceName))
- {
- slaveAssignmentMap.put(slaveInstanceName, new ArrayList<Integer>());
- }
- slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
- }
- firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
- }
- nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
- // From the first slave assignment map, calculate the rest slave assignment maps
- for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
- {
- // calculate the next slave partition assignment map
- Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
- = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
- nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
- }
-
- // Combine the calculated 1...replicas-th slave assignment map together
-
- for(String instanceName : nodeMasterAssignmentMap.keySet())
- {
- Map<String, List<Integer>> combinedSlaveAssignmentMap = new TreeMap<String, List<Integer>>();
-
- for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
- {
- Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
-
- for(String slaveInstance : slaveAssignmentMap.keySet())
- {
- if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
- {
- combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
- }
- combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
- }
- }
- migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>()); // no new instances: only evens out the combined map
- combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
- }
- }
- /*
- * Disabled debug aid: for every instance, print its master partitions from nodeMasterAssignmentMap,
- * its slave assignment per replica order from nodeSlaveAssignmentMapsList, and its entry in
- * combinedNodeSlaveAssignmentMap.
- */
- Map<String, Object> result = new TreeMap<String, Object>();
- result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
- result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap); // stays empty when replicas == 0
- result.put("replicas", new Integer(replicas));
- result.put("partitions", new Integer(partitions));
- return result;
- }
- /**
- * In the case there are more than 1 slave, we use the following algorithm to calculate the n-th slave
- * assignment map based on the first level slave assignment map.
- *
- * @param firstInstanceSlaveAssignmentMap the first slave assignment map for all instances
- * @param order of the slave
- * @return the n-th slave assignment map for all the instances
- * */
- static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
- {
- Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
-
- for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
- {
- Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
- result.put(currentInstance, resultAssignmentMap);
- }
-
- for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
- {
- Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
- Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
- int offset = replicaOrder - 1;
- for(String instance : previousSlaveAssignmentMap.keySet())
- {
- List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
- // Obtain an array of other instances
- for(String otherInstance : previousSlaveAssignmentMap.keySet())
- {
- otherInstances.add(otherInstance);
- }
- Collections.sort(otherInstances);
- int instanceIndex = -1;
- for(int index = 0;index < otherInstances.size(); index++)
- {
- if(otherInstances.get(index).equalsIgnoreCase(instance))
- {
- instanceIndex = index;
- }
- }
- assert(instanceIndex >= 0);
- if(instanceIndex == otherInstances.size() - 1)
- {
- instanceIndex --;
- }
- // Since we need to evenly distribute the slaves on "instance" to other partitions, we
- // need to remove "instance" from the array.
- otherInstances.remove(instance);
-
- // distribute previous slave assignment to other instances.
- List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
- for(int i = 0; i < previousAssignmentList.size(); i++)
- {
-
- // Evenly distribute the previousAssignmentList to the remaining other instances
- int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
- String newInstance = otherInstances.get(newInstanceIndex);
- if(!resultAssignmentMap.containsKey(newInstance))
- {
- resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
- }
- resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
- }
- }
- }
- return result;
- }
-
- /**
- * Given the current idealState, and the list of new Instances needed to be added, calculate the
- * new Ideal state.
- *
- * 1. Calculate how many master partitions should be moved to the new cluster of instances
- * 2. assign the number of master partitions px to be moved to each previous node
- * 3. for each previous node,
- * 3.1 randomly choose px master partitions, move them to a temp list
- * 3.2 remove those partitions from the node's slave assignment map; record which slave instance
- * held each of them;
- * 3.3 add the new instances to the node's slave assignment map
- * 3.4 even-fy the slave assignment map, which moves slave partitions onto the new instances
- *
- * 4. from all the temp master partition lists got from 3.1,
- * 4.1 shuffle them and deal them round-robin to the instances in the new cluster
- *
- * 5. for each node in the new cluster,
- * 5.1 assemble the slave assignment map
- * 5.2 even-fy the slave assignment map
- *
- * The maps stored inside previousIdealState are updated in place, and previousIdealState itself is
- * returned. newInstances is sorted in place.
- *
- * @param newInstances
- * list of new added storage node instances (sorted in place by this call)
- * @param previousIdealState
- * The previous ideal state; must hold the entries "MasterAssignmentMap", "SlaveAssignmentMap"
- * and "partitions"
- * @return a map that contain the updated idealstate info (the same object as previousIdealState)
- * */
- public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
- {
- // Obtain the master / slave assignment info maps
- Collections.sort(newInstances);
- Map<String, List<Integer>> previousMasterAssignmentMap
- = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
- Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
- = (Map<String, Map<String, List<Integer>>>)(previousIdealState.get("SlaveAssignmentMap"));
-
- List<String> oldInstances = new ArrayList<String>(); // snapshot taken before new instances are added to the master map below
- for(String oldInstance : previousMasterAssignmentMap.keySet())
- {
- oldInstances.add(oldInstance);
- }
-
- int previousInstanceNum = previousMasterAssignmentMap.size();
- int partitions = (Integer)(previousIdealState.get("partitions"));
-
- // TODO: take weight into account when calculate this
-
- int totalMasterParitionsToMove
- = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size()); // integer division: rounds down
- int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum; // NOTE(review): divides by zero if the previous master map is empty
- int remain = totalMasterParitionsToMove % previousInstanceNum;
-
- // Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1) partitions.
- // And we should first choose those (numMastersFromEachNode + 1) moves from the instances that has more
- // master partitions
- List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
-
- // For corresponding moved slave partitions, keep track of their original location; the new node does not
- // need to migrate all of them.
- Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
-
- // Make sure that the instances that holds more master partitions are put in front
- List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
- for(String oldInstance : previousMasterAssignmentMap.keySet())
- {
- List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
- if(masterAssignmentList.size() > numMastersFromEachNode)
- {
- bigList.add(oldInstance);
- }
- else
- {
- smallList.add(oldInstance);
- }
- }
- // "sort" the list, such that the nodes that has more master partitions moves more partitions to the
- // new added batch of instances.
- bigList.addAll(smallList);
- int totalSlaveMoves = 0; // accumulated below but only read by the commented-out print statement
- for(String oldInstance : bigList)
- {
- List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
- int numToChoose = numMastersFromEachNode;
- if(remain > 0)
- {
- numToChoose = numMastersFromEachNode + 1;
- remain --;
- }
- // randomly remove numToChoose of master partitions to the new added nodes
- ArrayList<Integer> masterPartionsMoved = new ArrayList<Integer>();
- randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose); // removes the chosen ids from the old instance's master list
-
- masterPartitionListToMove.addAll(masterPartionsMoved);
- Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
- removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved, slavePartitionsToMoveMap);
-
- // Make sure that for old instances, the slave placement map is evenly distributed
- // Trace the "local slave moves", which should together contribute to most of the slave migrations
- int movesWithinInstance = migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
- // System.out.println("local moves: "+ movesWithinInstance);
- totalSlaveMoves += movesWithinInstance;
- }
- // System.out.println("local slave moves total: "+ totalSlaveMoves);
- // calculate the master /slave assignment for the new added nodes
-
- // We already have the list of master partitions that will migrate to new batch of instances,
- // shuffle the partitions and assign them to new instances
- Collections.shuffle(masterPartitionListToMove, new Random(12345)); // fixed seed: same order for the same input list
- for(int i = 0;i < newInstances.size(); i++)
- {
- String newInstance = newInstances.get(i);
- List<Integer> masterPartitionList = new ArrayList<Integer>();
- for(int j = 0;j < masterPartitionListToMove.size(); j++)
- {
- if(j % newInstances.size() == i) // round-robin: every newInstances.size()-th moved partition goes to instance i
- {
- masterPartitionList.add(masterPartitionListToMove.get(j));
- }
- }
-
- Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
- for(String oldInstance : oldInstances)
- {
- slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
- }
- // Build the slave assignment map for the new instance, based on the saved information
- // about those slave partition locations in slavePartitionsToMoveMap
- for(Integer x : masterPartitionList)
- {
- for(String oldInstance : slavePartitionsToMoveMap.keySet())
- {
- List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
- if(slaves.contains(x))
- {
- slavePartitionMap.get(oldInstance).add(x); // the slave copy stays on the old instance that already held it
- }
- }
- }
- // add entry for other new instances into the slavePartitionMap
- List<String> otherNewInstances = new ArrayList<String>();
- for(String instance : newInstances)
- {
- if(!instance.equalsIgnoreCase(newInstance))
- {
- otherNewInstances.add(instance);
- }
- }
- // Make sure that slave partitions are evenly distributed
- migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
-
- // Update the result in the result map. We can reuse the input previousIdealState map as
- // the result.
- previousMasterAssignmentMap.put(newInstance, masterPartitionList);
- nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
-
- }
- /*
- * Disabled debug aid: print, for every instance, its master partitions and slave assignment map,
- * followed by the master partitions in masterPartitionListToMove and the slave partitions in
- * slavePartitionsToMoveMap that were migrated to the new instances.
- */
- return previousIdealState;
- }
-
- public ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
- String resourceName, String masterStateValue, String slaveStateValue)
- {
- return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
- }
- /**
-  * Removes the slave entries of master partitions that are being migrated away from a storage instance.
-  *
-  * Every occurrence that is removed is also recorded in {@code removedAssignmentMap} under the slave
-  * instance that held it, so that the newly added storage nodes can reuse that placement.
-  *
-  * @param slaveAssignmentMap the local instance slave assignment map (slave instance -> partition ids);
-  *          modified in place
-  * @param masterPartionsMoved the list of master partition ids that will be migrated away
-  * @param removedAssignmentMap receives the removed slave assignment info (slave instance -> partition ids)
-  */
- static void removeFromSlaveAssignmentMap( Map<String, List<Integer>>slaveAssignmentMap, List<Integer> masterPartionsMoved, Map<String, List<Integer>> removedAssignmentMap)
- {
-   for (Map.Entry<String, List<Integer>> slaveEntry : slaveAssignmentMap.entrySet())
-   {
-     String slaveInstance = slaveEntry.getKey();
-     List<Integer> partitionsOnSlave = slaveEntry.getValue();
-     for (Integer movedPartition : masterPartionsMoved)
-     {
-       // remove(Object) deletes the first matching id and reports whether one was present
-       boolean wasPresent = partitionsOnSlave.remove(movedPartition);
-       if (!wasPresent)
-       {
-         continue;
-       }
-       List<Integer> removedFromSlave = removedAssignmentMap.get(slaveInstance);
-       if (!removedAssignmentMap.containsKey(slaveInstance))
-       {
-         removedFromSlave = new ArrayList<Integer>();
-         removedAssignmentMap.put(slaveInstance, removedFromSlave);
-       }
-       removedFromSlave.add(movedPartition);
-     }
-   }
- }
-
- /**
-  * Since some new storage instances are added, each existing storage instance should migrate some
-  * slave partitions to the new added instances.
-  *
-  * Every new instance is first registered in the map with an empty partition list (replacing any
-  * existing entry under the same name). The algorithm then keeps moving one partition from the instance
-  * that hosts the most slave partitions to the instance that hosts the fewest, until max-min <= 1. A
-  * partition is never moved to an instance that already holds it.
-  *
-  * Balancing also stops when the map is empty, or when every partition of the fullest instance is
-  * already present on the emptiest one (no legal move exists); the map is then left as it is.
-  *
-  * @param nodeSlaveAssignmentMap the local instance slave assignment map (slave instance -> partition ids);
-  *          modified in place
-  * @param newInstances the newly added instances that should receive slave partitions; may be empty,
-  *          in which case the existing entries are only evened out
-  * @return the number of partitions that were moved onto one of the new instances
-  */
- static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
- {
-   for (String newInstance : newInstances)
-   {
-     nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
-   }
-
-   int moves = 0;
-   while (true)
-   {
-     // Find the first instance with the fewest and the first with the most partitions.
-     List<Integer> maxAssignment = null;
-     List<Integer> minAssignment = null;
-     String minInstance = "";
-     for (Map.Entry<String, List<Integer>> entry : nodeSlaveAssignmentMap.entrySet())
-     {
-       List<Integer> slaveAssignment = entry.getValue();
-       if (minAssignment == null || slaveAssignment.size() < minAssignment.size())
-       {
-         minAssignment = slaveAssignment;
-         minInstance = entry.getKey();
-       }
-       if (maxAssignment == null || slaveAssignment.size() > maxAssignment.size())
-       {
-         maxAssignment = slaveAssignment;
-       }
-     }
-
-     // Nothing to balance (empty map), or already balanced.
-     if (minAssignment == null || maxAssignment.size() - minAssignment.size() <= 1)
-     {
-       break;
-     }
-
-     // find a partition that is not contained in the minAssignment list
-     int indexToMove = -1;
-     for (int i = 0; i < maxAssignment.size(); i++)
-     {
-       if (!minAssignment.contains(maxAssignment.get(i)))
-       {
-         indexToMove = i;
-         break;
-       }
-     }
-     if (indexToMove < 0)
-     {
-       // No legal move: stop instead of indexing the list with -1.
-       break;
-     }
-
-     // remove(int) takes the element out of the fullest list and hands it back
-     minAssignment.add(maxAssignment.remove(indexToMove));
-
-     if (newInstances.contains(minInstance))
-     {
-       moves++;
-     }
-   }
-   return moves;
- }
-
- /**
-  * Randomly select a number of elements from original list and put them in the selectedList.
-  * The algorithm is used to select master partitions to be migrated when new instances are added.
-  *
-  * Selected elements are removed from {@code originalList} and appended to {@code selectedList} in the
-  * order they were drawn. The random source is seeded with the initial size of {@code originalList},
-  * so the same input always yields the same selection.
-  *
-  * @param originalList the original list; the selected elements are removed from it
-  * @param selectedList the list that receives the selected elements
-  * @param num number of elements to be selected; values <= 0 select nothing
-  * @throws IllegalArgumentException if num is larger than the size of originalList; neither list is
-  *           modified in that case
-  */
- static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num)
- {
-   int numRemains = originalList.size();
-   if (num > numRemains)
-   {
-     // Fail before touching either list instead of part-way through the selection.
-     throw new IllegalArgumentException("Cannot select " + num + " elements from a list of size " + numRemains);
-   }
-
-   Random r = new Random(numRemains);
-   for (int j = 0; j < num; j++)
-   {
-     int randIndex = r.nextInt(numRemains);
-     // remove(int) deletes by position and returns the removed element
-     selectedList.add(originalList.remove(randIndex));
-     numRemains--;
-   }
- }
-
- /**
-  * Small manual driver: computes the initial ideal state for ten instances named
-  * "localhost:1230" ... "localhost:1239" with 144 partitions and 3 replicas. The result is discarded.
-  *
-  * @param args ignored
-  */
- public static void main(String args[])
- {
-   List<String> instanceNames = new ArrayList<String>();
-   int suffix = 0;
-   while (suffix < 10)
-   {
-     instanceNames.add("localhost:123" + suffix);
-     suffix++;
-   }
-
-   int partitions = 48*3;
-   int replicas = 3;
-   IdealStateCalculatorForStorageNode.calculateInitialIdealState(instanceNames, partitions, replicas);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/JmxDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/JmxDumper.java b/helix-core/src/main/java/com/linkedin/helix/tools/JmxDumper.java
deleted file mode 100644
index ddcf4ad..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/JmxDumper.java
+++ /dev/null
@@ -1,471 +0,0 @@
-package com.linkedin.helix.tools;
-
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerDelegate;
-import javax.management.MBeanServerNotification;
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.relation.MBeanServerNotificationFilter;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-
-public class JmxDumper implements NotificationListener
-{
- // Long names of the command-line options understood by processCommandLineArgs().
- public static final String help = "help";
- public static final String domain = "domain";
- public static final String fields = "fields";
- public static final String pattern = "pattern";
- public static final String operations = "operations";
- public static final String period = "period";
- public static final String className = "className";
- public static final String outputFile = "outputFile";
- public static final String jmxUrl = "jmxUrl";
- public static final String sampleCount = "sampleCount";
-
- private static final Logger _logger = Logger.getLogger(JmxDumper.class);
- // JMX domain; registration/unregistration notifications are only acted on for this domain.
- String _domain;
- // Connection to the remote MBean server, opened in the constructor.
- MBeanServerConnection _mbeanServer;
-
- // Only beans whose class name equals this value are sampled.
- String _beanClassName;
- // ObjectName pattern used by init() to find the beans that already exist.
- String _namePattern;
- // Used as both the initial delay and the period of the sampling timer.
- int _samplePeriod;
-
- // Beans currently tracked; each key maps to itself, so the map serves as a concurrent set.
- Map<ObjectName,ObjectName> _mbeanNames = new ConcurrentHashMap<ObjectName,ObjectName>();
- // Daemon timer that runs SampleTask.
- Timer _timer;
-
- // Path of the file the samples are written to.
- String _outputFileName;
-
- // Attribute names read from every sampled bean, in output order.
- List<String> _outputFields = new ArrayList<String>();
- // Names of the no-argument operations invoked on every sampled bean.
- Set<String> _operations = new HashSet<String>();
- // Writer over _outputFileName, created by init().
- PrintWriter _outputFile;
- // Number of sampling passes completed so far.
- int _samples = 0;
- // Waiters are notified when _samples equals this value; the default -1 is never reached.
- int _targetSamples = -1;
- // The "host:port" part inserted into the JMX service URL.
- String _jmxUrl;
-
- /**
-  * Connects to a remote JMX agent, starts tracking the matching MBeans and schedules
-  * periodic sampling.
-  *
-  * @param jmxService value placed between "rmi://" and "/jmxrmi" in the service URL
-  * @param domain JMX domain used to filter registration notifications
-  * @param beanClassName class name a bean must have to be sampled
-  * @param namePattern ObjectName pattern used to find already-registered beans
-  * @param samplePeriod delay before the first sample and interval between samples, in milliseconds
-  * @param fields attribute names to write for each bean
-  * @param operations operation names to invoke on each bean after it is sampled
-  * @param outputfile path of the file that receives the samples
-  * @param sampleCount number of sampling passes after which waiters on this object are notified
-  * @throws Exception if the connection, the listener registration or init() fails
-  */
- public JmxDumper(String jmxService,
- String domain,
- String beanClassName,
- String namePattern,
- int samplePeriod,
- List<String> fields,
- List<String> operations,
- String outputfile,
- int sampleCount
- ) throws Exception
- {
- _jmxUrl = jmxService;
- _domain = domain;
- _beanClassName = beanClassName;
- _samplePeriod = samplePeriod;
- _outputFields.addAll(fields);
- _operations.addAll(operations);
- _outputFileName = outputfile;
- _namePattern = namePattern;
- _targetSamples = sampleCount;
-
- JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + _jmxUrl + "/jmxrmi");
- // NOTE(review): the connector is never closed in this class; it appears to be left
- // open for the lifetime of the process - confirm that is intended.
- JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
-
- _mbeanServer = jmxc.getMBeanServerConnection();
- MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
- filter.enableAllObjectNames();
- // The listener is registered before init() runs, so notifications can arrive
- // while init() is still querying the existing beans.
- _mbeanServer.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, this, filter, null);
- init();
- // true = daemon thread: the timer alone does not keep the JVM alive.
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(new SampleTask(), _samplePeriod, _samplePeriod);
- }
-
- /**
-  * Timer task that performs one sampling pass: for every tracked bean of the expected
-  * class it writes one line (timestamp, bean name, then the configured attribute values)
-  * to the output file and afterwards invokes the configured operations on that bean.
-  * Beans whose MBeanInfo can no longer be fetched are dropped from the tracked set.
-  * When the number of completed passes equals the target, threads waiting on the
-  * enclosing JmxDumper are notified.
-  */
- class SampleTask extends TimerTask
- {
-   @Override
-   public void run()
-   {
-     List<ObjectName> errorMBeans = new ArrayList<ObjectName>();
-     // One formatter per pass instead of one per bean. "HH" is the 24-hour clock;
-     // the 12-hour "hh" pattern without an AM/PM marker made timestamps ambiguous.
-     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss:SSS");
-     _logger.info("Sampling " + _mbeanNames.size() + " beans");
-     for (ObjectName beanName : _mbeanNames.keySet())
-     {
-       MBeanInfo info;
-       try
-       {
-         info = _mbeanServer.getMBeanInfo(beanName);
-       }
-       catch (Exception e)
-       {
-         _logger.error(e.getMessage() + " removing it");
-         errorMBeans.add(beanName);
-         continue;
-       }
-       if (!info.getClassName().equals(_beanClassName))
-       {
-         _logger.warn("Skip: className " + info.getClassName() + " expected : " + _beanClassName);
-         continue;
-       }
-
-       StringBuilder line = new StringBuilder();
-       line.append(dateFormat.format(new Date())).append(' ');
-       line.append(beanName.toString()).append(' ');
-
-       Map<String, MBeanAttributeInfo> infoMap = new HashMap<String, MBeanAttributeInfo>();
-       for (MBeanAttributeInfo infoItem : info.getAttributes())
-       {
-         infoMap.put(infoItem.getName(), infoItem);
-       }
-
-       // Every configured field yields exactly one column; unknown or unreadable
-       // attributes are written as "null" so that columns stay aligned.
-       for (String outputField : _outputFields)
-       {
-         if (!infoMap.containsKey(outputField))
-         {
-           _logger.warn(outputField + " not found");
-           line.append("null ");
-           continue;
-         }
-         try
-         {
-           Object mbeanAttributeValue = _mbeanServer.getAttribute(beanName, outputField);
-           // String.valueOf writes "null" for a null attribute value instead of throwing.
-           line.append(String.valueOf(mbeanAttributeValue)).append(' ');
-         }
-         catch (Exception e)
-         {
-           _logger.error("Error:", e);
-           line.append("null ");
-         }
-       }
-
-       Map<String, MBeanOperationInfo> opeMap = new HashMap<String, MBeanOperationInfo>();
-       for (MBeanOperationInfo opeItem : info.getOperations())
-       {
-         opeMap.put(opeItem.getName(), opeItem);
-       }
-
-       // Operations run after the attributes were read, so the line reflects
-       // the state before any invoked operation took effect.
-       for (String ope : _operations)
-       {
-         if (!opeMap.containsKey(ope))
-         {
-           continue;
-         }
-         try
-         {
-           _mbeanServer.invoke(beanName, ope, new Object[0], new String[0]);
-         }
-         catch (Exception e)
-         {
-           _logger.error("Error:", e);
-         }
-       }
-       _outputFile.println(line.toString());
-     }
-     // Push the pass to disk so samples survive an abrupt termination of the process.
-     _outputFile.flush();
-
-     for (ObjectName deadBean : errorMBeans)
-     {
-       _mbeanNames.remove(deadBean);
-     }
-
-     _samples++;
-     if (_samples == _targetSamples)
-     {
-       synchronized (JmxDumper.this)
-       {
-         _logger.info(_samples + " samples done, exiting...");
-         JmxDumper.this.notifyAll();
-       }
-     }
-   }
- }
-
- /**
-  * Starts tracking every already-registered MBean that matches the name pattern and
-  * has the expected class name, then opens the output file for writing.
-  *
-  * @throws Exception any failure of the query or of opening the file; it is logged and rethrown
-  */
- void init() throws Exception
- {
-   try
-   {
-     ObjectName query = new ObjectName(_namePattern);
-     Set<ObjectInstance> matches = _mbeanServer.queryMBeans(query, null);
-     _logger.info("Total " + matches.size() + " mbeans matched " + _namePattern);
-     for (ObjectInstance candidate : matches)
-     {
-       if (!candidate.getClassName().equals(_beanClassName))
-       {
-         continue;
-       }
-       ObjectName name = candidate.getObjectName();
-       _mbeanNames.put(name, name);
-       _logger.info("Sampling " + name);
-     }
-     // The file is opened before its name is echoed, as in the original statement order.
-     FileWriter fileWriter = new FileWriter(_outputFileName);
-     System.out.println(_outputFileName);
-     _outputFile = new PrintWriter(fileWriter);
-   }
-   catch (Exception e)
-   {
-     _logger.error("fail to get all existing mbeans in " + _domain, e);
-     throw e;
-   }
- }
- /**
-  * Reacts to MBean server notifications: a registration adds the bean to the tracked
-  * set and an unregistration removes it, in both cases only when the bean's domain
-  * equals the configured domain (ignoring case). Other notification types are ignored.
-  *
-  * @param notification the notification; it is cast to MBeanServerNotification
-  * @param handback not used
-  */
- @Override
- public void handleNotification(Notification notification, Object handback)
- {
-   MBeanServerNotification mbs = (MBeanServerNotification) notification;
-   String type = mbs.getType();
-   boolean registered = MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(type);
-   boolean unregistered =
-       !registered && MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(type);
-   if (!registered && !unregistered)
-   {
-     return;
-   }
-
-   ObjectName beanName = mbs.getMBeanName();
-   // Logged for every bean, before the domain filter is applied.
-   _logger.info((registered ? "Adding mbean " : "Removing mbean ") + beanName);
-   if (!beanName.getDomain().equalsIgnoreCase(_domain))
-   {
-     return;
-   }
-   if (registered)
-   {
-     addMBean(beanName);
-   }
-   else
-   {
-     removeMBean(beanName);
-   }
- }
-
- /** Starts tracking the given bean; the name is stored as both key and value. */
- private void addMBean(ObjectName beanName)
- {
- _mbeanNames.put(beanName, beanName);
- }
-
- /** Stops tracking the given bean; does nothing if it was not tracked. */
- private void removeMBean(ObjectName beanName)
- {
- _mbeanNames.remove(beanName);
- }
-
- /**
-  * Parses the command line, starts a dumper and blocks until the dumper reports that
-  * the requested number of samples has been taken. Without a positive sample count
-  * this method blocks indefinitely. The output file is flushed and closed on the way out.
-  *
-  * Invalid or incomplete options print the usage text and terminate the JVM with
-  * status 1. Although the period and output file options are not marked required in
-  * the option set, the dumper cannot run without them, so their absence is reported
-  * here instead of failing later with a NumberFormatException or NullPointerException.
-  * A missing fields option means that no attributes are written.
-  *
-  * @param cliArgs the raw command-line arguments
-  * @return always 0
-  * @throws Exception if the dumper cannot be created or the wait is interrupted
-  */
- public static int processCommandLineArgs(String[] cliArgs) throws Exception
- {
-   CommandLineParser cliParser = new GnuParser();
-   Options cliOptions = constructCommandLineOptions();
-   CommandLine cmd = null;
-
-   try
-   {
-     cmd = cliParser.parse(cliOptions, cliArgs);
-   }
-   catch (ParseException pe)
-   {
-     System.err.println("CommandLineClient: failed to parse command-line options: "
-         + pe.toString());
-     printUsage(cliOptions);
-     System.exit(1);
-   }
-   if (!checkOptionArgsNumber(cmd.getOptions()))
-   {
-     printUsage(cliOptions);
-     System.exit(1);
-   }
-
-   String periodStr = cmd.getOptionValue(period);
-   String resultFile = cmd.getOptionValue(outputFile);
-   if (periodStr == null || resultFile == null)
-   {
-     System.err.println("Options --" + period + " and --" + outputFile + " must both be specified");
-     printUsage(cliOptions);
-     System.exit(1);
-   }
-   int periodVal = Integer.parseInt(periodStr);
-
-   String urlStr = cmd.getOptionValue(jmxUrl);
-   String domainStr = cmd.getOptionValue(domain);
-   String classNameStr = cmd.getOptionValue(className);
-   String patternStr = cmd.getOptionValue(pattern);
-   String fieldsStr = cmd.getOptionValue(fields);
-   String operationsStr = cmd.getOptionValue(operations);
-   int sampleCountVal = Integer.parseInt(cmd.getOptionValue(sampleCount, "-1"));
-
-   // getOptionValue returns null for an option that was not given.
-   List<String> fieldList =
-       (fieldsStr == null) ? new ArrayList<String>() : Arrays.asList(fieldsStr.split(","));
-   List<String> operationList = Arrays.asList(operationsStr.split(","));
-
-   JmxDumper dumper = null;
-   try
-   {
-     dumper = new JmxDumper(urlStr, domainStr, classNameStr, patternStr, periodVal,
-         fieldList, operationList, resultFile, sampleCountVal);
-     synchronized (dumper)
-     {
-       // Re-check the condition around wait(): this covers spurious wakeups and the
-       // case where the target was already reached before this thread started waiting.
-       while (!(dumper._targetSamples > 0 && dumper._samples >= dumper._targetSamples))
-       {
-         dumper.wait();
-       }
-     }
-   }
-   finally
-   {
-     if (dumper != null)
-     {
-       dumper.flushFile();
-     }
-   }
-   return 0;
- }
-
- /** Flushes and closes the output writer, if init() got as far as creating it. */
- private void flushFile()
- {
- if(_outputFile != null)
- {
- _outputFile.flush();
- _outputFile.close();
- }
- }
- /**
-  * Checks that every parsed option carries exactly the number of values it declares.
-  * The first mismatch is reported on stderr, identified by the option's long name.
-  *
-  * @param options the options found on the command line
-  * @return true if all options have the declared number of values, false otherwise
-  */
- private static boolean checkOptionArgsNumber(Option[] options)
- {
-   for (Option option : options)
-   {
-     int argNb = option.getArgs();
-     String[] args = option.getValues();
-     boolean valid = (argNb == 0)
-         ? (args == null || args.length == 0)
-         : (args != null && args.length == argNb);
-     if (!valid)
-     {
-       // getLongOpt() names the option itself; getArgName() is only the placeholder
-       // shown for its argument in the usage text.
-       System.err.println(option.getLongOpt() + " shall have " + argNb
-           + " arguments (was " + Arrays.toString(args) + ")");
-       return false;
-     }
-   }
-   return true;
- }
-
- /**
-  * Builds the option set of the dumper. Every option except "help" takes exactly one
-  * argument; domain, operations, className, pattern and jmxUrl are required.
-  *
-  * @return the assembled options
-  */
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions()
- {
-   Option helpOption =
-       OptionBuilder.withLongOpt(help)
-                    .withDescription("Prints command-line options info")
-                    .create();
-
-   Options options = new Options();
-   options.addOption(helpOption);
-   options.addOption(singleArgOption(domain, "Domain of the JMX bean", true));
-   options.addOption(singleArgOption(fields, "Fields of the JMX bean to sample", false));
-   options.addOption(singleArgOption(operations, "Operation to invoke", true));
-   options.addOption(singleArgOption(className, "Classname of the MBean", true));
-   options.addOption(singleArgOption(outputFile, "outputFileName", false));
-   options.addOption(singleArgOption(jmxUrl, "jmx port to connect to", true));
-   options.addOption(singleArgOption(pattern, "pattern of the MBean", true));
-   options.addOption(singleArgOption(period, "Sampling period in MS", false));
-   options.addOption(singleArgOption(sampleCount, "# of samples to take", false));
-   return options;
- }
-
- /** Creates a long option that takes exactly one argument. */
- @SuppressWarnings("static-access")
- private static Option singleArgOption(String longOpt, String description, boolean required)
- {
-   Option option =
-       OptionBuilder.withLongOpt(longOpt)
-                    .withDescription(description)
-                    .create();
-   option.setArgs(1);
-   option.setRequired(required);
-   return option;
- }
-
-
- /**
-  * Prints the usage text for the given options, headed by "java" and this class's name.
-  *
-  * @param cliOptions the options to describe
-  */
- public static void printUsage(Options cliOptions)
- {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("java " + JmxDumper.class.getName(), cliOptions);
- }
-
- /**
-  * Command-line entry point: runs the dumper with the given arguments and exits the
-  * JVM with the status returned by processCommandLineArgs.
-  *
-  * @param args command-line arguments, see printUsage for the accepted options
-  * @throws Exception whatever processCommandLineArgs throws
-  */
- public static void main(String[] args) throws Exception
- {
-   int exitStatus = processCommandLineArgs(args);
-   System.exit(exitStatus);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/LocalZKServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/LocalZKServer.java b/helix-core/src/main/java/com/linkedin/helix/tools/LocalZKServer.java
deleted file mode 100644
index 2da3d7e..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/LocalZKServer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
-
-/**
- * Starts a ZooKeeper server locally on a given port.
- *
- * @author kgopalak
- */
-public class LocalZKServer
-{
-  /**
-   * Starts the server and then blocks the calling thread by joining on itself.
-   *
-   * @param port port the server listens on
-   * @param dataDir directory passed to the server as its data directory
-   * @param logDir directory passed to the server as its log directory
-   * @throws Exception if the server fails to start or the thread is interrupted
-   */
-  public void start(int port, String dataDir, String logDir) throws Exception
-  {
-    // No default namespace is created on startup.
-    IDefaultNameSpace emptyNameSpace = new IDefaultNameSpace()
-    {
-      @Override
-      public void createDefaultNameSpace(ZkClient zkClient)
-      {
-      }
-    };
-    new ZkServer(dataDir, logDir, emptyNameSpace, port).start();
-    Thread.currentThread().join();
-  }
-
-  /**
-   * Usage: [port [dataDir [logDir]]]. The port defaults to 2199 and the directories
-   * to fresh sub-directories of java.io.tmpdir. When only dataDir is given, the same
-   * directory is used for the logs.
-   */
-  public static void main(String[] args) throws Exception
-  {
-    String rootDir = System.getProperty("java.io.tmpdir") + "/zk-helix/"
-        + System.currentTimeMillis();
-
-    int port = (args.length > 0) ? Integer.parseInt(args[0]) : 2199;
-    String dataDir = (args.length > 1) ? args[1] : rootDir + "/dataDir";
-    String logDir;
-    if (args.length > 2)
-    {
-      logDir = args[2];
-    }
-    else if (args.length > 1)
-    {
-      logDir = args[1];
-    }
-    else
-    {
-      logDir = rootDir + "/logDir";
-    }
-
-    System.out.println("Starting Zookeeper locally at port:" + port
-        + " dataDir:" + dataDir + " logDir:" + logDir);
-    new LocalZKServer().start(port, dataDir, logDir);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/MessagePoster.java b/helix-core/src/main/java/com/linkedin/helix/tools/MessagePoster.java
deleted file mode 100644
index 3ae00c5..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/MessagePoster.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.UUID;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.util.HelixUtil;
-
-/**
- * Command-line helper that writes a message into the message queue of one instance
- * of a cluster.
- */
-public class MessagePoster
-{
-  /**
-   * Writes the message under the instance's message path, replacing any node that
-   * already exists for the same message id. The target session id and target name are
-   * taken from the instance's live-instance record. The ZooKeeper connection opened
-   * for the call is closed again before returning, including on failure.
-   *
-   * @param zkServer ZooKeeper connect string
-   * @param message the message to post; its target session id and name are overwritten
-   * @param clusterName name of the cluster
-   * @param instanceName name of the receiving instance
-   */
-  public void post(String zkServer,
-                   Message message,
-                   String clusterName,
-                   String instanceName)
-  {
-    ZkClient client = new ZkClient(zkServer);
-    try
-    {
-      client.setZkSerializer(new ZNRecordSerializer());
-      String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/" + message.getId();
-      client.delete(path);
-      ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName, instanceName));
-      message.setTgtSessionId(record.getSimpleField(LiveInstanceProperty.SESSION_ID.toString()).toString());
-      message.setTgtName(record.getId());
-      client.createPersistent(path, message.getRecord());
-    }
-    finally
-    {
-      // One connection per call: release it so repeated posts do not leak sessions.
-      client.close();
-    }
-  }
-
-  /**
-   * Posts a "FaultInjection" message with a random id.
-   *
-   * @param zkServer ZooKeeper connect string
-   * @param clusterName name of the cluster
-   * @param instanceName name of the receiving instance
-   * @param payloadString stored in the simple field "faultType" when not null
-   * @param partition set as the partition name when not null
-   */
-  public void postFaultInjectionMessage(String zkServer,
-                                        String clusterName,
-                                        String instanceName,
-                                        String payloadString,
-                                        String partition)
-  {
-    Message message = new Message("FaultInjection", UUID.randomUUID().toString());
-    if (payloadString != null)
-    {
-      message.getRecord().setSimpleField("faultType", payloadString);
-    }
-    if (partition != null)
-    {
-      message.setPartitionName(partition);
-    }
-
-    post(zkServer, message, clusterName, instanceName);
-  }
-
-  /**
-   * Posts a fixed Slave-to-Master state transition message with id "TestMessageId-2".
-   *
-   * @param zkServer ZooKeeper connect string
-   * @param clusterName name of the cluster
-   * @param instanceName name of the receiving instance
-   */
-  public void postTestMessage(String zkServer, String clusterName, String instanceName)
-  {
-    String msgSrc = "cm-instance-0";
-    String msgId = "TestMessageId-2";
-
-    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
-    message.setMsgId(msgId);
-    message.setSrcName(msgSrc);
-    message.setTgtName(instanceName);
-    message.setMsgState(MessageState.NEW);
-    message.setFromState("Slave");
-    message.setToState("Master");
-    message.setPartitionName("EspressoDB.partition-0." + instanceName);
-
-    post(zkServer, message, clusterName, instanceName);
-  }
-
-  /**
-   * Usage: zkServer cluster instance msgType [payloadString] [partition], where
-   * msgType is "test" or "fault". Exits with status 1 on a wrong argument count.
-   */
-  public static void main(String[] args)
-  {
-    if (args.length < 4 || args.length > 6)
-    {
-      System.err.println("Usage: java " + MessagePoster.class.getName()
-          + " zkServer cluster instance msgType [payloadString] [partition]");
-      System.err.println("msgType can be one of test, fault");
-      System.err.println("payloadString is sent along with the fault msgType");
-      System.exit(1);
-    }
-    String zkServer = args[0];
-    String cluster = args[1];
-    String instance = args[2];
-    String msgType = args[3];
-    String payloadString = (args.length >= 5 ? args[4] : null);
-    String partition = (args.length == 6 ? args[5] : null);
-
-    MessagePoster messagePoster = new MessagePoster();
-    if (msgType.equals("test"))
-    {
-      messagePoster.postTestMessage(zkServer, cluster, instance);
-    }
-    else if (msgType.equals("fault"))
-    {
-      messagePoster.postFaultInjectionMessage(zkServer,
-                                              cluster,
-                                              instance,
-                                              payloadString,
-                                              partition);
-      System.out.println("Posted " + msgType);
-    }
-    else
-    {
-      System.err.println("Message was not posted. Unknown msgType:" + msgType);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/PropertiesReader.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/PropertiesReader.java b/helix-core/src/main/java/com/linkedin/helix/tools/PropertiesReader.java
deleted file mode 100644
index 18317bf..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/PropertiesReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.io.InputStream;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-
-/**
- * Loads a properties file from the context class loader of the constructing thread
- * and gives strict access to its values: a missing file or a missing key is reported
- * with an IllegalArgumentException.
- */
-public class PropertiesReader
-{
-  private final Properties _properties = new Properties();
-
-  /**
-   * Loads the named resource.
-   *
-   * @param propertyFileName resource name, resolved by the thread's context class loader
-   * @throws IllegalArgumentException if the resource does not exist or cannot be read
-   */
-  public PropertiesReader(String propertyFileName)
-  {
-    InputStream stream = null;
-    try
-    {
-      stream = Thread.currentThread().getContextClassLoader()
-          .getResourceAsStream(propertyFileName);
-      if (stream == null)
-      {
-        // getResourceAsStream signals a missing resource with null; name the problem
-        // instead of letting Properties.load fail with a NullPointerException.
-        throw new java.io.FileNotFoundException("resource not found: " + propertyFileName);
-      }
-      _properties.load(stream);
-    }
-    catch (Exception e)
-    {
-      String errMsg = "could not open properties file:" + propertyFileName;
-      throw new IllegalArgumentException(errMsg, e);
-    }
-    finally
-    {
-      if (stream != null)
-      {
-        try
-        {
-          stream.close();
-        }
-        catch (java.io.IOException ignored)
-        {
-          // The properties are already loaded (or the load already failed);
-          // a failure to close does not change the outcome.
-        }
-      }
-    }
-  }
-
-  /**
-   * Returns the value stored for the key.
-   *
-   * @param key property name
-   * @return the property value, never null
-   * @throws IllegalArgumentException if the key has no value
-   */
-  public String getProperty(String key)
-  {
-    String value = _properties.getProperty(key);
-    if (value == null)
-    {
-      throw new IllegalArgumentException("no property exist for key:" + key);
-    }
-
-    return value;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/RUSHrHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/RUSHrHash.java b/helix-core/src/main/java/com/linkedin/helix/tools/RUSHrHash.java
deleted file mode 100644
index 187fa17..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/RUSHrHash.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.*;
-import java.util.ArrayList;
-import java.util.zip.CRC32;
-
-public class RUSHrHash
-{
- /**
-  * Number of replicas to place for each object; read from the "replicationDegree"
-  * entry of the configuration.
-  */
- protected int replicationDegree = 1;
-
- /**
-  * One map per sub-cluster, at the sub-cluster's index. Each map holds "count" (the
-  * number of nodes in that sub-cluster) and "list" (an Integer[] initialised to
-  * 0..count-1, used as the working index list for node selection).
-  *
-  * Populated at construction time only.
-  */
-
- protected HashMap[] clusters;
-
- /**
-  * The "subClusters" array taken from the configuration; element i describes
-  * sub-cluster i, including its "weight" and "nodes" entries.
-  */
- protected HashMap[] clusterConfig;
-
- /**
-  * Total number of sub-clusters in the configuration. Populated at construction
-  * time only.
-  */
- protected int totalClusters = 0;
-
- /**
-  * Total number of nodes over all sub-clusters. Populated at construction time only.
-  */
- protected int totalNodes = 0;
-
- /**
-  * Weighted node total: the sum over all sub-clusters of node count times weight.
-  * Populated at construction time only.
-  */
- protected int totalNodesW = 0;
-
- /**
-  * The node maps of all sub-clusters, concatenated in sub-cluster order.
-  */
- protected HashMap[] nodes = null;
-
- /**
-  * Offset added to the seed before nodes are chosen inside a sub-cluster.
-  */
- protected final int SEED_PARAM = 1560;
-
- /**
-  * Random number generator; findNode re-seeds it from the object key, so the
-  * instance carries mutable state that is shared by all lookups.
-  */
-
- Random ran = new Random();
-
- /**
-  * Upper bound (2^16) of the integers drawn in drawWHG; draws are divided by it
-  * to obtain a value between 0 and 1.
-  */
- float ranMax = (float) Math.pow(2.0, 16.0);
-
- /**
-  * Reads the configuration and derives the lookup tables used by findNode:
-  * clusters, totalClusters, totalNodes, totalNodesW and nodes.
-  *
-  * The configuration must contain "replicationDegree" (an Integer) and "subClusters"
-  * (a HashMap[]); every sub-cluster map must contain "weight" (an Integer) and
-  * "nodes" (a HashMap[] with one map per node).
-  *
-  * @param conf the data configuration
-  * @throws Exception if the configuration contains no sub-clusters
-  */
-
- public RUSHrHash(HashMap<String, Object> conf) throws Exception
- {
-   HashMap[] subClusters = (HashMap[]) conf.get("subClusters");
-   clusterConfig = subClusters;
-   replicationDegree = (Integer) conf.get("replicationDegree");
-
-   totalClusters = subClusters.length;
-   clusters = new HashMap[totalClusters];
-   if (totalClusters <= 0)
-   {
-     throw new Exception(
-         "data config to the RUSHr locator does not contain a valid clusters property");
-   }
-
-   ArrayList<HashMap> allNodes = new ArrayList<HashMap>();
-   for (int clusterIdx = 0; clusterIdx < totalClusters; clusterIdx++)
-   {
-     HashMap subCluster = subClusters[clusterIdx];
-     HashMap[] clusterNodes = (HashMap[]) subCluster.get("nodes");
-     int nodeCount = clusterNodes.length;
-
-     // Identity index list 0..nodeCount-1, later permuted by choose() and restored by reset().
-     Integer[] indexList = new Integer[nodeCount];
-     for (int nodeIdx = 0; nodeIdx < nodeCount; nodeIdx++)
-     {
-       allNodes.add(clusterNodes[nodeIdx]);
-       indexList[nodeIdx] = nodeIdx;
-     }
-     totalNodes += nodeCount;
-     totalNodesW += nodeCount * (Integer) subCluster.get("weight");
-
-     HashMap clusterData = new HashMap<String, Object>();
-     clusterData.put("count", nodeCount);
-     clusterData.put("list", indexList);
-     clusters[clusterIdx] = clusterData;
-   }
-   nodes = new HashMap[totalNodes];
-   allNodes.toArray(nodes);
- }
-
- /**
-  * Locates the nodes that hold the replicas of an object, using an implementation of
-  * the RUSHr algorithm as described by R J Honicky and Ethan Miller.
-  *
-  * Sub-clusters are visited from the last one to the first; in each one a number of
-  * replicas is drawn and that many nodes are chosen, until replicationDegree nodes
-  * have been collected. The generator is re-seeded from the key, so the same key
-  * always yields the same nodes for the same configuration.
-  *
-  * @param objKey numeric key of the object, used as the seed of the random generator
-  * @return the node maps chosen for the object
-  * @throws Exception if the instance holds no nodes or sub-clusters, or if all
-  *           sub-clusters were visited without placing every replica
-  */
- public ArrayList<HashMap> findNode(long objKey) throws Exception
- {
-
-   HashMap[] c = this.clusters;
-   int sumRemainingNodes = this.totalNodes;
-   int sumRemainingNodesW = this.totalNodesW;
-   int repDeg = this.replicationDegree;
-   int totClu = this.totalClusters;
-   int totNod = this.totalNodes;
-   HashMap[] clusConfig = this.clusterConfig;
-
-   // throw an exception if the data is no good
-   if ((totNod <= 0) || (totClu <= 0))
-   {
-     throw new Exception(
-         "the total nodes or total clusters is negative or 0. bad joo joos!");
-   }
-
-   // get the starting cluster
-   int currentCluster = totClu - 1;
-
-   // j = current cluster, m = disks in current cluster, n = remaining nodes
-   ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
-   while (true)
-   {
-
-     // prevent an infinite loop, in case there is a bug
-     if (currentCluster < 0)
-     {
-       // Report the actual key; the message used to contain the literal text "objKey".
-       throw new Exception(
-           "the cluster index became negative while we were looking for the following id: "
-               + objKey
-               + ". This should never happen with any key. There is a bug or maybe your joo joos are BAD!");
-     }
-
-     HashMap clusterData = clusConfig[currentCluster];
-     Integer weight = (Integer) clusterData.get("weight");
-
-     Integer disksInCurrentCluster = (Integer) c[currentCluster].get("count");
-     sumRemainingNodes -= disksInCurrentCluster;
-
-     Integer disksInCurrentClusterW = disksInCurrentCluster * weight;
-     sumRemainingNodesW -= disksInCurrentClusterW;
-
-     // set the seed to our set id
-     long seed = objKey + currentCluster;
-     ran.setSeed(seed);
-     // t = replicas that cannot fit into the sub-clusters still to be visited
-     // and therefore have to be placed in this one
-     int t = (repDeg - sumRemainingNodes) > 0 ? (repDeg - sumRemainingNodes)
-         : 0;
-
-     int u = t
-         + drawWHG(repDeg - t, disksInCurrentClusterW - t,
-             disksInCurrentClusterW + sumRemainingNodesW - t, weight);
-     if (u > 0)
-     {
-       // never ask for more nodes than the sub-cluster has
-       if (u > disksInCurrentCluster)
-       {
-         u = disksInCurrentCluster;
-       }
-       ran.setSeed(objKey + currentCluster + SEED_PARAM);
-       choose(u, currentCluster, sumRemainingNodes, nodeData);
-       // restore the sub-cluster's index list for the next lookup
-       reset(u, currentCluster);
-       repDeg -= u;
-     }
-     if (repDeg == 0)
-     {
-       break;
-     }
-     currentCluster--;
-   }
-   return nodeData;
- }
-
- /**
-  * Locates the nodes for a string key. The key is reduced to a number, namely bits
-  * 16..30 of the CRC32 of its UTF-8 bytes, and the lookup is delegated to
-  * {@link #findNode(long)}.
-  *
-  * The bytes are always taken in UTF-8 so that a key maps to the same nodes no matter
-  * what the default charset of the running JVM is.
-  *
-  * @param objKey string identifier of the object
-  * @return the node maps chosen for the object
-  * @throws Exception if the lookup fails, see {@link #findNode(long)}
-  */
- public ArrayList<HashMap> findNode(String objKey) throws Exception
- {
-   // turn a string identifier into an integer for the random seed
-   CRC32 crc32 = new CRC32();
-   byte[] bytes = objKey.getBytes("UTF-8");
-   crc32.update(bytes);
-   long crc32Value = crc32.getValue();
-   long objKeyLong = (crc32Value >> 16) & 0x7fff;
-   return findNode(objKeyLong);
- }
-
- /**
-  * Rewrites the last nodesToRetrieve slots of a sub-cluster's index list so that
-  * each holds its own index again. When such a slot held a value that points into
-  * the front part of the list, that front slot is set back to its own index too.
-  * findNode calls this right after choose().
-  * NOTE(review): this looks like the undo step for the swaps done by choose() - confirm.
-  *
-  * @param nodesToRetrieve number of trailing slots to restore
-  * @param currentCluster index of the sub-cluster whose list is restored
-  */
- public void reset(int nodesToRetrieve, int currentCluster)
- {
- Integer[] list = (Integer[]) clusters[currentCluster].get("list");
- Integer count = (Integer) clusters[currentCluster].get("count");
-
- int listIdx;
- int val;
- for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
- {
- // walk the trailing slots from left to right
- listIdx = count - nodesToRetrieve + nodeIdx;
- val = list[listIdx];
- // a value below the tail boundary names a front slot; restore that slot as well
- if (val < (count - nodesToRetrieve))
- {
- list[val] = val;
- }
- list[listIdx] = listIdx;
- }
- }
-
- /**
-  * Draws nodesToRetrieve distinct nodes from a sub-cluster and appends their node
-  * maps to nodeData. Each round picks a random slot among the slots not yet used and
-  * swaps it to the end of the unused range, so no node is drawn twice.
-  *
-  * @param nodesToRetrieve number of nodes to draw
-  * @param currentCluster index of the sub-cluster to draw from
-  * @param remainingNodes offset of the sub-cluster's first node in the nodes array
-  * @param nodeData list that receives the chosen node maps
-  */
- public void choose(int nodesToRetrieve, int currentCluster,
-     int remainingNodes, ArrayList<HashMap> nodeData)
- {
-   HashMap cluster = clusters[currentCluster];
-   Integer[] slots = (Integer[]) cluster.get("list");
-   Integer count = (Integer) cluster.get("count");
-
-   for (int drawn = 0; drawn < nodesToRetrieve; drawn++)
-   {
-     int lastUnused = count - drawn - 1;
-     int pick = ran.nextInt(lastUnused + 1);
-     // move the picked entry out of the unused range
-     int chosen = slots[pick];
-     slots[pick] = slots[lastUnused];
-     slots[lastUnused] = chosen;
-     // the entry is relative to the sub-cluster; shift it by the sub-cluster's offset
-     nodeData.add(nodes[remainingNodes + chosen]);
-   }
- }
-
- /**
-  * Same as {@link #findNode(String)}; this method only delegates to it.
-  *
-  * @param objKey string identifier of the object
-  * @return the node maps chosen for the object
-  * @throws Exception if the lookup fails
-  */
- public ArrayList<HashMap> findNodes(String objKey) throws Exception
- {
- return findNode(objKey);
- }
-
- /** @return the number of replicas placed for each object */
- public int getReplicationDegree()
- {
- return replicationDegree;
- }
-
- /** @return the total number of nodes over all sub-clusters */
- public int getTotalNodes()
- {
- return totalNodes;
- }
-
- /**
-  * Performs up to {@code replicas} random draws and counts the hits. A draw hits when
-  * a random value between 0 and 1 does not exceed disksInCurrentCluster / totalDisks.
-  * After every draw totalDisks is reduced by weight, and after a hit
-  * disksInCurrentCluster is reduced by weight as well. Rounds in which totalDisks is 0
-  * are skipped without consuming a random number.
-  *
-  * @param replicas number of rounds
-  * @param disksInCurrentCluster weighted disk count of the current sub-cluster
-  * @param totalDisks weighted disk count of the current and all remaining sub-clusters
-  * @param weight weight of the current sub-cluster
-  * @return the number of hits
-  */
- public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks,
-     int weight)
- {
-   int hits = 0;
-   for (int round = 0; round < replicas; round++)
-   {
-     if (totalDisks == 0)
-     {
-       continue;
-     }
-     int drawn = ran.nextInt((int) (ranMax + 1));
-     float z = (float) drawn / ranMax;
-     float prob = (float) disksInCurrentCluster / (float) totalDisks;
-     if (z <= prob)
-     {
-       hits++;
-       disksInCurrentCluster -= weight;
-     }
-     totalDisks -= weight;
-   }
-   return hits;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java
deleted file mode 100644
index dea61a2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.model.StateModelDefinition.StateModelDefinitionProperty;
-
-public class StateModelConfigGenerator
-{
-
- public static void main(String[] args)
- {
- ZNRecordSerializer serializer = new ZNRecordSerializer();
- StateModelConfigGenerator generator = new StateModelConfigGenerator();
- System.out.println(new String(serializer.serialize(generator.generateConfigForMasterSlave())));
- }
-
- /**
- * count -1 dont care any numeric value > 0 will be tried to be satisfied based on
- * priority N all nodes in the cluster will be assigned to this state if possible R all
- * remaining nodes in the preference list will be assigned to this state, applies only
- * to last state
- */
-
- public ZNRecord generateConfigForStorageSchemata()
- {
- ZNRecord record = new ZNRecord("STORAGE_DEFAULT_SM_SCHEMATA");
- record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
- "OFFLINE");
- List<String> statePriorityList = new ArrayList<String>();
- statePriorityList.add("MASTER");
- statePriorityList.add("OFFLINE");
- statePriorityList.add("DROPPED");
- statePriorityList.add("ERROR");
- record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
- statePriorityList);
- for (String state : statePriorityList)
- {
- String key = state + ".meta";
- Map<String, String> metadata = new HashMap<String, String>();
- if (state.equals("MASTER"))
- {
- metadata.put("count", "N");
- record.setMapField(key, metadata);
- }
- else if (state.equals("OFFLINE"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- else if (state.equals("DROPPED"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- else if (state.equals("ERROR"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- }
- for (String state : statePriorityList)
- {
- String key = state + ".next";
- if (state.equals("MASTER"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("OFFLINE", "OFFLINE");
- metadata.put("DROPPED", "OFFLINE");
- record.setMapField(key, metadata);
- }
- if (state.equals("OFFLINE"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("MASTER", "MASTER");
- metadata.put("DROPPED", "DROPPED");
- record.setMapField(key, metadata);
- }
- if (state.equals("ERROR"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("OFFLINE", "OFFLINE");
- record.setMapField(key, metadata);
- }
- }
- List<String> stateTransitionPriorityList = new ArrayList<String>();
- stateTransitionPriorityList.add("MASTER-OFFLINE");
- stateTransitionPriorityList.add("OFFLINE-MASTER");
- record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
- stateTransitionPriorityList);
- return record;
- }
-
- public ZNRecord generateConfigForMasterSlave()
- {
- ZNRecord record = new ZNRecord("MasterSlave");
- record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
- "OFFLINE");
- List<String> statePriorityList = new ArrayList<String>();
- statePriorityList.add("MASTER");
- statePriorityList.add("SLAVE");
- statePriorityList.add("OFFLINE");
- statePriorityList.add("DROPPED");
- statePriorityList.add("ERROR");
- record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
- statePriorityList);
- for (String state : statePriorityList)
- {
- String key = state + ".meta";
- Map<String, String> metadata = new HashMap<String, String>();
- if (state.equals("MASTER"))
- {
- metadata.put("count", "1");
- record.setMapField(key, metadata);
- }
- else if (state.equals("SLAVE"))
- {
- metadata.put("count", "R");
- record.setMapField(key, metadata);
- }
- else if (state.equals("OFFLINE"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- else if (state.equals("DROPPED"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- else if (state.equals("ERROR"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- }
- for (String state : statePriorityList)
- {
- String key = state + ".next";
- if (state.equals("MASTER"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("SLAVE", "SLAVE");
- metadata.put("OFFLINE", "SLAVE");
- metadata.put("DROPPED", "SLAVE");
- record.setMapField(key, metadata);
- }
- else if (state.equals("SLAVE"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("MASTER", "MASTER");
- metadata.put("OFFLINE", "OFFLINE");
- metadata.put("DROPPED", "OFFLINE");
- record.setMapField(key, metadata);
- }
- else if (state.equals("OFFLINE"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("SLAVE", "SLAVE");
- metadata.put("MASTER", "SLAVE");
- metadata.put("DROPPED", "DROPPED");
- record.setMapField(key, metadata);
- }
- else if (state.equals("ERROR"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("OFFLINE", "OFFLINE");
- record.setMapField(key, metadata);
- }
- }
- List<String> stateTransitionPriorityList = new ArrayList<String>();
- stateTransitionPriorityList.add("MASTER-SLAVE");
- stateTransitionPriorityList.add("SLAVE-MASTER");
- stateTransitionPriorityList.add("OFFLINE-SLAVE");
- stateTransitionPriorityList.add("SLAVE-OFFLINE");
- stateTransitionPriorityList.add("OFFLINE-DROPPED");
- record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
- stateTransitionPriorityList);
- return record;
- // ZNRecordSerializer serializer = new ZNRecordSerializer();
- // System.out.println(new String(serializer.serialize(record)));
- }
-
- public ZNRecord generateConfigForLeaderStandby()
- {
- ZNRecord record = new ZNRecord("LeaderStandby");
- record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
- "OFFLINE");
- List<String> statePriorityList = new ArrayList<String>();
- statePriorityList.add("LEADER");
- statePriorityList.add("STANDBY");
- statePriorityList.add("OFFLINE");
- statePriorityList.add("DROPPED");
- record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
- statePriorityList);
- for (String state : statePriorityList)
- {
- String key = state + ".meta";
- Map<String, String> metadata = new HashMap<String, String>();
- if (state.equals("LEADER"))
- {
- metadata.put("count", "1");
- record.setMapField(key, metadata);
- }
- if (state.equals("STANDBY"))
- {
- metadata.put("count", "R");
- record.setMapField(key, metadata);
- }
- if (state.equals("OFFLINE"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- if (state.equals("DROPPED"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
-
- }
-
- for (String state : statePriorityList)
- {
- String key = state + ".next";
- if (state.equals("LEADER"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("STANDBY", "STANDBY");
- metadata.put("OFFLINE", "STANDBY");
- metadata.put("DROPPED", "STANDBY");
- record.setMapField(key, metadata);
- }
- if (state.equals("STANDBY"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("LEADER", "LEADER");
- metadata.put("OFFLINE", "OFFLINE");
- metadata.put("DROPPED", "OFFLINE");
- record.setMapField(key, metadata);
- }
- if (state.equals("OFFLINE"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("STANDBY", "STANDBY");
- metadata.put("LEADER", "STANDBY");
- metadata.put("DROPPED", "DROPPED");
- record.setMapField(key, metadata);
- }
-
- }
- List<String> stateTransitionPriorityList = new ArrayList<String>();
- stateTransitionPriorityList.add("LEADER-STANDBY");
- stateTransitionPriorityList.add("STANDBY-LEADER");
- stateTransitionPriorityList.add("OFFLINE-STANDBY");
- stateTransitionPriorityList.add("STANDBY-OFFLINE");
- stateTransitionPriorityList.add("OFFLINE-DROPPED");
-
- record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
- stateTransitionPriorityList);
- return record;
- // ZNRecordSerializer serializer = new ZNRecordSerializer();
- // System.out.println(new String(serializer.serialize(record)));
- }
-
- public ZNRecord generateConfigForOnlineOffline()
- {
- ZNRecord record = new ZNRecord("OnlineOffline");
- record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
- "OFFLINE");
- List<String> statePriorityList = new ArrayList<String>();
- statePriorityList.add("ONLINE");
- statePriorityList.add("OFFLINE");
- statePriorityList.add("DROPPED");
- record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
- statePriorityList);
- for (String state : statePriorityList)
- {
- String key = state + ".meta";
- Map<String, String> metadata = new HashMap<String, String>();
- if (state.equals("ONLINE"))
- {
- metadata.put("count", "R");
- record.setMapField(key, metadata);
- }
- if (state.equals("OFFLINE"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- if (state.equals("DROPPED"))
- {
- metadata.put("count", "-1");
- record.setMapField(key, metadata);
- }
- }
-
- for (String state : statePriorityList)
- {
- String key = state + ".next";
- if (state.equals("ONLINE"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("OFFLINE", "OFFLINE");
- metadata.put("DROPPED", "OFFLINE");
- record.setMapField(key, metadata);
- }
- if (state.equals("OFFLINE"))
- {
- Map<String, String> metadata = new HashMap<String, String>();
- metadata.put("ONLINE", "ONLINE");
- metadata.put("DROPPED", "DROPPED");
- record.setMapField(key, metadata);
- }
- }
- List<String> stateTransitionPriorityList = new ArrayList<String>();
- stateTransitionPriorityList.add("OFFLINE-ONLINE");
- stateTransitionPriorityList.add("ONLINE-OFFLINE");
- stateTransitionPriorityList.add("OFFLINE-DROPPED");
-
- record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
- stateTransitionPriorityList);
- return record;
- // ZNRecordSerializer serializer = new ZNRecordSerializer();
- // System.out.println(new String(serializer.serialize(record)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/TestCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/TestCommand.java b/helix-core/src/main/java/com/linkedin/helix/tools/TestCommand.java
deleted file mode 100644
index 494b0d2..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/TestCommand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import com.linkedin.helix.HelixManager;
-
-public class TestCommand
-{
- public enum CommandType
- {
- MODIFY,
- VERIFY,
- START,
- STOP
- }
-
- public static class NodeOpArg
- {
- public HelixManager _manager;
- public Thread _thread;
-
- public NodeOpArg(HelixManager manager, Thread thread)
- {
- _manager = manager;
- _thread = thread;
- }
- }
-
- public TestTrigger _trigger;
- public CommandType _commandType;
- public ZnodeOpArg _znodeOpArg;
- public NodeOpArg _nodeOpArg;
-
- public long _startTimestamp;
- public long _finishTimestamp;
-
- /**
- *
- * @param type
- * @param arg
- */
- public TestCommand(CommandType type, ZnodeOpArg arg)
- {
- this(type, new TestTrigger(), arg);
- }
-
- /**
- *
- * @param type
- * @param trigger
- * @param arg
- */
- public TestCommand(CommandType type, TestTrigger trigger, ZnodeOpArg arg)
- {
- _commandType = type;
- _trigger = trigger;
- _znodeOpArg = arg;
- }
-
- /**
- *
- * @param type
- * @param trigger
- * @param arg
- */
- public TestCommand(CommandType type, TestTrigger trigger, NodeOpArg arg)
- {
- _commandType = type;
- _trigger = trigger;
- _nodeOpArg = arg;
- }
-
- @Override
- public String toString()
- {
- String ret = super.toString().substring(super.toString().lastIndexOf(".") + 1) + " ";
- if (_finishTimestamp > 0)
- {
- ret += "FINISH@" + _finishTimestamp + "-START@" + _startTimestamp
- + "=" + (_finishTimestamp - _startTimestamp) + "ms ";
- }
- if (_commandType == CommandType.MODIFY || _commandType == CommandType.VERIFY)
- {
- ret += _commandType.toString() + "|" + _trigger.toString() + "|" + _znodeOpArg.toString();
- }
- else if (_commandType == CommandType.START || _commandType == CommandType.STOP)
- {
- ret += _commandType.toString() + "|" + _trigger.toString() + "|" + _nodeOpArg.toString();
- }
-
- return ret;
- }
-}