You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[25/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/com/linkedin/helix/tools/ClusterStateVerifier.java
deleted file mode 100644
index 0ee1147..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/ClusterStateVerifier.java
+++ /dev/null
@@ -1,909 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ClusterView;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyPathConfig;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.controller.pipeline.Stage;
-import com.linkedin.helix.controller.pipeline.StageContext;
-import com.linkedin.helix.controller.stages.AttributeName;
-import com.linkedin.helix.controller.stages.BestPossibleStateCalcStage;
-import com.linkedin.helix.controller.stages.BestPossibleStateOutput;
-import com.linkedin.helix.controller.stages.ClusterDataCache;
-import com.linkedin.helix.controller.stages.ClusterEvent;
-import com.linkedin.helix.controller.stages.CurrentStateComputationStage;
-import com.linkedin.helix.controller.stages.ResourceComputationStage;
-import com.linkedin.helix.manager.file.FileHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.Partition;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.store.PropertyJsonComparator;
-import com.linkedin.helix.store.PropertyJsonSerializer;
-import com.linkedin.helix.store.file.FilePropertyStore;
-import com.linkedin.helix.util.ZKClientPool;
-
-public class ClusterStateVerifier
-{
- public static String cluster = "cluster"; // long name of the command-line option carrying the cluster name
- public static String zkServerAddress = "zkSvr"; // long name of the command-line option carrying the ZooKeeper address
- public static String help = "help"; // long name of the command-line help option
- public static String timeout = "timeout"; // long name of the command-line option for the verification timeout
- public static String period = "period"; // long name of the command-line option for the polling period
-
- private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class); // log4j logger shared by all verifiers in this class
-
- public interface Verifier // a cluster-state check that can be evaluated repeatedly
- {
- boolean verify(); // returns true when the check passes; the verifyBy* helpers re-run it until it does or they time out
- }
-
- public interface ZkVerifier extends Verifier // a Verifier that also exposes the ZkClient and cluster it checks
- {
- ZkClient getZkClient(); // client on which verifyByZkCallback registers its listeners and marker node
-
- String getClusterName(); // cluster name from which verifyByZkCallback builds its ZooKeeper paths
- }
-
- /**
-  * ZooKeeper listener that re-runs a verifier whenever a watched node changes and counts
-  * the latch down once the verifier reports success.
-  */
- static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener
- {
-   final CountDownLatch _countDown;
-   final ZkClient _zkClient;
-   final Verifier _verifier;
-
-   /**
-    * @param countDown latch released (counted down) when verification succeeds
-    * @param zkClient  client used to subscribe to data changes of newly seen children
-    * @param verifier  check to run on every change notification
-    */
-   public ExtViewVeriferZkListener(CountDownLatch countDown,
-                                   ZkClient zkClient,
-                                   ZkVerifier verifier)
-   {
-     _countDown = countDown;
-     _zkClient = zkClient;
-     _verifier = verifier;
-   }
-
-   @Override
-   public void handleDataChange(String dataPath, Object data) throws Exception
-   {
-     verifyAndSignal();
-   }
-
-   @Override
-   public void handleDataDeleted(String dataPath) throws Exception
-   {
-     // no action is taken when a watched node is deleted
-   }
-
-   @Override
-   public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
-   {
-     // zkclient passes null for the child list when the parent node itself was deleted;
-     // skip the subscription step in that case instead of failing with an NPE
-     if (currentChilds != null)
-     {
-       for (String child : currentChilds)
-       {
-         String childPath =
-             parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
-         _zkClient.subscribeDataChanges(childPath, this);
-       }
-     }
-
-     verifyAndSignal();
-   }
-
-   /** Runs the verifier once and releases the latch if it passes. */
-   private void verifyAndSignal()
-   {
-     if (_verifier.verify())
-     {
-       _countDown.countDown();
-     }
-   }
-
- }
-
- /**
-  * ZooKeeper-backed verifier that compares the best possible state with the external view.
-  */
- public static class BestPossAndExtViewZkVerifier implements ZkVerifier
- {
-   private final String zkAddr;
-   private final String clusterName;
-   private final Map<String, Map<String, String>> errStates;
-   private final ZkClient zkClient;
-
-   /** Creates a verifier without any forced error states. */
-   public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName)
-   {
-     this(zkAddr, clusterName, null);
-   }
-
-   /**
-    * @param zkAddr      ZooKeeper address, must not be null
-    * @param clusterName cluster to verify, must not be null
-    * @param errStates   error states handed to the verification routine; may be null
-    * @throws IllegalArgumentException if zkAddr or clusterName is null
-    */
-   public BestPossAndExtViewZkVerifier(String zkAddr,
-                                       String clusterName,
-                                       Map<String, Map<String, String>> errStates)
-   {
-     if (zkAddr == null || clusterName == null)
-     {
-       throw new IllegalArgumentException("requires zkAddr|clusterName");
-     }
-     this.zkAddr = zkAddr;
-     this.clusterName = clusterName;
-     this.errStates = errStates;
-     this.zkClient = ZKClientPool.getZkClient(zkAddr);
-   }
-
-   /** Runs the comparison once; any exception is logged and reported as a failed check. */
-   @Override
-   public boolean verify()
-   {
-     try
-     {
-       ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-       HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-       return ClusterStateVerifier.verifyBestPossAndExtView(dataAccessor, errStates);
-     }
-     catch (Exception e)
-     {
-       LOG.error("exception in verification", e);
-       return false;
-     }
-   }
-
-   @Override
-   public ZkClient getZkClient()
-   {
-     return zkClient;
-   }
-
-   @Override
-   public String getClusterName()
-   {
-     return clusterName;
-   }
-
-   /** Renders as the class name without its package, followed by "(cluster@zkAddr)". */
-   @Override
-   public String toString()
-   {
-     String fullName = getClass().getName();
-     String shortName = fullName.substring(fullName.lastIndexOf('.') + 1);
-     return shortName + "(" + clusterName + "@" + zkAddr + ")";
-   }
- }
-
- public static class BestPossAndExtViewFileVerifier implements Verifier
- {
- private final String rootPath;
- private final String clusterName;
- private final Map<String, Map<String, String>> errStates;
- private final FilePropertyStore<ZNRecord> fileStore;
-
- public BestPossAndExtViewFileVerifier(String rootPath, String clusterName)
- {
- this(rootPath, clusterName, null);
- }
-
- public BestPossAndExtViewFileVerifier(String rootPath,
- String clusterName,
- Map<String, Map<String, String>> errStates)
- {
- if (rootPath == null || clusterName == null)
- {
- throw new IllegalArgumentException("requires rootPath|clusterName");
- }
- this.rootPath = rootPath;
- this.clusterName = clusterName;
- this.errStates = errStates;
-
- this.fileStore =
- new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
- rootPath,
- new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
- }
-
- @Override
- public boolean verify()
- {
- try
- {
- HelixDataAccessor accessor = new FileHelixDataAccessor(fileStore, clusterName);
-
- return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
- }
- catch (Exception e)
- {
- LOG.error("exception in verification", e);
- return false;
- }
- finally
- {
- }
- }
-
- @Override
- public String toString()
- {
- String verifierName = getClass().getName();
- verifierName =
- verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
- return verifierName + "(" + rootPath + "@" + clusterName + ")";
- }
- }
-
- /**
-  * ZooKeeper-backed verifier that checks every partition in the external views has a
-  * MASTER replica (see verifyMasterNbInExtView).
-  */
- public static class MasterNbInExtViewVerifier implements ZkVerifier
- {
-   private final String zkAddr;
-   private final String clusterName;
-   private final ZkClient zkClient;
-
-   /**
-    * @param zkAddr      ZooKeeper address, must not be null
-    * @param clusterName cluster to verify, must not be null
-    * @throws IllegalArgumentException if zkAddr or clusterName is null
-    */
-   public MasterNbInExtViewVerifier(String zkAddr, String clusterName)
-   {
-     if (zkAddr == null || clusterName == null)
-     {
-       throw new IllegalArgumentException("requires zkAddr|clusterName");
-     }
-     this.zkAddr = zkAddr;
-     this.clusterName = clusterName;
-     this.zkClient = ZKClientPool.getZkClient(zkAddr);
-   }
-
-   /** Runs the check once; any exception is logged and reported as a failed check. */
-   @Override
-   public boolean verify()
-   {
-     try
-     {
-       ZKHelixDataAccessor accessor =
-           new ZKHelixDataAccessor(clusterName,
-                                   new ZkBaseDataAccessor<ZNRecord>(zkClient));
-
-       return ClusterStateVerifier.verifyMasterNbInExtView(accessor);
-     }
-     catch (Exception e)
-     {
-       LOG.error("exception in verification", e);
-     }
-     return false;
-   }
-
-   @Override
-   public ZkClient getZkClient()
-   {
-     return zkClient;
-   }
-
-   @Override
-   public String getClusterName()
-   {
-     return clusterName;
-   }
-
-   /**
-    * Same rendering as the sibling verifiers, so the debug line printed by the verifyBy*
-    * helpers identifies which cluster was checked.
-    */
-   @Override
-   public String toString()
-   {
-     String verifierName = getClass().getName();
-     verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1);
-     return verifierName + "(" + clusterName + "@" + zkAddr + ")";
-   }
-
- }
-
- /**
-  * Reads the cluster once and checks that every resource's external view equals the best
-  * possible state computed from that snapshot.
-  *
-  * @param accessor  accessor used to refresh the cluster cache and read the external views
-  * @param errStates optional map of resource name -> (partition name -> instance name);
-  *                  each listed instance is forced to "ERROR" in the best possible state
-  *                  before comparing; may be null
-  * @return true if all resources match; false on a mismatch, a missing external view, or
-  *         any exception (which is logged)
-  */
- static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
-                                         Map<String, Map<String, String>> errStates)
- {
-   try
-   {
-     Builder keyBuilder = accessor.keyBuilder();
-     // read cluster once and do verification
-     ClusterDataCache cache = new ClusterDataCache();
-     cache.refresh(accessor);
-
-     Map<String, IdealState> idealStates = cache.getIdealStates();
-     if (idealStates == null)
-     {
-       // ideal state is null because ideal state is dropped; use a MUTABLE map because
-       // placeholder ideal states may be added below (Collections.emptyMap() would throw
-       // UnsupportedOperationException on put)
-       idealStates = new HashMap<String, IdealState>();
-     }
-
-     Map<String, ExternalView> extViews =
-         accessor.getChildValuesMap(keyBuilder.externalViews());
-     if (extViews == null)
-     {
-       extViews = Collections.emptyMap();
-     }
-
-     // if externalView is not empty and idealState doesn't exist
-     // add empty idealState for the resource
-     for (String resource : extViews.keySet())
-     {
-       if (!idealStates.containsKey(resource))
-       {
-         idealStates.put(resource, new IdealState(resource));
-       }
-     }
-
-     // calculate best possible state
-     BestPossibleStateOutput bestPossOutput =
-         ClusterStateVerifier.calcBestPossState(cache);
-
-     // set error states
-     if (errStates != null)
-     {
-       for (String resourceName : errStates.keySet())
-       {
-         Map<String, String> partErrStates = errStates.get(resourceName);
-         for (String partitionName : partErrStates.keySet())
-         {
-           String instanceName = partErrStates.get(partitionName);
-           Map<String, String> partStateMap =
-               bestPossOutput.getInstanceStateMap(resourceName,
-                                                  new Partition(partitionName));
-           partStateMap.put(instanceName, "ERROR");
-         }
-       }
-     }
-
-     for (String resourceName : idealStates.keySet())
-     {
-       ExternalView extView = extViews.get(resourceName);
-       if (extView == null)
-       {
-         LOG.info("externalView for " + resourceName + " is not available");
-         return false;
-       }
-
-       // step 0: remove empty map and DROPPED state from best possible state
-       Map<Partition, Map<String, String>> bpStateMap =
-           bestPossOutput.getResourceMap(resourceName);
-       Iterator<Entry<Partition, Map<String, String>>> iter =
-           bpStateMap.entrySet().iterator();
-       while (iter.hasNext())
-       {
-         Map.Entry<Partition, Map<String, String>> entry = iter.next();
-         Map<String, String> instanceStateMap = entry.getValue();
-         if (instanceStateMap.isEmpty())
-         {
-           iter.remove();
-         }
-         else
-         {
-           // remove instances with DROPPED state
-           Iterator<Map.Entry<String, String>> insIter =
-               instanceStateMap.entrySet().iterator();
-           while (insIter.hasNext())
-           {
-             Map.Entry<String, String> insEntry = insIter.next();
-             String state = insEntry.getValue();
-             if (state.equalsIgnoreCase("DROPPED"))
-             {
-               insIter.remove();
-             }
-           }
-         }
-       }
-
-       // step 1: externalView and bestPossibleState has equal size
-       int extViewSize = extView.getRecord().getMapFields().size();
-       int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
-       if (extViewSize != bestPossStateSize)
-       {
-         LOG.info("exterView size (" + extViewSize
-             + ") is different from bestPossState size (" + bestPossStateSize
-             + ") for resource: " + resourceName);
-         return false;
-       }
-
-       // step 2: every entry in external view is contained in best possible state
-       for (String partition : extView.getRecord().getMapFields().keySet())
-       {
-         Map<String, String> evInstanceStateMap =
-             extView.getRecord().getMapField(partition);
-         Map<String, String> bpInstanceStateMap =
-             bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
-
-         boolean result =
-             ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
-                                                              bpInstanceStateMap);
-         if (result == false)
-         {
-           LOG.info("externalView is different from bestPossibleState for partition:"
-               + partition);
-           return false;
-         }
-       }
-     }
-     return true;
-   }
-   catch (Exception e)
-   {
-     LOG.error("exception in verification", e);
-     return false;
-   }
-
- }
-
- /**
-  * Checks that every external view covers at least as many partitions as its ideal state
-  * declares and that every partition has an instance in the MASTER state.
-  *
-  * @param accessor accessor used to read ideal states and external views
-  * @return true if there are no ideal states or if all checks pass; false otherwise,
-  *         including when an external view has no matching ideal state
-  */
- static boolean verifyMasterNbInExtView(HelixDataAccessor accessor)
- {
-   Builder keyBuilder = accessor.keyBuilder();
-
-   Map<String, IdealState> idealStates =
-       accessor.getChildValuesMap(keyBuilder.idealStates());
-   if (idealStates == null || idealStates.size() == 0)
-   {
-     LOG.info("No resource idealState");
-     return true;
-   }
-
-   Map<String, ExternalView> extViews =
-       accessor.getChildValuesMap(keyBuilder.externalViews());
-   if (extViews == null || extViews.size() < idealStates.size())
-   {
-     LOG.info("No externalViews | externalView.size() < idealState.size()");
-     return false;
-   }
-
-   for (Map.Entry<String, ExternalView> extViewEntry : extViews.entrySet())
-   {
-     String resource = extViewEntry.getKey();
-     IdealState idealState = idealStates.get(resource);
-     if (idealState == null)
-     {
-       // an external view without an ideal state used to fail with an NPE here;
-       // report it as "not verified" instead
-       LOG.info("No idealState for externalView of resource: " + resource);
-       return false;
-     }
-
-     int partitions = idealState.getNumPartitions();
-     Map<String, Map<String, String>> instanceStateMap =
-         extViewEntry.getValue().getRecord().getMapFields();
-     if (instanceStateMap.size() < partitions)
-     {
-       LOG.info("Number of externalViews (" + instanceStateMap.size()
-           + ") < partitions (" + partitions + ")");
-       return false;
-     }
-
-     for (Map.Entry<String, Map<String, String>> partitionEntry : instanceStateMap.entrySet())
-     {
-       boolean foundMaster = false;
-       for (String state : partitionEntry.getValue().values())
-       {
-         // literal-first comparison tolerates a null state value
-         if ("MASTER".equalsIgnoreCase(state))
-         {
-           foundMaster = true;
-           break;
-         }
-       }
-       if (!foundMaster)
-       {
-         LOG.info("No MASTER for partition: " + partitionEntry.getKey());
-         return false;
-       }
-     }
-   }
-   return true;
- }
-
- static void runStage(ClusterEvent event, Stage stage) throws Exception
- {
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- stage.process(event);
- stage.postProcess();
- }
-
- /**
-  * Calculates the best possible state by running the resource-computation,
-  * current-state-computation and best-possible-state stages, in that order, over an
-  * event that carries the given cache. Note that DROPPED states are not checked since
-  * when kicking off the BestPossibleStateCalcStage we are providing an empty current
-  * state map.
-  *
-  * @param cache cluster snapshot registered on the event as "ClusterDataCache"
-  * @return the BEST_POSSIBLE_STATE attribute found on the event after the stages ran
-  * @throws Exception whatever one of the stages throws
-  */
-
- static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception
- {
-   ClusterEvent sampleEvent = new ClusterEvent("sampleEvent");
-   sampleEvent.addAttribute("ClusterDataCache", cache);
-
-   // all stages are created up front, then executed one after another
-   Stage[] pipeline =
-       { new ResourceComputationStage(), new CurrentStateComputationStage(),
-           new BestPossibleStateCalcStage() };
-   for (Stage stage : pipeline)
-   {
-     runStage(sampleEvent, stage);
-   }
-
-   BestPossibleStateOutput bestPossible =
-       sampleEvent.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-   return bestPossible;
- }
-
- public static <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2)
- {
- boolean isEqual = true;
- if (map1 == null && map2 == null)
- {
- // OK
- }
- else if (map1 == null && map2 != null)
- {
- if (!map2.isEmpty())
- {
- isEqual = false;
- }
- }
- else if (map1 != null && map2 == null)
- {
- if (!map1.isEmpty())
- {
- isEqual = false;
- }
- }
- else
- {
- // verify size
- if (map1.size() != map2.size())
- {
- isEqual = false;
- }
- // verify each <key, value> in map1 is contained in map2
- for (K key : map1.keySet())
- {
- if (!map1.get(key).equals(map2.get(key)))
- {
- LOG.debug("different value for key: " + key + "(map1: " + map1.get(key)
- + ", map2: " + map2.get(key) + ")");
- isEqual = false;
- break;
- }
- }
- }
- return isEqual;
- }
-
- /** Polls the verifier once per second for up to 30 seconds. */
- public static boolean verifyByPolling(Verifier verifier)
- {
-   return verifyByPolling(verifier, 30 * 1000);
- }
-
- /** Polls the verifier once per second for up to {@code timeout} milliseconds. */
- public static boolean verifyByPolling(Verifier verifier, long timeout)
- {
-   return verifyByPolling(verifier, timeout, 1000);
- }
-
- /**
-  * Repeatedly sleeps for {@code period} milliseconds and then runs the verifier, until it
-  * passes or more than {@code timeout} milliseconds have elapsed since the start. The
-  * verifier is always run at least once (after the first sleep).
-  *
-  * @param verifier check to run
-  * @param timeout  overall time budget in milliseconds
-  * @param period   sleep between attempts in milliseconds
-  * @return true if the verifier passed; false on timeout, interruption or exception
-  */
- public static boolean verifyByPolling(Verifier verifier, long timeout, long period)
- {
-   long startTime = System.currentTimeMillis();
-   boolean result = false;
-   try
-   {
-     long curTime;
-     do
-     {
-       Thread.sleep(period);
-       result = verifier.verify();
-       if (result)
-       {
-         break;
-       }
-       curTime = System.currentTimeMillis();
-     }
-     while (curTime <= startTime + timeout);
-     return result;
-   }
-   catch (InterruptedException e)
-   {
-     // keep the interrupt visible to the caller instead of silently swallowing it
-     Thread.currentThread().interrupt();
-     LOG.error("interrupted while polling " + verifier, e);
-   }
-   catch (Exception e)
-   {
-     LOG.error("exception while polling " + verifier, e);
-   }
-   finally
-   {
-     long endTime = System.currentTimeMillis();
-
-     // debug
-     System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
-         + "ms to verify");
-
-   }
-   return false;
- }
-
- /** Verifies via ZooKeeper callbacks with a timeout of 30000 milliseconds. */
- public static boolean verifyByZkCallback(ZkVerifier verifier)
- {
-   return verifyByZkCallback(verifier, 30000);
- }
-
- /**
-  * Runs the verifier once and, if it fails, waits up to {@code timeout} milliseconds for a
-  * change under the cluster's external-view path to make it pass; a final attempt is made
-  * when the wait times out. Listeners and the marker node are cleaned up even when the
-  * verification throws.
-  *
-  * @param verifier check to run; also supplies the ZkClient and cluster name
-  * @param timeout  maximum wait in milliseconds
-  * @return true if the verifier passed
-  */
- public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout)
- {
-   long startTime = System.currentTimeMillis();
-   CountDownLatch countDown = new CountDownLatch(1);
-   ZkClient zkClient = verifier.getZkClient();
-   String clusterName = verifier.getClusterName();
-
-   // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
-   // so when analyze zk log, we know when a test ends
-   String verifyPath = "/" + clusterName + "/CONFIGS/CLUSTER/verify";
-   zkClient.createEphemeral(verifyPath);
-
-   ExtViewVeriferZkListener listener =
-       new ExtViewVeriferZkListener(countDown, zkClient, verifier);
-
-   String extViewPath =
-       PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
-
-   boolean result = false;
-   try
-   {
-     zkClient.subscribeChildChanges(extViewPath, listener);
-     for (String child : zkClient.getChildren(extViewPath))
-     {
-       String childPath =
-           extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
-       zkClient.subscribeDataChanges(childPath, listener);
-     }
-
-     // do initial verify
-     result = verifier.verify();
-     if (!result)
-     {
-       try
-       {
-         result = countDown.await(timeout, TimeUnit.MILLISECONDS);
-         if (!result)
-         {
-           // make a final try if timeout
-           result = verifier.verify();
-         }
-       }
-       catch (InterruptedException e)
-       {
-         // keep the interrupt visible to the caller instead of silently swallowing it
-         Thread.currentThread().interrupt();
-         LOG.error("interrupted while waiting for " + verifier, e);
-       }
-       catch (Exception e)
-       {
-         LOG.error("exception while waiting for " + verifier, e);
-       }
-     }
-   }
-   finally
-   {
-     // clean up, even if subscribing or verifying failed
-     try
-     {
-       zkClient.unsubscribeChildChanges(extViewPath, listener);
-       for (String child : zkClient.getChildren(extViewPath))
-       {
-         String childPath =
-             extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
-         zkClient.unsubscribeDataChanges(childPath, listener);
-       }
-     }
-     finally
-     {
-       zkClient.delete(verifyPath);
-     }
-   }
-
-   long endTime = System.currentTimeMillis();
-
-   // debug
-   System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
-
-   return result;
- }
-
- /**
-  * Compares the ideal states recorded for one instance in a serialized cluster view file
-  * with the current states held by the given state model factory.
-  *
-  * @param file              path of the serialized cluster view
-  * @param instanceName      instance whose states are checked
-  * @param stateModelFactory factory whose state model map supplies the current states
-  * @return true if the number of states and every individual state match; false otherwise,
-  *         including when the cluster view cannot be read
-  */
- public static boolean verifyFileBasedClusterStates(String file,
-                                                    String instanceName,
-                                                    StateModelFactory<StateModel> stateModelFactory)
- {
-   ClusterView clusterView = ClusterViewSerializer.deserialize(new File(file));
-   if (clusterView == null)
-   {
-     // deserialize() returns null for a missing or unreadable file
-     LOG.warn("Unable to read cluster view from file: " + file);
-     return false;
-   }
-
-   boolean ret = true;
-   int nonOfflineStateNr = 0;
-
-   // ideal_state for instance with name $instanceName
-   Map<String, String> instanceIdealStates = new HashMap<String, String>();
-   for (ZNRecord idealStateItem : clusterView.getPropertyList(PropertyType.IDEALSTATES))
-   {
-     Map<String, Map<String, String>> idealStates = idealStateItem.getMapFields();
-
-     for (Map.Entry<String, Map<String, String>> entry : idealStates.entrySet())
-     {
-       if (entry.getValue().containsKey(instanceName))
-       {
-         String state = entry.getValue().get(instanceName);
-         instanceIdealStates.put(entry.getKey(), state);
-       }
-     }
-   }
-
-   Map<String, StateModel> currentStateMap = stateModelFactory.getStateModelMap();
-
-   if (currentStateMap.size() != instanceIdealStates.size())
-   {
-     LOG.warn("Number of current states (" + currentStateMap.size() + ") mismatch "
-         + "number of ideal states (" + instanceIdealStates.size() + ")");
-     return false;
-   }
-
-   // keep going after a mismatch so that every difference gets logged
-   for (Map.Entry<String, String> entry : instanceIdealStates.entrySet())
-   {
-
-     String stateUnitKey = entry.getKey();
-     String idealState = entry.getValue();
-
-     if (!idealState.equalsIgnoreCase("offline"))
-     {
-       nonOfflineStateNr++;
-     }
-
-     if (!currentStateMap.containsKey(stateUnitKey))
-     {
-       LOG.warn("Current state does not contain " + stateUnitKey);
-       ret = false;
-       continue;
-     }
-
-     String curState = currentStateMap.get(stateUnitKey).getCurrentState();
-     if (!idealState.equalsIgnoreCase(curState))
-     {
-       LOG.info("State mismatch--unit_key:" + stateUnitKey + " cur:" + curState
-           + " ideal:" + idealState + " instance_name:" + instanceName);
-       ret = false;
-       continue;
-     }
-   }
-
-   if (ret)
-   {
-     System.out.println(instanceName + ": verification succeed");
-     LOG.info(instanceName + ": verification succeed (" + nonOfflineStateNr + " states)");
-   }
-
-   return ret;
- }
-
- /**
-  * Builds the command-line options of the verifier: help, ZooKeeper address (required),
-  * cluster name (required), timeout and polling period.
-  */
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions()
- {
-   Options allOptions = new Options();
-
-   // --help
-   Option helpOpt =
-       OptionBuilder.withLongOpt(help)
-                    .withDescription("Prints command-line options info")
-                    .create();
-   allOptions.addOption(helpOpt);
-
-   // --zkSvr <address>, mandatory
-   Option zkOpt =
-       OptionBuilder.withLongOpt(zkServerAddress)
-                    .withDescription("Provide zookeeper address")
-                    .create();
-   zkOpt.setArgs(1);
-   zkOpt.setRequired(true);
-   zkOpt.setArgName("ZookeeperServerAddress(Required)");
-   allOptions.addOption(zkOpt);
-
-   // --cluster <name>, mandatory
-   Option clusterOpt =
-       OptionBuilder.withLongOpt(cluster)
-                    .withDescription("Provide cluster name")
-                    .create();
-   clusterOpt.setArgs(1);
-   clusterOpt.setRequired(true);
-   clusterOpt.setArgName("Cluster name (Required)");
-   allOptions.addOption(clusterOpt);
-
-   // --timeout <value>
-   Option timeoutOpt =
-       OptionBuilder.withLongOpt(timeout)
-                    .withDescription("Timeout value for verification")
-                    .create();
-   timeoutOpt.setArgs(1);
-   timeoutOpt.setArgName("Timeout value (Optional), default=30s");
-   allOptions.addOption(timeoutOpt);
-
-   // --period <value>
-   Option periodOpt =
-       OptionBuilder.withLongOpt(period)
-                    .withDescription("Polling period for verification")
-                    .create();
-   periodOpt.setArgs(1);
-   periodOpt.setArgName("Polling period value (Optional), default=1s");
-   allOptions.addOption(periodOpt);
-
-   return allOptions;
- }
-
- /**
-  * Prints the usage text for the given options.
-  *
-  * @param cliOptions options to describe
-  */
- public static void printUsage(Options cliOptions)
- {
-   HelpFormatter helpFormatter = new HelpFormatter();
-   helpFormatter.setWidth(1000);
-   // show this tool's own class name in the usage line (it used to print ClusterSetup's)
-   helpFormatter.printHelp("java " + ClusterStateVerifier.class.getName(), cliOptions);
- }
-
- /**
-  * Parses the given arguments against the verifier's options. On a parse failure the
-  * error and the usage text are printed and the JVM exits with status 1.
-  *
-  * @param cliArgs raw command-line arguments
-  * @return the parsed command line
-  */
- public static CommandLine processCommandLineArgs(String[] cliArgs)
- {
-   Options supportedOptions = constructCommandLineOptions();
-   CommandLineParser parser = new GnuParser();
-
-   CommandLine parsed = null;
-   try
-   {
-     parsed = parser.parse(supportedOptions, cliArgs);
-   }
-   catch (ParseException pe)
-   {
-     System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
-     printUsage(supportedOptions);
-     System.exit(1);
-   }
-   return parsed;
- }
-
- /**
-  * Command-line entry point of the verification: reads the ZooKeeper address, cluster name,
-  * timeout and period from the arguments (defaults are used when no arguments are given)
-  * and verifies best possible state against external view via ZooKeeper callbacks.
-  *
-  * @param args raw command-line arguments; may be empty
-  * @return result of the verification
-  */
- public static boolean verifyState(String[] args)
- {
-   String targetCluster = "storage-cluster";
-   String zkAddress = "localhost:2181";
-   // NOTE(review): the option help advertises "default=30s" but the default here is 0 - confirm
-   long timeoutMillis = 0;
-   // parsed for completeness; the callback-based verification below does not use it
-   long pollPeriod = 1000;
-
-   if (args.length > 0)
-   {
-     CommandLine parsedArgs = processCommandLineArgs(args);
-     zkAddress = parsedArgs.getOptionValue(zkServerAddress);
-     targetCluster = parsedArgs.getOptionValue(cluster);
-     String rawTimeout = parsedArgs.getOptionValue(timeout);
-     String rawPeriod = parsedArgs.getOptionValue(period);
-
-     if (rawTimeout != null)
-     {
-       try
-       {
-         timeoutMillis = Long.parseLong(rawTimeout);
-       }
-       catch (Exception e)
-       {
-         System.err.println("Exception in converting " + rawTimeout
-             + " to long. Use default (0)");
-       }
-     }
-
-     if (rawPeriod != null)
-     {
-       try
-       {
-         pollPeriod = Long.parseLong(rawPeriod);
-       }
-       catch (Exception e)
-       {
-         System.err.println("Exception in converting " + rawPeriod
-             + " to long. Use default (1000)");
-       }
-     }
-   }
-
-   BestPossAndExtViewZkVerifier zkVerifier =
-       new BestPossAndExtViewZkVerifier(zkAddress, targetCluster);
-   return verifyByZkCallback(zkVerifier, timeoutMillis);
- }
-
- /**
-  * Runs the verification from the command line, prints the outcome and exits with
-  * status 0 on success or 1 on failure.
-  */
- public static void main(String[] args)
- {
-   boolean result = verifyState(args);
-   System.out.println(result ? "Successful" : "failed");
-   // report the outcome through the exit status (it used to be 1 unconditionally)
-   System.exit(result ? 0 : 1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/ClusterViewSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/ClusterViewSerializer.java b/helix-core/src/main/java/com/linkedin/helix/tools/ClusterViewSerializer.java
deleted file mode 100644
index 2cfc83e..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/ClusterViewSerializer.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-
-import com.linkedin.helix.ClusterView;
-import com.linkedin.helix.manager.file.StaticFileHelixManager;
-
-public class ClusterViewSerializer
-{
- private static Logger logger = Logger.getLogger(ClusterViewSerializer.class); // log4j logger for (de)serialization errors
-
- /**
-  * Writes the cluster view as indented JSON into the given file. Failures are logged and
-  * not propagated to the caller.
-  *
-  * @param view cluster view to serialize
-  * @param file destination file
-  */
- public static void serialize(ClusterView view, File file)
- {
-   ObjectMapper jsonMapper = new ObjectMapper();
-
-   SerializationConfig config = jsonMapper.getSerializationConfig();
-   config.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-   config.set(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
-   config.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-   config.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-
-   try
-   {
-     jsonMapper.writeValue(file, view);
-   }
-   catch (Exception e)
-   {
-     logger.error("Error during serialization of data:" + view, e);
-   }
- }
-
- /**
-  * Serializes the cluster view to indented JSON encoded as UTF-8. Failures are logged and
-  * not propagated to the caller.
-  *
-  * @param view cluster view to serialize
-  * @return the UTF-8 encoded JSON, or an empty array if serialization failed
-  */
- public static byte[] serialize(ClusterView view)
- {
-   ObjectMapper mapper = new ObjectMapper();
-
-   SerializationConfig serializationConfig = mapper.getSerializationConfig();
-   serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-   serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
-   serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-   serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-
-   StringWriter sw = new StringWriter();
-
-   try
-   {
-     mapper.writeValue(sw, view);
-     // encode explicitly instead of relying on the platform default charset, so the
-     // bytes do not depend on the machine that produced them
-     return sw.toString().getBytes("UTF-8");
-   }
-   catch (Exception e)
-   {
-     logger.error("Error during serialization of data:" + view, e);
-   }
-
-   return new byte[0];
- }
-
- /**
-  * Reads a cluster view from a JSON file.
-  *
-  * @param file file to read
-  * @return the cluster view, or null if the file does not exist or cannot be parsed
-  *         (both cases are logged)
-  */
- public static ClusterView deserialize(File file)
- {
-   if (!file.exists())
-   {
-     logger.error(String.format("Static config file \"%s\" doesn't exist", file.getAbsolutePath()));
-     return null;
-   }
-
-   ObjectMapper jsonMapper = new ObjectMapper();
-
-   DeserializationConfig config = jsonMapper.getDeserializationConfig();
-   config.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-   config.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
-   config.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
-
-   ClusterView parsedView = null;
-   try
-   {
-     parsedView = jsonMapper.readValue(file, ClusterView.class);
-   }
-   catch (Exception e)
-   {
-     logger.error("Error during deserialization of file:" + file.getAbsolutePath(), e);
-   }
-   return parsedView;
- }
-
-
- /**
-  * Reads a cluster view from JSON held in a byte array.
-  *
-  * @param bytes serialized cluster view
-  * @return the cluster view, or null if the bytes cannot be parsed (the failure is logged)
-  */
- public static ClusterView deserialize(byte[] bytes)
- {
-   ObjectMapper jsonMapper = new ObjectMapper();
-   ByteArrayInputStream input = new ByteArrayInputStream(bytes);
-
-   DeserializationConfig config = jsonMapper.getDeserializationConfig();
-   config.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-   config.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
-   config.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
-
-   ClusterView parsedView = null;
-   try
-   {
-     parsedView = jsonMapper.readValue(input, ClusterView.class);
-   }
-   catch (Exception e)
-   {
-     logger.error("Error during deserialization of bytes:" + new String(bytes), e);
-   }
-   return parsedView;
- }
-
- /**
-  * Temporary manual test: generates a static cluster view for one fake database on five
-  * local nodes, then prints it after serializing and again after a deserialize/serialize
-  * round trip.
-  */
- public static void main(String[] args) throws JsonGenerationException,
-     JsonMappingException, IOException
- {
-   // create fake db names and nodes
-   List<StaticFileHelixManager.DBParam> fakeDbs =
-       new ArrayList<StaticFileHelixManager.DBParam>();
-   fakeDbs.add(new StaticFileHelixManager.DBParam("BizProfile_qatest218a", 128));
-
-   String[] fakeNodes =
-       { "localhost:8900", "localhost:8901", "localhost:8902", "localhost:8903",
-           "localhost:8904" };
-   int replica = 0;
-
-   ClusterView generatedView =
-       StaticFileHelixManager.generateStaticConfigClusterView(fakeNodes, fakeDbs, replica);
-   String file = "/tmp/cluster-view-bizprofile.json";
-
-   // first pass: serialize the generated view
-   byte[] serialized = ClusterViewSerializer.serialize(generatedView);
-   System.out.println("serialized bytes=");
-   System.out.println(new String(serialized));
-
-   // second pass: restore the view and serialize it again
-   ClusterView restoredView = ClusterViewSerializer.deserialize(serialized);
-   serialized = ClusterViewSerializer.serialize(restoredView);
-   System.out.println("restored cluster view=");
-   System.out.println(new String(serialized));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/IdealCalculatorByConsistentHashing.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/IdealCalculatorByConsistentHashing.java b/helix-core/src/main/java/com/linkedin/helix/tools/IdealCalculatorByConsistentHashing.java
deleted file mode 100644
index ca6a410..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/IdealCalculatorByConsistentHashing.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-
-public class IdealCalculatorByConsistentHashing
-{
- /**
- * Interface to calculate the hash function value of a string.
- * Implemented in this file by FnvHash.
- */
- public interface HashFunction
- {
- public int getHashValue(String key); // hash of key; the interface itself puts no bound on the result (it may be negative)
- }
-
-  /**
-   * The default string hash function (FNV-style xor-then-multiply). Same as
-   * the default function used by Voldemort.
-   */
-  public static class FnvHash implements HashFunction
-  {
-    private static final long FNV_BASIS = 0x811c9dc5;
-    private static final long FNV_PRIME = (1 << 24) + 0x193;
-    public static final long FNV_BASIS_64 = 0xCBF29CE484222325L;
-    public static final long FNV_PRIME_64 = 1099511628211L;
-
-    /**
-     * Folds every byte of the key into the accumulator (xor with the unsigned
-     * byte value, then multiply by the prime) and truncates the result to int.
-     *
-     * @param key bytes to hash
-     * @return the low 32 bits of the accumulator; may be negative
-     */
-    public int hash(byte[] key)
-    {
-      long acc = FNV_BASIS;
-      for (byte b : key)
-      {
-        acc = (acc ^ (0xFF & b)) * FNV_PRIME;
-      }
-      return (int) acc;
-    }
-
-    /**
-     * Hashes the eight octets of a long, least significant octet first.
-     *
-     * @param val value to hash
-     * @return Math.abs of the 64-bit accumulator
-     */
-    public long hash64(long val)
-    {
-      long acc = FNV_BASIS_64;
-      long remaining = val;
-      for (int octetNo = 0; octetNo < 8; octetNo++)
-      {
-        acc = (acc ^ (remaining & 0x00ff)) * FNV_PRIME_64;
-        remaining >>= 8;
-      }
-      return Math.abs(acc);
-    }
-
-    /**
-     * Hashes the bytes of the key as produced by String.getBytes().
-     */
-    @Override
-    public int getHashValue(String key)
-    {
-      byte[] raw = key.getBytes();
-      return hash(raw);
-    }
-
-  }
-
-  /**
-   * Calculates the ideal state for a list of instances using consistent
-   * hashing on a hash ring of 65536 slots.
-   *
-   * @param instanceNames
-   *          list of instance names
-   * @param partitions
-   *          the number of partitions of the database
-   * @param replicas
-   *          the replication degree
-   * @param dbName
-   *          the name of the database
-   * @param hashFunc
-   *          hash function, passed through unchanged to the overload that
-   *          also takes the ring size
-   * @return the ZNRecord that contains the ideal state
-   */
-  public static ZNRecord calculateIdealState(List<String> instanceNames,
-      int partitions, int replicas, String dbName, HashFunction hashFunc)
-  {
-    final int defaultHashRingSize = 65536;
-    return calculateIdealState(instanceNames, partitions, replicas, dbName,
-        hashFunc, defaultHashRingSize);
-  }
-
-  /**
-   * Calculate the ideal state for a list of instances using consistent
-   * hashing.
-   *
-   * Each partition is given a position on the hash ring drawn from a
-   * fixed-seed random generator, so the same inputs always produce the same
-   * result. The instance found at that slot becomes MASTER; the instances
-   * found at the following slots become SLAVE, skipping any instance that
-   * already holds a replica of the partition.
-   *
-   * @param instanceNames
-   *          List of instance names.
-   * @param partitions
-   *          the partition number of the database
-   * @param replicas
-   *          the replication degree (number of slaves per partition)
-   * @param dbName
-   *          the name of the database
-   * @param hashFunc
-   *          not used by the current implementation; ring positions come
-   *          from the fixed-seed random generator
-   * @param hashRingSize
-   *          the size of the hash ring used by consistent hashing
-   * @return The ZNRecord that contains the ideal state
-   * @throws IllegalArgumentException
-   *           if partitions are requested but there are not more instances
-   *           than replicas
-   */
-  public static ZNRecord calculateIdealState(List<String> instanceNames,
-      int partitions, int replicas, String dbName, HashFunction hashFunc,
-      int hashRingSize)
-  {
-    // Every replica of a partition goes to a distinct instance. Without more
-    // instances than replicas the slave search below can never find a free
-    // instance and would spin instead of returning.
-    if (partitions > 0 && instanceNames.size() <= replicas)
-    {
-      throw new IllegalArgumentException(
-          "Replicas must be less than number of storage nodes");
-    }
-
-    ZNRecord result = new ZNRecord(dbName);
-
-    int[] hashRing = generateEvenHashRing(instanceNames, hashRingSize);
-    result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
-    // Fixed seed keeps the assignment reproducible across runs.
-    Random rand = new Random(0xc0ffee);
-    for (int i = 0; i < partitions; i++)
-    {
-      String partitionName = dbName + ".partition-" + i;
-      int hashPos = rand.nextInt() % hashRingSize;
-      // % keeps the sign of the dividend, so shift negatives into range
-      hashPos = hashPos < 0 ? (hashPos + hashRingSize) : hashPos;
-      Map<String, String> partitionAssignment = new TreeMap<String, String>();
-      // the instance owning the chosen slot hosts the master
-      int masterPos = hashRing[hashPos];
-      partitionAssignment.put(instanceNames.get(masterPos), "MASTER");
-
-      // Put slaves on the instances found at the next ring positions, making
-      // sure no instance gets more than one replica of this partition.
-      for (int j = 1; j <= replicas; j++)
-      {
-        String next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
-        while (partitionAssignment.containsKey(next))
-        {
-          // instance already used for this partition: slide along the ring
-          hashPos++;
-          next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
-        }
-        partitionAssignment.put(next, "SLAVE");
-      }
-      result.setMapField(partitionName, partitionAssignment);
-    }
-    return result;
-  }
-
-  /**
-   * Generate the hash ring for consistent hashing.
-   *
-   * @param instanceNames
-   *          List of instance names.
-   * @param hashRingSize
-   *          the size of the hash ring used by consistent hashing
-   * @return The int array used as the hash ring; each slot holds an instance
-   *         index in the range 0..size of instanceNames-1
-   */
-  public static int[] generateHashRing(List<String> instanceNames,
-      int hashRingSize)
-  {
-    // A freshly allocated int[] is zero-filled, so every slot starts out
-    // owned by instance 0.
-    int[] ring = new int[hashRingSize];
-    final int instanceCount = instanceNames.size();
-    // Instance i then takes over hashRingSize / (i + 1) pseudo-randomly
-    // chosen slots; the instance index doubles as the random seed.
-    for (int node = 1; node < instanceCount; node++)
-    {
-      int slotsForNode = hashRingSize / (node + 1);
-      putNodeOnHashring(ring, node, slotsForNode, node);
-    }
-    return ring;
-  }
-
-  /**
-   * Generate a hash ring in which each newly added instance takes its slots
-   * from the instances that are already on the ring.
-   *
-   * @param instanceNames
-   *          List of instance names.
-   * @param hashRingSize
-   *          the size of the hash ring
-   * @return The int array used as the hash ring; each slot holds an instance
-   *         index
-   */
-  public static int[] generateEvenHashRing(List<String> instanceNames,
-      int hashRingSize)
-  {
-    // A freshly allocated int[] is zero-filled, so every slot starts out
-    // owned by instance 0.
-    int[] ring = new int[hashRingSize];
-    final int instanceCount = instanceNames.size();
-    for (int node = 1; node < instanceCount; node++)
-    {
-      // after this step the ring carries node + 1 distinct instances
-      putNodeEvenOnHashRing(ring, node, node + 1);
-    }
-    return ring;
-  }
-
- private static void putNodeEvenOnHashRing(int[] hashRing, int nodeVal,
- int totalValues)
- { // Hands hashRing.length / totalValues slots over to nodeVal, taking a share from each value currently on the ring.
- int newValNum = hashRing.length / totalValues; // number of slots the new node should end up with
- assert (newValNum > 0);
- Map<Integer, List<Integer>> valueIndex = buildValueIndex(hashRing); // value -> ring positions holding it
- int nSources = valueIndex.size(); // number of distinct values currently on the ring
- int remainder = newValNum % nSources; // slots left over after the even split; handed out one each below
-
- List<List<Integer>> positionLists = new ArrayList<List<Integer>>();
- for (List<Integer> list : valueIndex.values())
- {
- positionLists.add(list);
- }
- class ListComparator implements Comparator<List<Integer>>
- { // orders position lists by size, largest first
- @Override
- public int compare(List<Integer> o1, List<Integer> o2)
- {
- return (o1.size() > o2.size() ? -1 : (o1.size() == o2.size() ? 0 : 1));
- }
- }
- Collections.sort(positionLists, new ListComparator()); // largest owners are visited first, so they absorb the remainder
-
- for (List<Integer> oldValPositions : positionLists)
- {
- // List<Integer> oldValPositions = valueIndex.get(oldVal);
- int nValsToReplace = newValNum / nSources; // even share taken from this owner
- assert (nValsToReplace > 0);
- if (remainder > 0)
- {
- nValsToReplace++;
- remainder--;
- }
- // System.out.print(oldValPositions.size()+" "+nValsToReplace+" ");
- putNodeValueOnHashRing(hashRing, nodeVal, nValsToReplace, oldValPositions);
- // i.e. pseudo-randomly take nValsToReplace positions in oldValPositions and set them to nodeVal
- }
- // System.out.println();
- }
-
- private static void putNodeValueOnHashRing(int[] hashRing, int nodeVal,
- int numberOfValues, List<Integer> positions)
- { // Writes nodeVal into numberOfValues distinct ring slots picked pseudo-randomly from positions.
- Random rand = new Random(nodeVal); // seeded with nodeVal, so the picks are repeatable for a given node
- // initialize the index array
- int[] index = new int[positions.size()];
- for (int i = 0; i < index.length; i++)
- {
- index[i] = i;
- }
-
- int nodesLeft = index.length; // index[0 .. nodesLeft-1] are the entries that have not been picked yet
-
- for (int i = 0; i < numberOfValues; i++)
- { // NOTE: numberOfValues must not exceed positions.size(); otherwise nodesLeft reaches 0 and the % below throws ArithmeticException
- // Calculate a random index
- int randIndex = rand.nextInt() % nodesLeft; // may be negative because % keeps the sign of nextInt()
- if (randIndex < 0)
- {
- randIndex += nodesLeft;
- }
- hashRing[positions.get(index[randIndex])] = nodeVal;
-
- // swap the random index and the last available index, and decrease the
- // nodes left
- int temp = index[randIndex];
- index[randIndex] = index[nodesLeft - 1];
- index[nodesLeft - 1] = temp;
- nodesLeft--;
- }
- }
-
-  /**
-   * Builds an index from every value present on the hash ring to the list
-   * of ring positions (in ascending order) that hold that value.
-   *
-   * @param hashRing
-   *          the hash ring array
-   * @return map sorted by value; each entry lists the positions of that value
-   */
-  private static Map<Integer, List<Integer>> buildValueIndex(int[] hashRing)
-  {
-    Map<Integer, List<Integer>> positionsByValue = new TreeMap<Integer, List<Integer>>();
-    for (int pos = 0; pos < hashRing.length; pos++)
-    {
-      Integer value = hashRing[pos];
-      List<Integer> positions = positionsByValue.get(value);
-      if (positions == null)
-      {
-        positions = new ArrayList<Integer>();
-        positionsByValue.put(value, positions);
-      }
-      positions.add(pos);
-    }
-    return positionsByValue;
-  }
-
-  /**
-   * Puts a node value on pseudo-randomly chosen, pairwise distinct slots of
-   * the hash ring, using a partial shuffle of the slot indexes.
-   *
-   * @param result
-   *          the hash ring array; modified in place
-   * @param nodeValue
-   *          the int value to be written to the hash ring this time
-   * @param numberOfNodes
-   *          number of slots that receive the node value
-   * @param randomSeed
-   *          the random seed; the same seed yields the same slots
-   */
-  public static void putNodeOnHashring(int[] result, int nodeValue,
-      int numberOfNodes, int randomSeed)
-  {
-    Random generator = new Random(randomSeed);
-    // candidates[0 .. remaining-1] holds the slots that are still selectable
-    int[] candidates = new int[result.length];
-    for (int pos = 0; pos < candidates.length; pos++)
-    {
-      candidates[pos] = pos;
-    }
-    int remaining = candidates.length;
-
-    for (int placed = 0; placed < numberOfNodes; placed++)
-    {
-      int pick = generator.nextInt() % remaining;
-      if (pick < 0)
-      {
-        // % keeps the sign of the dividend
-        pick += remaining;
-      }
-      int slot = candidates[pick];
-      // the slot must not already carry this node value
-      assert result[slot] != nodeValue;
-      result[slot] = nodeValue;
-
-      // move the chosen slot behind the selectable range so that it cannot
-      // be picked a second time
-      candidates[pick] = candidates[remaining - 1];
-      candidates[remaining - 1] = slot;
-      remaining--;
-    }
-  }
-
- /**
- * Helper function to see how many partitions are mapped to different
- * instances in two ideal states
- * */
- public static void printDiff(ZNRecord record1, ZNRecord record2)
- {
- int diffCount = 0;
- for (String key : record1.getMapFields().keySet())
- {
- Map<String, String> map1 = record1.getMapField(key);
- Map<String, String> map2 = record2.getMapField(key);
-
- for (String k : map1.keySet())
- {
- if (!map2.containsKey(k))
- {
- diffCount++;
- } else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
- {
- diffCount++;
- }
- }
- }
- System.out.println("diff count = " + diffCount);
- }
-
-  /**
-   * Helper function to compare two hash rings slot by slot and print the
-   * number of slots of ring1 that differ from ring2. A slot of ring1 that
-   * has no counterpart because ring2 is shorter counts as a difference.
-   *
-   * @param ring1
-   *          the ring whose slots are walked
-   * @param ring2
-   *          the ring to compare against
-   */
-  public static void compareHashrings(int[] ring1, int[] ring2)
-  {
-    int diff = 0;
-    for (int i = 0; i < ring1.length; i++)
-    {
-      // guard the index first: ring2 may be shorter than ring1
-      if (i >= ring2.length || ring1[i] != ring2[i])
-      {
-        diff++;
-      }
-    }
-    System.out.println("ring diff: " + diff);
-  }
-
-  /**
-   * For every instance, collects the distinct "next" instances of the
-   * partitions it masters, where the next instance of a partition is the
-   * first non-master instance in the iteration order of its assignment map.
-   * Prints the number of distinct next instances per instance. Partitions
-   * without a master or without a non-master instance contribute nothing.
-   *
-   * @param record
-   *          the ideal state to analyse
-   */
-  public static void printNodeOfflineOverhead(ZNRecord record)
-  {
-    // master instance -> distinct first-slave instances of its partitions
-    Map<String, Set<String>> nodeNextMap = new TreeMap<String, Set<String>>();
-    for (String partitionName : record.getMapFields().keySet())
-    {
-      Map<String, String> assignment = record.getMapField(partitionName);
-      String master = "", slave = "";
-      for (String nodeName : assignment.keySet())
-      {
-        if (!nodeNextMap.containsKey(nodeName))
-        {
-          nodeNextMap.put(nodeName, new TreeSet<String>());
-        }
-
-        if (assignment.get(nodeName).equalsIgnoreCase("MASTER"))
-        {
-          master = nodeName;
-        }
-        else if (slave.equalsIgnoreCase(""))
-        {
-          // only the first non-master instance is remembered
-          slave = nodeName;
-        }
-      }
-      // Without a master there is no map entry to update (looking up ""
-      // would yield null), and without a slave there is no next instance to
-      // record.
-      if (!master.equals("") && !slave.equals(""))
-      {
-        nodeNextMap.get(master).add(slave);
-      }
-    }
-    System.out.println("next count: ");
-    for (Set<String> nextNodes : nodeNextMap.values())
-    {
-      System.out.println(nextNodes.size() + " ");
-    }
-    System.out.println();
-  }
-
-  /**
-   * Helper function that prints, for an ideal state, the number of
-   * partitions hosted per instance in the given state, followed by the mean,
-   * the standard deviation and the max/min of those per-instance counts.
-   *
-   * @param record
-   *          the ideal state to analyse
-   * @param value
-   *          the state to count (compared case-insensitively), or the empty
-   *          string to count every placement regardless of state
-   */
-  public static void printIdealStateStats(ZNRecord record, String value)
-  {
-    Map<String, Integer> countsMap = new TreeMap<String, Integer>();
-    for (String partition : record.getMapFields().keySet())
-    {
-      Map<String, String> assignment = record.getMapField(partition);
-      for (String instance : assignment.keySet())
-      {
-        // every instance gets an entry, even if nothing matches for it
-        Integer soFar = countsMap.get(instance);
-        int total = (soFar == null) ? 0 : soFar.intValue();
-        if (value.equals("") || assignment.get(instance).equalsIgnoreCase(value))
-        {
-          total++;
-        }
-        countsMap.put(instance, total);
-      }
-    }
-
-    double sum = 0;
-    int maxCount = 0;
-    int minCount = Integer.MAX_VALUE;
-    System.out.println("Partition distributions: ");
-    for (int count : countsMap.values())
-    {
-      sum += count;
-      maxCount = Math.max(maxCount, count);
-      minCount = Math.min(minCount, count);
-      System.out.print(count + " ");
-    }
-    System.out.println();
-
-    final int instances = countsMap.size();
-    double mean = sum / instances;
-    // accumulate the squared distance of every count from the mean
-    double deviation = 0;
-    for (int count : countsMap.values())
-    {
-      double distance = count - mean;
-      deviation += distance * distance;
-    }
-    System.out.println("Mean: " + mean + " normal deviation:"
-        + Math.sqrt(deviation / instances));
-
-    System.out.println("Max count: " + maxCount + " min count:" + minCount);
-  }
-
-  /**
-   * Prints the mean and the standard deviation of the number of slots each
-   * distinct value occupies on the hash ring.
-   *
-   * @param hashRing
-   *          the hash ring array
-   */
-  public static void printHashRingStat(int[] hashRing)
-  {
-    // value -> number of slots holding it
-    Map<Integer, Integer> slotsPerValue = new TreeMap<Integer, Integer>();
-    for (int value : hashRing)
-    {
-      Integer seen = slotsPerValue.get(value);
-      slotsPerValue.put(value, seen == null ? 1 : seen + 1);
-    }
-
-    double total = 0;
-    for (int slots : slotsPerValue.values())
-    {
-      total += slots;
-    }
-    double mean = total / slotsPerValue.size();
-
-    double squaredError = 0;
-    for (int slots : slotsPerValue.values())
-    {
-      squaredError += (slots - mean) * (slots - mean);
-    }
-    System.out.println("hashring Mean: " + mean + " normal deviation:"
-        + Math.sqrt(squaredError / slotsPerValue.size()));
-
-  }
-
- static int[] getFnvHashArray(List<String> strings)
- {
- int[] result = new int[strings.size()];
- int i = 0;
- IdealCalculatorByConsistentHashing.FnvHash hashfunc = new IdealCalculatorByConsistentHashing.FnvHash();
- for (String s : strings)
- {
- int val = hashfunc.getHashValue(s) % 65536;
- if (val < 0)
- val += 65536;
- result[i++] = val;
- }
- return result;
- }
-
-  /**
-   * Prints the standard deviation of the values divided by their mean.
-   *
-   * @param vals
-   *          the values to analyse
-   */
-  static void printArrayStat(int[] vals)
-  {
-    double total = 0;
-    for (int v : vals)
-    {
-      total += v;
-    }
-    double mean = total / vals.length;
-
-    double squaredError = 0;
-    for (int v : vals)
-    {
-      squaredError += (mean - v) * (mean - v);
-    }
-    System.out.println("normalied deviation: "
-        + Math.sqrt(squaredError / vals.length) / mean);
-  }
-
- public static void main(String args[]) throws Exception
- { // Demo driver: builds an ideal state for 10 instances and prints distribution statistics. args is not read.
- // Test the hash ring generation
- List<String> instanceNames = new ArrayList<String>();
- for (int i = 0; i < 10; i++)
- {
- instanceNames.add("localhost_123" + i); // yields localhost_1230 .. localhost_1239
- }
-
- // int[] ring1 =
- // IdealCalculatorByConsistentHashing.generateEvenHashRing(instanceNames,
- // 65535);
- // printHashRingStat(ring1);
- // int[] ring1 = getFnvHashArray(instanceNames);
- // printArrayStat(ring1);
-
- int partitions = 200, replicas = 2;
- String dbName = "espressoDB1";
-
- ZNRecord result = IdealCalculatorByConsistentHashing.calculateIdealState(
- instanceNames, partitions, replicas, dbName,
- new IdealCalculatorByConsistentHashing.FnvHash()); // five-argument overload, i.e. a ring of 65536 slots
- System.out.println("\nMaster :");
- printIdealStateStats(result, "MASTER");
-
- System.out.println("\nSlave :");
- printIdealStateStats(result, "SLAVE");
-
- System.out.println("\nTotal :");
- printIdealStateStats(result, ""); // the empty string counts placements in any state
-
- printNodeOfflineOverhead(result);
- /*
- * Disabled experiment: compares the result against a 40-instance cluster.
- *
- * ZNRecordSerializer serializer = new ZNRecordSerializer(); byte[] bytes;
- * bytes = serializer.serialize(result); // System.out.println(new
- * String(bytes));
- *
- * List<String> instanceNames2 = new ArrayList<String>(); for(int i = 0;i <
- * 40; i++) { instanceNames2.add("localhost_123"+i); }
- *
- * ZNRecord result2 =
- * IdealCalculatorByConsistentHashing.calculateIdealState( instanceNames2,
- * partitions, replicas, dbName, new
- * IdealCalculatorByConsistentHashing.FnvHash());
- *
- * printDiff(result, result2);
- *
- * //IdealCalculatorByConsistentHashing.printIdealStateStats(result2);
- *
- *
- *
- * int[] ring2 =
- * IdealCalculatorByConsistentHashing.generateHashRing(instanceNames2,
- * 30000);
- *
- * IdealCalculatorByConsistentHashing.compareHashrings(ring1, ring2);
- * //printNodeStats(result); //printNodeStats(result2); bytes =
- * serializer.serialize(result2); printHashRingStat(ring2); //
- * System.out.println(new String(bytes));
- */
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByRush.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByRush.java b/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByRush.java
deleted file mode 100644
index 21ca8b9..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByRush.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-
-public class IdealStateCalculatorByRush
-{
-  /**
-   * Build the config map for the RUSH algorithm. The input of the RUSH
-   * algorithm groups nodes into "cluster"s, and different clusters can be
-   * assigned different weights.
-   *
-   * The returned map holds "replicationDegree" and "subClusters"; each
-   * sub-cluster is a map with its "weight" and its "nodes", and every node is
-   * a map storing the instance name under the key "partition".
-   *
-   * @param numClusters
-   *          number of node clusters
-   * @param instancesPerCluster
-   *          List of clusters, each containing a list of node name strings.
-   * @param replicationDegree
-   *          the replication degree
-   * @param clusterWeights
-   *          the weight for each node cluster
-   * @return the config map structure for the RUSH algorithm.
-   */
-  static HashMap<String, Object> buildRushConfig(int numClusters,
-      List<List<String>> instancesPerCluster, int replicationDegree,
-      List<Integer> clusterWeights)
-  {
-    HashMap[] subClusters = new HashMap[numClusters];
-    for (int c = 0; c < numClusters; c++)
-    {
-      List<String> members = instancesPerCluster.get(c);
-      HashMap[] nodeEntries = new HashMap[members.size()];
-      for (int m = 0; m < nodeEntries.length; m++)
-      {
-        HashMap<String, String> entry = new HashMap<String, String>();
-        entry.put("partition", members.get(m));
-        nodeEntries[m] = entry;
-      }
-
-      HashMap<String, Object> cluster = new HashMap<String, Object>();
-      cluster.put("weight", clusterWeights.get(c));
-      cluster.put("nodes", nodeEntries);
-      subClusters[c] = cluster;
-    }
-
-    HashMap<String, Object> config = new HashMap<String, Object>();
-    config.put("replicationDegree", replicationDegree);
-    config.put("subClusters", subClusters);
-    return config;
-  }
-
-  /**
-   * Calculate the ideal state for a list of instance clusters. The RUSH hash
-   * is asked for replicas + 1 nodes per partition; one of the returned nodes,
-   * chosen with a fixed-seed random generator, becomes MASTER and the others
-   * become SLAVE.
-   *
-   * @param instanceClusters
-   *          List of clusters, each containing a list of node name strings.
-   * @param instanceClusterWeights
-   *          the weight for each instance cluster
-   * @param partitions
-   *          the partition number of the database
-   * @param replicas
-   *          the replication degree
-   * @param dbName
-   *          the name of the database
-   * @return The ZNRecord that contains the ideal state
-   */
-  public static ZNRecord calculateIdealState(
-      List<List<String>> instanceClusters,
-      List<Integer> instanceClusterWeights, int partitions, int replicas,
-      String dbName) throws Exception
-  {
-    ZNRecord idealState = new ZNRecord(dbName);
-
-    HashMap<String, Object> rushConfig = buildRushConfig(
-        instanceClusters.size(), instanceClusters, replicas + 1,
-        instanceClusterWeights);
-    RUSHrHash rush = new RUSHrHash(rushConfig);
-
-    // fixed seed: the master choice is reproducible across runs
-    Random masterPicker = new Random(0);
-    for (int partitionId = 0; partitionId < partitions; partitionId++)
-    {
-      // collect the string values of the node entries located by RUSH
-      List<String> hosts = new ArrayList<String>();
-      ArrayList<HashMap> located = rush.findNode(partitionId);
-      for (HashMap nodeEntry : located)
-      {
-        for (Object entryValue : nodeEntry.values())
-        {
-          if (entryValue instanceof String)
-          {
-            hosts.add(entryValue.toString());
-          }
-        }
-      }
-
-      // everybody starts as SLAVE, then one host is promoted
-      Map<String, String> assignment = new TreeMap<String, String>();
-      for (String host : hosts)
-      {
-        assignment.put(host, "SLAVE");
-      }
-      String masterHost = hosts.get(masterPicker.nextInt(hosts.size()));
-      assignment.put(masterHost, "MASTER");
-
-      idealState.setMapField(dbName + ".partition-" + partitionId, assignment);
-    }
-    idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
-    return idealState;
-  }
-
-  /**
-   * Calculate the ideal state for a single cluster of instances by wrapping
-   * the cluster and its weight into one-element lists and delegating to the
-   * multi-cluster overload.
-   *
-   * @param instanceClusters
-   *          the node names of the single cluster
-   * @param instanceClusterWeight
-   *          the weight of that cluster
-   * @param partitions
-   *          the partition number of the database
-   * @param replicas
-   *          the replication degree
-   * @param dbName
-   *          the name of the database
-   * @return The ZNRecord that contains the ideal state
-   */
-  public static ZNRecord calculateIdealState(
-      List<String> instanceClusters,
-      int instanceClusterWeight, int partitions, int replicas,
-      String dbName) throws Exception
-  {
-    List<List<String>> singleCluster = new ArrayList<List<String>>();
-    List<Integer> singleWeight = new ArrayList<Integer>();
-    singleCluster.add(instanceClusters);
-    singleWeight.add(instanceClusterWeight);
-
-    return calculateIdealState(singleCluster, singleWeight, partitions,
-        replicas, dbName);
-  }
-  /**
-   * Helper function to see how many partition placements differ between two
-   * ideal states. Walks every (partition, instance) entry of record1 and
-   * prints two counters: placements that record2 does not have at all, and
-   * placements present in both whose state differs (case-insensitive).
-   *
-   * @param record1
-   *          the ideal state whose entries are walked
-   * @param record2
-   *          the ideal state to compare against
-   */
-  public static void printDiff(ZNRecord record1, ZNRecord record2)
-  {
-    int diffCount = 0;
-    int diffCountMaster = 0;
-    for (String key : record1.getMapFields().keySet())
-    {
-      Map<String, String> map1 = record1.getMapField(key);
-      Map<String, String> map2 = record2.getMapField(key);
-      if (map2 == null)
-      {
-        // record2 has no assignment for this partition at all, so every
-        // placement of record1 is missing from it
-        diffCount += map1.size();
-        continue;
-      }
-
-      for (Map.Entry<String, String> placement : map1.entrySet())
-      {
-        String instance = placement.getKey();
-        if (!map2.containsKey(instance))
-        {
-          diffCount++;
-        }
-        else if (!placement.getValue().equalsIgnoreCase(map2.get(instance)))
-        {
-          diffCountMaster++;
-        }
-      }
-    }
-    System.out.println("\ndiff count = " + diffCount);
-    System.out.println("\nmaster diff count:"+ diffCountMaster);
-  }
-
- /**
- * Helper function to calculate and print the standard deviation of the
- * partition assignment ideal state.
- * */
- public static void printIdealStateStats(ZNRecord record)
- {
- Map<String, Integer> countsMap = new TreeMap<String, Integer>();
- Map<String, Integer> masterCountsMap = new TreeMap<String, Integer>();
- for (String key : record.getMapFields().keySet())
- {
- Map<String, String> map1 = record.getMapField(key);
- for (String k : map1.keySet())
- {
- if (!countsMap.containsKey(k))
- {
- countsMap.put(k, new Integer(0));
- }
- else
- {
- countsMap.put(k, countsMap.get(k).intValue() + 1);
- }
- if (!masterCountsMap.containsKey(k))
- {
- masterCountsMap.put(k, new Integer(0));
-
- }
- else if (map1.get(k).equalsIgnoreCase("MASTER"))
- {
- masterCountsMap.put(k, masterCountsMap.get(k).intValue() + 1);
- }
- }
- }
- double sum = 0;
- int maxCount = 0;
- int minCount = Integer.MAX_VALUE;
- for (String k : countsMap.keySet())
- {
- int count = countsMap.get(k);
- sum += count;
- if (maxCount < count)
- {
- maxCount = count;
- }
- if (minCount > count)
- {
- minCount = count;
- }
- System.out.print(count + " ");
- }
- System.out.println("\nMax count: " + maxCount + " min count:" + minCount);
- System.out.println("\n master:");
- double sumMaster = 0;
- int maxCountMaster = 0;
- int minCountMaster = Integer.MAX_VALUE;
- for (String k : masterCountsMap.keySet())
- {
- int count = masterCountsMap.get(k);
- sumMaster += count;
- if (maxCountMaster < count)
- {
- maxCountMaster = count;
- }
- if (minCountMaster > count)
- {
- minCountMaster = count;
- }
- System.out.print(count + " ");
- }
- System.out.println("\nMean master: "+ 1.0*sumMaster/countsMap.size());
- System.out.println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
- double mean = sum / (countsMap.size());
- // calculate the deviation of the node distribution
- double deviation = 0;
- for (String k : countsMap.keySet())
- {
- double count = countsMap.get(k);
- deviation += (count - mean) * (count - mean);
- }
- System.out.println("Mean: " + mean + " normal deviation:"
- + Math.sqrt(deviation / countsMap.size()) / mean);
-
- //System.out.println("Max count: " + maxCount + " min count:" + minCount);
- int steps = 10;
- int stepLen = (maxCount - minCount) / steps;
- if(stepLen == 0) return;
- List<Integer> histogram = new ArrayList<Integer>((maxCount - minCount)
- / stepLen + 1);
-
- for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++)
- {
- histogram.add(0);
- }
- for (String k : countsMap.keySet())
- {
- int count = countsMap.get(k);
- int stepNo = (count - minCount) / stepLen;
- histogram.set(stepNo, histogram.get(stepNo) + 1);
- }
- System.out.println("histogram:");
- for (Integer x : histogram)
- {
- System.out.print(x + " ");
- }
- }
-
- public static void main(String args[]) throws Exception
- { // Demo driver: computes an ideal state for one cluster, then again after adding a second cluster, and prints the difference. args is not read.
- int partitions = 4096, replicas = 2;
- String dbName = "espressoDB1";
- List<String> instanceNames = new ArrayList<String>();
- List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
- for (int i = 0; i < 20; i++)
- {
- instanceNames.add("local"+i+"host_123" + i);
- }
- instanceCluster1.add(instanceNames); // first cluster: 20 instances
- List<Integer> weights1 = new ArrayList<Integer>();
- weights1.add(1);
- ZNRecord result = IdealStateCalculatorByRush.calculateIdealState(
- instanceCluster1, weights1, partitions, replicas, dbName);
-
- printIdealStateStats(result);
-
- List<String> instanceNames2 = new ArrayList<String>();
- for (int i = 400; i < 405; i++)
- {
- instanceNames2.add("localhost_123" + i);
- }
- instanceCluster1.add(instanceNames2); // the same list now holds two clusters: the original 20 instances plus 5 new ones
- weights1.add(1); // both clusters carry weight 1
- ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(
- instanceCluster1, weights1, partitions, replicas, dbName);
-
- printDiff(result, result2); // prints how many placements changed after the second cluster was added
- printIdealStateStats(result2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByShuffling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByShuffling.java b/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByShuffling.java
deleted file mode 100644
index f381e61..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorByShuffling.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.TreeMap;
-
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.IdealState.IdealStateProperty;
-
-/*
- * Ideal state calculator for the cluster manager V1. The ideal state is
- * calculated by randomly assigning master partitions to storage nodes.
- *
- * Note that the following code is a naive strategy and is for cluster manager V1 only. We will
- * use another algorithm to calculate the ideal state in future milestones.
- *
- *
- * */
-
-public class IdealStateCalculatorByShuffling
-{
- /*
- * Given the number of nodes, partitions and replica number, calculate the
- * ideal state in the following manner: For the master partition assignment,
- * 1. construct Arraylist partitionList, with partitionList[i] = i; 2. Shuffle
- * the partitions array 3. Scan the shuffled array, then assign
- * partitionList[i] to node (i % nodes)
- *
- * for the slave partitions, simply put them in the node after the node that
- * contains the master partition.
- *
- * The result of the method is a ZNRecord, which contains a list of maps; each
- * map is from the name of nodes to either "MASTER" or "SLAVE".
- */
-
-
-  /*
-   * Same as the overload that takes explicit state names, using "MASTER"
-   * for the first replica of a partition and "SLAVE" for the others.
-   */
-  public static ZNRecord calculateIdealState(List<String> instanceNames,
-      int partitions, int replicas, String dbName, long randomSeed)
-  {
-    final String masterState = "MASTER";
-    final String slaveState = "SLAVE";
-    return calculateIdealState(instanceNames, partitions, replicas, dbName,
-        randomSeed, masterState, slaveState);
-  }
-  /*
-   * Calculates the ideal state by shuffling the partition ids with the given
-   * seed and dealing the masters out to the sorted instances round-robin.
-   * The jth slave of a partition starts at instance
-   * (master + j * number of partitions) % number of instances and moves on
-   * to the next instance until one is found that does not hold the
-   * partition yet.
-   *
-   * Note that instanceNames is sorted in place. Throws
-   * IllegalArgumentException unless there are more instances than replicas.
-   */
-  public static ZNRecord calculateIdealState(List<String> instanceNames,
-      int partitions, int replicas, String dbName, long randomSeed, String masterValue, String slaveValue)
-  {
-    if (instanceNames.size() <= replicas)
-    {
-      throw new IllegalArgumentException(
-          "Replicas must be less than number of storage nodes");
-    }
-
-    Collections.sort(instanceNames);
-    final int nodeCount = instanceNames.size();
-
-    ZNRecord idealState = new ZNRecord(dbName);
-
-    // partition ids 0..partitions-1 in seeded pseudo-random order
-    List<Integer> shuffled = new ArrayList<Integer>(partitions);
-    for (int id = 0; id < partitions; id++)
-    {
-      shuffled.add(Integer.valueOf(id));
-    }
-    Collections.shuffle(shuffled, new Random(randomSeed));
-    final int partitionCount = shuffled.size();
-
-    for (int rank = 0; rank < partitionCount; rank++)
-    {
-      Map<String, String> assignment = new TreeMap<String, String>();
-      // masters are dealt out round-robin over the sorted instances
-      int masterNode = rank % nodeCount;
-      assignment.put(instanceNames.get(masterNode), masterValue);
-
-      for (int j = 1; j <= replicas; j++)
-      {
-        int candidate = (masterNode + j * partitionCount) % nodeCount;
-        // step forward until an instance without this partition is found
-        while (assignment.containsKey(instanceNames.get(candidate)))
-        {
-          candidate = (candidate + 1) % nodeCount;
-        }
-        assignment.put(instanceNames.get(candidate), slaveValue);
-      }
-      idealState.setMapField(dbName + "_" + shuffled.get(rank), assignment);
-    }
-    idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
-    return idealState;
-  }
-
-  /*
-   * Calculates the ideal state with a built-in constant seed, so that the
-   * shuffle, and with it the resulting ideal state, is the same on every
-   * call with the same arguments.
-   */
-  public static ZNRecord calculateIdealState(List<String> instanceNames,
-      int partitions, int replicas, String dbName)
-  {
-    final long fixedSeed = 888997632;
-    return calculateIdealState(instanceNames, partitions, replicas, dbName,
-        fixedSeed);
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForEspressoRelay.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForEspressoRelay.java b/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForEspressoRelay.java
deleted file mode 100644
index a525abf..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/tools/IdealStateCalculatorForEspressoRelay.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.linkedin.helix.tools;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.model.IdealState;
-
-/**
- * Builds a relay ideal state: for every partition, the list of relay instances
- * assigned to it and the state value each of those instances is given.
- */
-public class IdealStateCalculatorForEspressoRelay
-{
-  /**
-   * Assigns every partition to {@code replica} relay instances.
-   *
-   * <p>Both input lists are sorted in place. The sorted instances are cut into
-   * consecutive groups of {@code replica} instances, and the sorted partitions are
-   * cut into the same number of contiguous slices; when the partitions do not
-   * divide evenly, the leading slices each take one extra partition. Slice
-   * {@code i} is served by group {@code i}. For each partition, the first
-   * instance in its list is mapped to {@code firstValue} and every other
-   * instance to {@code restValue}.
-   *
-   * @param partitions partition names; sorted in place by this call
-   * @param instances relay instance names; sorted in place by this call
-   * @param resultRecordName name given to the returned ideal state
-   * @param replica number of instances assigned to each partition; must be
-   *          positive and must divide {@code instances.size()} evenly
-   * @param firstValue value mapped to the first instance of each partition's list
-   * @param restValue value mapped to the remaining instances of each list
-   * @param stateModelName state model definition reference stored on the result
-   * @return the ideal state holding both the list fields and the map fields
-   * @throws HelixException if {@code replica} is not positive, if
-   *           {@code instances} is empty, or if the number of instances is not
-   *           a multiple of {@code replica}
-   */
-  public static IdealState calculateRelayIdealState(List<String> partitions, List<String> instances,
-      String resultRecordName, int replica, String firstValue, String restValue, String stateModelName)
-  {
-    Collections.sort(partitions);
-    Collections.sort(instances);
-
-    // Reject inputs that would otherwise surface as a bare "/ by zero" (replica == 0 or
-    // no instances) or silently yield an empty assignment (negative replica).
-    if (replica <= 0)
-    {
-      throw new HelixException("Replica must be positive, but was " + replica);
-    }
-    if (instances.isEmpty())
-    {
-      throw new HelixException("Instances must not be empty");
-    }
-    if (instances.size() % replica != 0)
-    {
-      throw new HelixException("Instances must be divided by replica");
-    }
-
-    IdealState result = new IdealState(resultRecordName);
-    result.setNumPartitions(partitions.size());
-    result.setReplicas("" + replica);
-    result.setStateModelDefRef(stateModelName);
-
-    // The divisibility check above guarantees that every group holds exactly
-    // `replica` instances, so relay group i is simply [replica * i, replica * (i + 1)).
-    int groups = instances.size() / replica;
-    int storageNodeGroupSize = partitions.size() / groups;
-    // Number of leading groups that take one extra partition.
-    int groupsWithExtra = partitions.size() % groups;
-
-    for (int i = 0; i < groups; i++)
-    {
-      int relayStart = replica * i;
-      int relayEnd = relayStart + replica;
-
-      int storageNodeStart;
-      int storageNodeEnd;
-      if (i < groupsWithExtra)
-      {
-        storageNodeStart = (storageNodeGroupSize + 1) * i;
-        storageNodeEnd = storageNodeStart + storageNodeGroupSize + 1;
-      }
-      else
-      {
-        storageNodeStart = (storageNodeGroupSize + 1) * groupsWithExtra
-            + storageNodeGroupSize * (i - groupsWithExtra);
-        storageNodeEnd = storageNodeStart + storageNodeGroupSize;
-      }
-
-      List<String> snBatch = partitions.subList(storageNodeStart, storageNodeEnd);
-      List<String> relayBatch = instances.subList(relayStart, relayEnd);
-
-      Map<String, List<String>> sublistFields = calculateSubIdealState(snBatch, relayBatch, replica);
-
-      result.getRecord().getListFields().putAll(sublistFields);
-    }
-
-    // Derive the map fields from the list fields: the head of each list gets
-    // firstValue, every other entry gets restValue.
-    for (String snName : result.getRecord().getListFields().keySet())
-    {
-      Map<String, String> mapField = new TreeMap<String, String>();
-      List<String> relayCandidates = result.getRecord().getListField(snName);
-      mapField.put(relayCandidates.get(0), firstValue);
-      for (int i = 1; i < relayCandidates.size(); i++)
-      {
-        mapField.put(relayCandidates.get(i), restValue);
-      }
-      result.getRecord().getMapFields().put(snName, mapField);
-    }
-    return result;
-  }
-
-  /**
-   * Builds the instance list for each entry of {@code snBatch} from one relay group.
-   *
-   * <p>The list for the entry at index {@code i} holds {@code replica} instances,
-   * starting at position {@code i % relayBatch.size()} of {@code relayBatch} and
-   * wrapping around, so the starting instance rotates from one entry to the next.
-   *
-   * @param snBatch names to build lists for
-   * @param relayBatch instances of the group serving this batch; must not be empty
-   * @param replica number of instances placed in each list
-   * @return map from each name in {@code snBatch} to its instance list
-   */
-  private static Map<String, List<String>> calculateSubIdealState(
-      List<String> snBatch, List<String> relayBatch, int replica)
-  {
-    Map<String, List<String>> result = new HashMap<String, List<String>>();
-    for (int i = 0; i < snBatch.size(); i++)
-    {
-      List<String> relays = new ArrayList<String>();
-      for (int j = 0; j < replica; j++)
-      {
-        relays.add(relayBatch.get((j + i) % relayBatch.size()));
-      }
-      result.put(snBatch.get(i), relays);
-    }
-    return result;
-  }
-}