Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[22/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
new file mode 100644
index 0000000..2230d97
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -0,0 +1,909 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.ClusterView;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.file.FileHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.PropertyJsonComparator;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+
+
+public class ClusterStateVerifier
+{
+ public static final String cluster = "cluster";
+ public static final String zkServerAddress = "zkSvr";
+ public static final String help = "help";
+ public static final String timeout = "timeout";
+ public static final String period = "period";
+
+ private static final Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
+
+ public interface Verifier
+ {
+ boolean verify();
+ }
+
+ public interface ZkVerifier extends Verifier
+ {
+ ZkClient getZkClient();
+
+ String getClusterName();
+ }
+
+ static class ExtViewVerifierZkListener implements IZkChildListener, IZkDataListener
+ {
+ final CountDownLatch _countDown;
+ final ZkClient _zkClient;
+ final Verifier _verifier;
+
+ public ExtViewVerifierZkListener(CountDownLatch countDown,
+ ZkClient zkClient,
+ ZkVerifier verifier)
+ {
+ _countDown = countDown;
+ _zkClient = zkClient;
+ _verifier = verifier;
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception
+ {
+ boolean result = _verifier.verify();
+ if (result)
+ {
+ _countDown.countDown();
+ }
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception
+ {
+ // no-op: znode deletions surface through the parent's child-change callback
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+ {
+ for (String child : currentChilds)
+ {
+ String childPath =
+ parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
+ _zkClient.subscribeDataChanges(childPath, this);
+ }
+
+ boolean result = _verifier.verify();
+ if (result)
+ {
+ _countDown.countDown();
+ }
+ }
+
+ }
+
+ /**
+ * Verifier that checks that the external view matches the best possible state
+ */
+ public static class BestPossAndExtViewZkVerifier implements ZkVerifier
+ {
+ private final String zkAddr;
+ private final String clusterName;
+ private final Map<String, Map<String, String>> errStates;
+ private final ZkClient zkClient;
+
+ public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName)
+ {
+ this(zkAddr, clusterName, null);
+ }
+
+ public BestPossAndExtViewZkVerifier(String zkAddr,
+ String clusterName,
+ Map<String, Map<String, String>> errStates)
+ {
+ if (zkAddr == null || clusterName == null)
+ {
+ throw new IllegalArgumentException("requires zkAddr|clusterName");
+ }
+ this.zkAddr = zkAddr;
+ this.clusterName = clusterName;
+ this.errStates = errStates;
+ this.zkClient = ZKClientPool.getZkClient(zkAddr);
+ }
+
+ @Override
+ public boolean verify()
+ {
+ try
+ {
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName,
+ new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+ return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+ }
+ catch (Exception e)
+ {
+ LOG.error("exception in verification", e);
+ }
+ return false;
+ }
+
+ @Override
+ public ZkClient getZkClient()
+ {
+ return zkClient;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return clusterName;
+ }
+
+ @Override
+ public String toString()
+ {
+ String verifierName = getClass().getName();
+ verifierName =
+ verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
+ return verifierName + "(" + clusterName + "@" + zkAddr + ")";
+ }
+ }
+
+ public static class BestPossAndExtViewFileVerifier implements Verifier
+ {
+ private final String rootPath;
+ private final String clusterName;
+ private final Map<String, Map<String, String>> errStates;
+ private final FilePropertyStore<ZNRecord> fileStore;
+
+ public BestPossAndExtViewFileVerifier(String rootPath, String clusterName)
+ {
+ this(rootPath, clusterName, null);
+ }
+
+ public BestPossAndExtViewFileVerifier(String rootPath,
+ String clusterName,
+ Map<String, Map<String, String>> errStates)
+ {
+ if (rootPath == null || clusterName == null)
+ {
+ throw new IllegalArgumentException("requires rootPath|clusterName");
+ }
+ this.rootPath = rootPath;
+ this.clusterName = clusterName;
+ this.errStates = errStates;
+
+ this.fileStore =
+ new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
+ rootPath,
+ new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
+ }
+
+ @Override
+ public boolean verify()
+ {
+ try
+ {
+ HelixDataAccessor accessor = new FileHelixDataAccessor(fileStore, clusterName);
+
+ return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
+ }
+ catch (Exception e)
+ {
+ LOG.error("exception in verification", e);
+ return false;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ String verifierName = getClass().getName();
+ verifierName =
+ verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
+ return verifierName + "(" + rootPath + "@" + clusterName + ")";
+ }
+ }
+
+ public static class MasterNbInExtViewVerifier implements ZkVerifier
+ {
+ private final String zkAddr;
+ private final String clusterName;
+ private final ZkClient zkClient;
+
+ public MasterNbInExtViewVerifier(String zkAddr, String clusterName)
+ {
+ if (zkAddr == null || clusterName == null)
+ {
+ throw new IllegalArgumentException("requires zkAddr|clusterName");
+ }
+ this.zkAddr = zkAddr;
+ this.clusterName = clusterName;
+ this.zkClient = ZKClientPool.getZkClient(zkAddr);
+ }
+
+ @Override
+ public boolean verify()
+ {
+ try
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName,
+ new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+ return ClusterStateVerifier.verifyMasterNbInExtView(accessor);
+ }
+ catch (Exception e)
+ {
+ LOG.error("exception in verification", e);
+ }
+ return false;
+ }
+
+ @Override
+ public ZkClient getZkClient()
+ {
+ return zkClient;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return clusterName;
+ }
+
+ }
+
+ static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
+ Map<String, Map<String, String>> errStates)
+ {
+ try
+ {
+ Builder keyBuilder = accessor.keyBuilder();
+ // read cluster once and do verification
+ ClusterDataCache cache = new ClusterDataCache();
+ cache.refresh(accessor);
+
+ Map<String, IdealState> idealStates = cache.getIdealStates();
+ if (idealStates == null) // || idealStates.isEmpty())
+ {
+ // idealStates is null when the resource's ideal state has been dropped
+ idealStates = Collections.emptyMap();
+ }
+
+ Map<String, ExternalView> extViews =
+ accessor.getChildValuesMap(keyBuilder.externalViews());
+ if (extViews == null) // || extViews.isEmpty())
+ {
+ extViews = Collections.emptyMap();
+ }
+
+ // if a resource has an externalView but no idealState,
+ // add an empty idealState for that resource
+ for (String resource : extViews.keySet())
+ {
+ if (!idealStates.containsKey(resource))
+ {
+ idealStates.put(resource, new IdealState(resource));
+ }
+ }
+
+ // calculate best possible state
+ BestPossibleStateOutput bestPossOutput =
+ ClusterStateVerifier.calcBestPossState(cache);
+
+ // set error states
+ if (errStates != null)
+ {
+ for (String resourceName : errStates.keySet())
+ {
+ Map<String, String> partErrStates = errStates.get(resourceName);
+ for (String partitionName : partErrStates.keySet())
+ {
+ String instanceName = partErrStates.get(partitionName);
+ Map<String, String> partStateMap =
+ bestPossOutput.getInstanceStateMap(resourceName,
+ new Partition(partitionName));
+ partStateMap.put(instanceName, "ERROR");
+ }
+ }
+ }
+
+
+ for (String resourceName : idealStates.keySet())
+ {
+ ExternalView extView = extViews.get(resourceName);
+ if (extView == null)
+ {
+ LOG.info("externalView for " + resourceName + " is not available");
+ return false;
+ }
+
+ // step 0: remove empty maps and DROPPED states from the best possible state
+ Map<Partition, Map<String, String>> bpStateMap =
+ bestPossOutput.getResourceMap(resourceName);
+ Iterator<Entry<Partition, Map<String, String>>> iter =
+ bpStateMap.entrySet().iterator();
+ while (iter.hasNext())
+ {
+ Map.Entry<Partition, Map<String, String>> entry = iter.next();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (instanceStateMap.isEmpty())
+ {
+ iter.remove();
+ } else
+ {
+ // remove instances with DROPPED state
+ Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
+ while (insIter.hasNext())
+ {
+ Map.Entry<String, String> insEntry = insIter.next();
+ String state = insEntry.getValue();
+ if (state.equalsIgnoreCase("DROPPED"))
+ {
+ insIter.remove();
+ }
+ }
+ }
+ }
+
+ // System.err.println("resource: " + resourceName + ", bpStateMap: " + bpStateMap);
+
+ // step 1: externalView and bestPossibleState have equal sizes
+ int extViewSize = extView.getRecord().getMapFields().size();
+ int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+ if (extViewSize != bestPossStateSize)
+ {
+ LOG.info("exterView size (" + extViewSize
+ + ") is different from bestPossState size (" + bestPossStateSize
+ + ") for resource: " + resourceName);
+ // System.out.println("extView: " + extView.getRecord().getMapFields());
+ // System.out.println("bestPossState: " +
+ // bestPossOutput.getResourceMap(resourceName));
+ return false;
+ }
+
+ // step 2: every entry in external view is contained in best possible state
+ for (String partition : extView.getRecord().getMapFields().keySet())
+ {
+ Map<String, String> evInstanceStateMap =
+ extView.getRecord().getMapField(partition);
+ Map<String, String> bpInstanceStateMap =
+ bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+
+ boolean result =
+ ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
+ bpInstanceStateMap);
+ if (!result)
+ {
+ LOG.info("externalView is different from bestPossibleState for partition:"
+ + partition);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ catch (Exception e)
+ {
+ LOG.error("exception in verification", e);
+ return false;
+ }
+
+ }
+
+ static boolean verifyMasterNbInExtView(HelixDataAccessor accessor)
+ {
+ Builder keyBuilder = accessor.keyBuilder();
+
+ Map<String, IdealState> idealStates =
+ accessor.getChildValuesMap(keyBuilder.idealStates());
+ if (idealStates == null || idealStates.size() == 0)
+ {
+ LOG.info("No resource idealState");
+ return true;
+ }
+
+ Map<String, ExternalView> extViews =
+ accessor.getChildValuesMap(keyBuilder.externalViews());
+ if (extViews == null || extViews.size() < idealStates.size())
+ {
+ LOG.info("No externalViews | externalView.size() < idealState.size()");
+ return false;
+ }
+
+ for (String resource : extViews.keySet())
+ {
+ int partitions = idealStates.get(resource).getNumPartitions();
+ Map<String, Map<String, String>> instanceStateMap =
+ extViews.get(resource).getRecord().getMapFields();
+ if (instanceStateMap.size() < partitions)
+ {
+ LOG.info("Number of externalViews (" + instanceStateMap.size()
+ + ") < partitions (" + partitions + ")");
+ return false;
+ }
+
+ for (String partition : instanceStateMap.keySet())
+ {
+ boolean foundMaster = false;
+ for (String instance : instanceStateMap.get(partition).keySet())
+ {
+ if (instanceStateMap.get(partition).get(instance).equalsIgnoreCase("MASTER"))
+ {
+ foundMaster = true;
+ break;
+ }
+ }
+ if (!foundMaster)
+ {
+ LOG.info("No MASTER for partition: " + partition);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
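+ /**
+ * Run a single controller pipeline stage against the given cluster event.
+ */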
+ static void runStage(ClusterEvent event, Stage stage) throws Exception
+ {
+ StageContext context = new StageContext();
+ stage.init(context);
+ stage.preProcess();
+ stage.process(event);
+ stage.postProcess();
+ }
+
+ /**
+ * Calculate the best possible state. Note that DROPPED states are not
+ * checked, since we provide an empty current state map when kicking off
+ * the BestPossibleStateCalcStage.
+ *
+ * @param cache
+ * @return
+ * @throws Exception
+ */
+ static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception
+ {
+ ClusterEvent event = new ClusterEvent("sampleEvent");
+ event.addAttribute("ClusterDataCache", cache);
+
+ ResourceComputationStage rcState = new ResourceComputationStage();
+ CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+ BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+
+ runStage(event, rcState);
+ runStage(event, csStage);
+ runStage(event, bpStage);
+
+ BestPossibleStateOutput output =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+
+ // System.out.println("output:" + output);
+ return output;
+ }
+
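+ /**
+ * Compare two maps for equality; a null map is treated as equivalent to an
+ * empty map. The maps are equal iff they have the same size and every entry
+ * of map1 is contained in map2.
+ */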
+ public static <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2)
+ {
+ boolean isEqual = true;
+ if (map1 == null && map2 == null)
+ {
+ // OK
+ }
+ else if (map1 == null && map2 != null)
+ {
+ if (!map2.isEmpty())
+ {
+ isEqual = false;
+ }
+ }
+ else if (map1 != null && map2 == null)
+ {
+ if (!map1.isEmpty())
+ {
+ isEqual = false;
+ }
+ }
+ else
+ {
+ // verify size
+ if (map1.size() != map2.size())
+ {
+ isEqual = false;
+ }
+ // verify each <key, value> in map1 is contained in map2
+ for (K key : map1.keySet())
+ {
+ if (!map1.get(key).equals(map2.get(key)))
+ {
+ LOG.debug("different value for key: " + key + "(map1: " + map1.get(key)
+ + ", map2: " + map2.get(key) + ")");
+ isEqual = false;
+ break;
+ }
+ }
+ }
+ return isEqual;
+ }
+
+ public static boolean verifyByPolling(Verifier verifier)
+ {
+ return verifyByPolling(verifier, 30 * 1000);
+ }
+
+ public static boolean verifyByPolling(Verifier verifier, long timeout)
+ {
+ return verifyByPolling(verifier, timeout, 1000);
+ }
+
+ public static boolean verifyByPolling(Verifier verifier, long timeout, long period)
+ {
+ long startTime = System.currentTimeMillis();
+ boolean result = false;
+ try
+ {
+ long curTime;
+ do
+ {
+ Thread.sleep(period);
+ result = verifier.verify();
+ if (result)
+ {
+ break;
+ }
+ curTime = System.currentTimeMillis();
+ }
+ while (curTime <= startTime + timeout);
+ return result;
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception in verification", e);
+ }
+ finally
+ {
+ long endTime = System.currentTimeMillis();
+
+ // debug
+ System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
+ + "ms to verify");
+
+ }
+ return false;
+ }
+
+ public static boolean verifyByZkCallback(ZkVerifier verifier)
+ {
+ return verifyByZkCallback(verifier, 30000);
+ }
+
+ public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout)
+ {
+ long startTime = System.currentTimeMillis();
+ CountDownLatch countDown = new CountDownLatch(1);
+ ZkClient zkClient = verifier.getZkClient();
+ String clusterName = verifier.getClusterName();
+
+ // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
+ // so that when analyzing the zk log, we know when a test ends
+ zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+
+ ExtViewVerifierZkListener listener =
+ new ExtViewVerifierZkListener(countDown, zkClient, verifier);
+
+ String extViewPath =
+ PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
+ zkClient.subscribeChildChanges(extViewPath, listener);
+ for (String child : zkClient.getChildren(extViewPath))
+ {
+ String childPath =
+ extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+ zkClient.subscribeDataChanges(childPath, listener);
+ }
+
+ // do initial verify
+ boolean result = verifier.verify();
+ if (!result)
+ {
+ try
+ {
+ result = countDown.await(timeout, TimeUnit.MILLISECONDS);
+ if (!result)
+ {
+ // make a final try if timeout
+ result = verifier.verify();
+ }
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception while waiting for verification", e);
+ }
+ }
+
+ // clean up
+ zkClient.unsubscribeChildChanges(extViewPath, listener);
+ for (String child : zkClient.getChildren(extViewPath))
+ {
+ String childPath =
+ extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+ zkClient.unsubscribeDataChanges(childPath, listener);
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+ // debug
+ System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
+
+ return result;
+ }
+
+ public static boolean verifyFileBasedClusterStates(String file,
+ String instanceName,
+ StateModelFactory<StateModel> stateModelFactory)
+ {
+ ClusterView clusterView = ClusterViewSerializer.deserialize(new File(file));
+ boolean ret = true;
+ int nonOfflineStateNr = 0;
+
+ // ideal_state for instance with name $instanceName
+ Map<String, String> instanceIdealStates = new HashMap<String, String>();
+ for (ZNRecord idealStateItem : clusterView.getPropertyList(PropertyType.IDEALSTATES))
+ {
+ Map<String, Map<String, String>> idealStates = idealStateItem.getMapFields();
+
+ for (Map.Entry<String, Map<String, String>> entry : idealStates.entrySet())
+ {
+ if (entry.getValue().containsKey(instanceName))
+ {
+ String state = entry.getValue().get(instanceName);
+ instanceIdealStates.put(entry.getKey(), state);
+ }
+ }
+ }
+
+ Map<String, StateModel> currentStateMap = stateModelFactory.getStateModelMap();
+
+ if (currentStateMap.size() != instanceIdealStates.size())
+ {
+ LOG.warn("Number of current states (" + currentStateMap.size() + ") mismatch "
+ + "number of ideal states (" + instanceIdealStates.size() + ")");
+ return false;
+ }
+
+ for (Map.Entry<String, String> entry : instanceIdealStates.entrySet())
+ {
+
+ String stateUnitKey = entry.getKey();
+ String idealState = entry.getValue();
+
+ if (!idealState.equalsIgnoreCase("offline"))
+ {
+ nonOfflineStateNr++;
+ }
+
+ if (!currentStateMap.containsKey(stateUnitKey))
+ {
+ LOG.warn("Current state does not contain " + stateUnitKey);
+ // return false;
+ ret = false;
+ continue;
+ }
+
+ String curState = currentStateMap.get(stateUnitKey).getCurrentState();
+ if (!idealState.equalsIgnoreCase(curState))
+ {
+ LOG.info("State mismatch--unit_key:" + stateUnitKey + " cur:" + curState
+ + " ideal:" + idealState + " instance_name:" + instanceName);
+ // return false;
+ ret = false;
+ continue;
+ }
+ }
+
+ if (ret)
+ {
+ System.out.println(instanceName + ": verification succeeded");
+ LOG.info(instanceName + ": verification succeeded (" + nonOfflineStateNr + " states)");
+ }
+
+ return ret;
+ }
+
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions()
+ {
+ Option helpOption =
+ OptionBuilder.withLongOpt(help)
+ .withDescription("Prints command-line options info")
+ .create();
+
+ Option zkServerOption =
+ OptionBuilder.withLongOpt(zkServerAddress)
+ .withDescription("Provide zookeeper address")
+ .create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+ Option clusterOption =
+ OptionBuilder.withLongOpt(cluster)
+ .withDescription("Provide cluster name")
+ .create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option timeoutOption =
+ OptionBuilder.withLongOpt(timeout)
+ .withDescription("Timeout value for verification")
+ .create();
+ timeoutOption.setArgs(1);
+ timeoutOption.setArgName("Timeout value (Optional), default=30s");
+
+ Option sleepIntervalOption =
+ OptionBuilder.withLongOpt(period)
+ .withDescription("Polling period for verification")
+ .create();
+ sleepIntervalOption.setArgs(1);
+ sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ options.addOption(zkServerOption);
+ options.addOption(clusterOption);
+ options.addOption(timeoutOption);
+ options.addOption(sleepIntervalOption);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions)
+ {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(1000);
+ helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs)
+ {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ // CommandLine cmd = null;
+
+ try
+ {
+ return cliParser.parse(cliOptions, cliArgs);
+ }
+ catch (ParseException pe)
+ {
+ System.err.println("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static boolean verifyState(String[] args)
+ {
+ String clusterName = "storage-cluster";
+ String zkServer = "localhost:2181";
+ long timeoutValue = 0;
+ long periodValue = 1000;
+
+ if (args.length > 0)
+ {
+ CommandLine cmd = processCommandLineArgs(args);
+ zkServer = cmd.getOptionValue(zkServerAddress);
+ clusterName = cmd.getOptionValue(cluster);
+ String timeoutStr = cmd.getOptionValue(timeout);
+ String periodStr = cmd.getOptionValue(period);
+ if (timeoutStr != null)
+ {
+ try
+ {
+ timeoutValue = Long.parseLong(timeoutStr);
+ }
+ catch (Exception e)
+ {
+ System.err.println("Exception in converting " + timeoutStr
+ + " to long. Use default (0)");
+ }
+ }
+
+ if (periodStr != null)
+ {
+ try
+ {
+ periodValue = Long.parseLong(periodStr);
+ }
+ catch (Exception e)
+ {
+ System.err.println("Exception in converting " + periodStr
+ + " to long. Use default (1000)");
+ }
+ }
+
+ }
+ // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
+ // timeoutValue,
+ // periodValue);
+
+ return verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
+ timeoutValue);
+ }
+
+ public static void main(String[] args)
+ {
+ boolean result = verifyState(args);
+ System.out.println(result ? "Successful" : "Failed");
+ System.exit(result ? 0 : 1);
+ }
+
+}
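
For reference, a minimal sketch of how the verifier API above can be used (not part of this commit); the ZooKeeper address, cluster name, and timeout values are illustrative assumptions:

import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;

public class VerifierUsageSketch
{
  public static void main(String[] args)
  {
    String zkAddr = "localhost:2181";    // hypothetical ZooKeeper endpoint
    String clusterName = "test-cluster"; // hypothetical cluster name

    // poll every 1s, for up to 30s, until the external view converges to the
    // best possible state computed by the controller pipeline
    boolean converged =
        ClusterStateVerifier.verifyByPolling(new BestPossAndExtViewZkVerifier(zkAddr,
                                                                              clusterName),
                                             30 * 1000,
                                             1000);

    // or block on ZooKeeper callbacks instead of polling:
    // ClusterStateVerifier.verifyByZkCallback(
    //     new BestPossAndExtViewZkVerifier(zkAddr, clusterName), 30 * 1000);

    System.out.println("converged: " + converged);
  }
}
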
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java
new file mode 100644
index 0000000..c10da85
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterViewSerializer.java
@@ -0,0 +1,177 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.ClusterView;
+import org.apache.helix.manager.file.StaticFileHelixManager;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+
+public class ClusterViewSerializer
+{
+ private static Logger logger = Logger.getLogger(ClusterViewSerializer.class);
+
+ public static void serialize(ClusterView view, File file)
+ {
+ ObjectMapper mapper = new ObjectMapper();
+
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ // serializationConfig.set(SerializationConfig.Feature.WRITE_NULL_PROPERTIES, true);
+
+ try
+ {
+ mapper.writeValue(file, view);
+ }
+ catch (Exception e)
+ {
+ logger.error("Error during serialization of data:" + view, e);
+ }
+ }
+
+ public static byte[] serialize(ClusterView view)
+ {
+ ObjectMapper mapper = new ObjectMapper();
+
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ // serializationConfig.set(SerializationConfig.Feature.WRITE_NULL_PROPERTIES, true);
+
+ StringWriter sw = new StringWriter();
+
+ try
+ {
+ mapper.writeValue(sw, view);
+ return sw.toString().getBytes();
+ }
+ catch (Exception e)
+ {
+ logger.error("Error during serialization of data:" + view, e);
+ }
+
+ return new byte[0];
+ }
+
+ public static ClusterView deserialize(File file)
+ {
+ if (!file.exists())
+ {
+ logger.error(String.format("Static config file \"%s\" doesn't exist", file.getAbsolutePath()));
+ return null;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+ // deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+
+ try
+ {
+ ClusterView view = mapper.readValue(file, ClusterView.class);
+ return view;
+ }
+ catch (Exception e)
+ {
+ logger.error("Error during deserialization of file:" + file.getAbsolutePath(), e);
+ }
+
+ return null;
+ }
+
+
+ public static ClusterView deserialize(byte[] bytes)
+ {
+ ObjectMapper mapper = new ObjectMapper();
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+ DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
+ // deserializationConfig.set(DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+ deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
+
+ try
+ {
+ ClusterView view = mapper.readValue(bais, ClusterView.class);
+ return view;
+ }
+ catch (Exception e)
+ {
+ logger.error("Error during deserialization of bytes:" + new String(bytes), e);
+ }
+
+ return null;
+ }
+
+ public static void main(String[] args) throws JsonGenerationException,
+ JsonMappingException, IOException
+ {
+ // temporary test only
+ // create fake db names and nodes
+ List<StaticFileHelixManager.DBParam> dbParams = new ArrayList<StaticFileHelixManager.DBParam>();
+ // dbParams.add(new FileBasedClusterManager.DBParam("BizFollow", 1));
+ dbParams.add(new StaticFileHelixManager.DBParam("BizProfile_qatest218a", 128));
+ // dbParams.add(new FileBasedClusterManager.DBParam("EspressoDB", 10));
+ // dbParams.add(new FileBasedClusterManager.DBParam("MailboxDB", 128));
+ // dbParams.add(new FileBasedClusterManager.DBParam("MyDB", 8));
+ // dbParams.add(new FileBasedClusterManager.DBParam("schemata", 1));
+ String[] nodesInfo = { "localhost:8900", "localhost:8901",
+ "localhost:8902", "localhost:8903",
+ "localhost:8904" };
+ // String[] nodesInfo = { "esv4-app75.stg.linkedin.com:12918" };
+ int replica = 0;
+
+ ClusterView view = StaticFileHelixManager.generateStaticConfigClusterView(nodesInfo, dbParams, replica);
+ String file = "/tmp/cluster-view-bizprofile.json";
+ // ClusterViewSerializer serializer = new ClusterViewSerializer(file);
+
+ byte[] bytes = ClusterViewSerializer.serialize(view);
+ // logger.info("serialized bytes=" );
+ // logger.info(new String(bytes));
+ System.out.println("serialized bytes=");
+ System.out.println(new String(bytes));
+
+ ClusterView restoredView = ClusterViewSerializer.deserialize(bytes);
+ // logger.info(restoredView);
+
+ bytes = ClusterViewSerializer.serialize(restoredView);
+ // logger.info(new String(bytes));
+ System.out.println("restored cluster view=");
+ System.out.println(new String(bytes));
+
+ }
+}
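
For reference, a minimal round-trip sketch for the serializer above (not part of this commit), mirroring its main(); the database name, node list, and replica count are illustrative assumptions:

import java.util.ArrayList;
import java.util.List;

import org.apache.helix.ClusterView;
import org.apache.helix.manager.file.StaticFileHelixManager;
import org.apache.helix.tools.ClusterViewSerializer;

public class ClusterViewRoundTrip
{
  public static void main(String[] args)
  {
    List<StaticFileHelixManager.DBParam> dbParams =
        new ArrayList<StaticFileHelixManager.DBParam>();
    dbParams.add(new StaticFileHelixManager.DBParam("MyDB", 8)); // hypothetical DB, 8 partitions

    String[] nodes = { "localhost:8900", "localhost:8901" };     // hypothetical nodes

    ClusterView view =
        StaticFileHelixManager.generateStaticConfigClusterView(nodes, dbParams, 1);

    byte[] bytes = ClusterViewSerializer.serialize(view);        // JSON bytes
    ClusterView restored = ClusterViewSerializer.deserialize(bytes);

    // serializing the restored view again should produce the same JSON
    System.out.println(new String(ClusterViewSerializer.serialize(restored)));
  }
}
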
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java b/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java
new file mode 100644
index 0000000..9c202d4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealCalculatorByConsistentHashing.java
@@ -0,0 +1,626 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+public class IdealCalculatorByConsistentHashing
+{
+ /**
+ * Interface to calculate the hash function value of a string
+ */
+ public interface HashFunction
+ {
+ public int getHashValue(String key);
+ }
+
+ /**
+ * The default string hash function. Same as the default function used by
+ * Voldemort
+ */
+ public static class FnvHash implements HashFunction
+ {
+ private static final long FNV_BASIS = 0x811c9dc5;
+ private static final long FNV_PRIME = (1 << 24) + 0x193;
+ public static final long FNV_BASIS_64 = 0xCBF29CE484222325L;
+ public static final long FNV_PRIME_64 = 1099511628211L;
+
+ public int hash(byte[] key)
+ {
+ long hash = FNV_BASIS;
+ for (int i = 0; i < key.length; i++)
+ {
+ hash ^= 0xFF & key[i];
+ hash *= FNV_PRIME;
+ }
+ return (int) hash;
+ }
+
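+ // 64-bit FNV-1a: fold in the 8 bytes of val, least-significant first,
+ // and return the absolute value of the resulting hash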
+ public long hash64(long val)
+ {
+ long hashval = FNV_BASIS_64;
+ for (int i = 0; i < 8; i++)
+ {
+ long octet = val & 0x00ff;
+ val = val >> 8;
+ hashval = hashval ^ octet;
+ hashval = hashval * FNV_PRIME_64;
+ }
+ return Math.abs(hashval);
+ }
+
+ @Override
+ public int getHashValue(String key)
+ {
+ return hash(key.getBytes());
+ }
+
+ }
+
+ /**
+ * Calculate the ideal state for a list of instances using consistent
+ * hashing, with a default hash ring size of 65536.
+ *
+ * @param instanceNames
+ * List of instance names.
+ * @param partitions
+ * the partition number of the database
+ * @param replicas
+ * the replication degree
+ * @param dbName
+ * the name of the database
+ * @param hashFunc
+ * the hash function used to place partitions on the ring
+ * @return The ZNRecord that contains the ideal state
+ */
+ public static ZNRecord calculateIdealState(List<String> instanceNames,
+ int partitions, int replicas, String dbName, HashFunction hashFunc)
+ {
+ return calculateIdealState(instanceNames, partitions, replicas, dbName,
+ hashFunc, 65536);
+ }
+
+ /**
+ * Calculate the ideal state for a list of instances using consistent
+ * hashing.
+ *
+ * @param instanceNames
+ * List of instance names.
+ * @param partitions
+ * the partition number of the database
+ * @param replicas
+ * the replication degree
+ * @param dbName
+ * the name of the database
+ * @param hashFunc
+ * the hash function used to place partitions on the ring
+ * @param hashRingSize
+ * the size of the hash ring used by consistent hashing
+ * @return The ZNRecord that contains the ideal state
+ */
+ public static ZNRecord calculateIdealState(List<String> instanceNames,
+ int partitions, int replicas, String dbName, HashFunction hashFunc,
+ int hashRingSize)
+ {
+ ZNRecord result = new ZNRecord(dbName);
+
+ int[] hashRing = generateEvenHashRing(instanceNames, hashRingSize);
+ result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+ Random rand = new Random(0xc0ffee);
+ for (int i = 0; i < partitions; i++)
+ {
+ String partitionName = dbName + ".partition-" + i;
+ int hashPos = rand.nextInt() % hashRingSize;
+ // (int)(hashFunc.getHashValue(partitionName) % hashRingSize);
+ hashPos = hashPos < 0 ? (hashPos + hashRingSize) : hashPos;
+ // System.out.print(hashPos+ " ");
+ // if(i % 120 == 0) System.out.println();
+ Map<String, String> partitionAssignment = new TreeMap<String, String>();
+ // the first in the list is the node that contains the master
+ int masterPos = hashRing[hashPos];
+ partitionAssignment.put(instanceNames.get(masterPos), "MASTER");
+
+ // partitionAssignment.put("hash", "" + hashPos + " " + masterPos);
+
+ // Place slaves at the next hash ring positions, making sure that no
+ // instance is assigned more than one replica of the same partition.
+ for (int j = 1; j <= replicas; j++)
+ {
+ String next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
+ while (partitionAssignment.containsKey(next))
+ {
+ hashPos++;
+ next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
+ }
+ partitionAssignment.put(next, "SLAVE");
+ }
+ result.setMapField(partitionName, partitionAssignment);
+ }
+ return result;
+ }
+
+ /**
+ * Generate the hash ring for consistent hashing.
+ *
+ * @param instanceNames
+ * List of instance names.
+ * @param hashRingSize
+ * the size of the hash ring used by consistent hashing
+ * @return the hash ring as an int array; each entry is an instance index
+ * in the range 0..instanceNames.size()-1
+ */
+ public static int[] generateHashRing(List<String> instanceNames,
+ int hashRingSize)
+ {
+ int[] result = new int[hashRingSize];
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = 0;
+ }
+ int instances = instanceNames.size();
+ // The following code generates the random distribution
+ for (int i = 1; i < instances; i++)
+ {
+ putNodeOnHashring(result, i, hashRingSize / (i + 1), i);
+ }
+ return result;
+ }
+
+ public static int[] generateEvenHashRing(List<String> instanceNames,
+ int hashRingSize)
+ {
+ int[] result = new int[hashRingSize];
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = 0;
+ }
+ int instances = instanceNames.size();
+ // The following code generates the random distribution
+ for (int i = 1; i < instances; i++)
+ {
+ putNodeEvenOnHashRing(result, i, i + 1);
+ }
+ return result;
+ }
+
+ private static void putNodeEvenOnHashRing(int[] hashRing, int nodeVal,
+ int totalValues)
+ {
+ int newValNum = hashRing.length / totalValues;
+ assert (newValNum > 0);
+ Map<Integer, List<Integer>> valueIndex = buildValueIndex(hashRing);
+ int nSources = valueIndex.size();
+ int remainder = newValNum % nSources;
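+ // each existing value group yields newValNum / nSources slots to the new
+ // node; the largest groups (sorted below) yield one extra slot each until
+ // the remainder is used up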
+
+ List<List<Integer>> positionLists = new ArrayList<List<Integer>>();
+ for (List<Integer> list : valueIndex.values())
+ {
+ positionLists.add(list);
+ }
+ class ListComparator implements Comparator<List<Integer>>
+ {
+ @Override
+ public int compare(List<Integer> o1, List<Integer> o2)
+ {
+ return (o1.size() > o2.size() ? -1 : (o1.size() == o2.size() ? 0 : 1));
+ }
+ }
+ Collections.sort(positionLists, new ListComparator());
+
+ for (List<Integer> oldValPositions : positionLists)
+ {
+ // List<Integer> oldValPositions = valueIndex.get(oldVal);
+ int nValsToReplace = newValNum / nSources;
+ assert (nValsToReplace > 0);
+ if (remainder > 0)
+ {
+ nValsToReplace++;
+ remainder--;
+ }
+ // System.out.print(oldValPositions.size()+" "+nValsToReplace+" ");
+ putNodeValueOnHashRing(hashRing, nodeVal, nValsToReplace, oldValPositions);
+ // randomly take nValsToReplace positions in oldValPositions and set them to nodeVal
+ }
+ // System.out.println();
+ }
+
+ private static void putNodeValueOnHashRing(int[] hashRing, int nodeVal,
+ int numberOfValues, List<Integer> positions)
+ {
+ Random rand = new Random(nodeVal);
+ // initialize the index array
+ int[] index = new int[positions.size()];
+ for (int i = 0; i < index.length; i++)
+ {
+ index[i] = i;
+ }
+
+ int nodesLeft = index.length;
+
+ for (int i = 0; i < numberOfValues; i++)
+ {
+ // Calculate a random index
+ int randIndex = rand.nextInt() % nodesLeft;
+ if (randIndex < 0)
+ {
+ randIndex += nodesLeft;
+ }
+ hashRing[positions.get(index[randIndex])] = nodeVal;
+
+ // swap the random index and the last available index, and decrease the
+ // nodes left
+ int temp = index[randIndex];
+ index[randIndex] = index[nodesLeft - 1];
+ index[nodesLeft - 1] = temp;
+ nodesLeft--;
+ }
+ }
+
+ private static Map<Integer, List<Integer>> buildValueIndex(int[] hashRing)
+ {
+ Map<Integer, List<Integer>> result = new TreeMap<Integer, List<Integer>>();
+ for (int i = 0; i < hashRing.length; i++)
+ {
+ if (!result.containsKey(hashRing[i]))
+ {
+ List<Integer> list = new ArrayList<Integer>();
+ result.put(hashRing[i], list);
+ }
+ result.get(hashRing[i]).add(i);
+ }
+ return result;
+ }
+
+ /**
+ * Uniformly put node values on the hash ring. Derived from the shuffling
+ * algorithm
+ *
+ * @param result
+ * the hash ring array.
+ * @param nodeValue
+ * the int value to be added to the hash ring this time
+ * @param numberOfNodes
+ * number of node values to put on the hash ring array
+ * @param randomSeed
+ * the random seed
+ */
+ public static void putNodeOnHashring(int[] result, int nodeValue,
+ int numberOfNodes, int randomSeed)
+ {
+ Random rand = new Random(randomSeed);
+ // initialize the index array
+ int[] index = new int[result.length];
+ for (int i = 0; i < index.length; i++)
+ {
+ index[i] = i;
+ }
+
+ int nodesLeft = index.length;
+
+ for (int i = 0; i < numberOfNodes; i++)
+ {
+ // Calculate a random index
+ int randIndex = rand.nextInt() % nodesLeft;
+ if (randIndex < 0)
+ {
+ randIndex += nodesLeft;
+ }
+ if (result[index[randIndex]] == nodeValue)
+ {
+ assert (false);
+ }
+ result[index[randIndex]] = nodeValue;
+
+ // swap the random index and the last available index, and decrease the
+ // nodes left
+ int temp = index[randIndex];
+ index[randIndex] = index[nodesLeft - 1];
+ index[nodesLeft - 1] = temp;
+
+ nodesLeft--;
+ }
+ }
+
+ /**
+ * Helper function to see how many partitions are mapped to different
+ * instances in two ideal states
+ * */
+ public static void printDiff(ZNRecord record1, ZNRecord record2)
+ {
+ int diffCount = 0;
+ for (String key : record1.getMapFields().keySet())
+ {
+ Map<String, String> map1 = record1.getMapField(key);
+ Map<String, String> map2 = record2.getMapField(key);
+
+ for (String k : map1.keySet())
+ {
+ if (!map2.containsKey(k))
+ {
+ diffCount++;
+ } else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
+ {
+ diffCount++;
+ }
+ }
+ }
+ System.out.println("diff count = " + diffCount);
+ }
+
+ /**
+ * Helper function to compare the difference between two hash rings
+ * */
+ public static void compareHashrings(int[] ring1, int[] ring2)
+ {
+ int diff = 0;
+ for (int i = 0; i < ring1.length; i++)
+ {
+ if (ring1[i] != ring2[i])
+ {
+ diff++;
+ }
+ }
+ System.out.println("ring diff: " + diff);
+ }
+
+ public static void printNodeOfflineOverhead(ZNRecord record)
+ {
+ // build node -> partition map
+ Map<String, Set<String>> nodeNextMap = new TreeMap<String, Set<String>>();
+ for (String partitionName : record.getMapFields().keySet())
+ {
+ Map<String, String> map1 = record.getMapField(partitionName);
+ String master = "", slave = "";
+ for (String nodeName : map1.keySet())
+ {
+ if (!nodeNextMap.containsKey(nodeName))
+ {
+ nodeNextMap.put(nodeName, new TreeSet<String>());
+ }
+
+ // String master = "", slave = "";
+ if (map1.get(nodeName).equalsIgnoreCase("MASTER"))
+ {
+ master = nodeName;
+ } else
+ {
+ if (slave.equalsIgnoreCase(""))
+ {
+ slave = nodeName;
+ }
+ }
+
+ }
+ nodeNextMap.get(master).add(slave);
+ }
+ System.out.println("next count: ");
+ for (String key : nodeNextMap.keySet())
+ {
+ System.out.println(nodeNextMap.get(key).size() + " ");
+ }
+ System.out.println();
+ }
+
+ /**
+ * Helper function to calculate and print the standard deviation of the
+ * partition assignment ideal state, as well as the min/max number of master
+ * partitions hosted on each node
+ * */
+ public static void printIdealStateStats(ZNRecord record, String value)
+ {
+ Map<String, Integer> countsMap = new TreeMap<String, Integer>();
+ for (String key : record.getMapFields().keySet())
+ {
+ Map<String, String> map1 = record.getMapField(key);
+ for (String k : map1.keySet())
+ {
+ if (!countsMap.containsKey(k))
+ {
+ countsMap.put(k, 0);
+ }
+ if (value.equals("") || map1.get(k).equalsIgnoreCase(value))
+ {
+ countsMap.put(k, countsMap.get(k).intValue() + 1);
+ }
+ }
+ }
+ double sum = 0;
+ int maxCount = 0;
+ int minCount = Integer.MAX_VALUE;
+
+ System.out.println("Partition distributions: ");
+ for (String k : countsMap.keySet())
+ {
+ int count = countsMap.get(k);
+ sum += count;
+ if (maxCount < count)
+ {
+ maxCount = count;
+ }
+ if (minCount > count)
+ {
+ minCount = count;
+ }
+ System.out.print(count + " ");
+ }
+ System.out.println();
+ double mean = sum / (countsMap.size());
+ // calculate the deviation of the node distribution
+ double deviation = 0;
+ for (String k : countsMap.keySet())
+ {
+ double count = countsMap.get(k);
+ deviation += (count - mean) * (count - mean);
+ }
+ System.out.println("Mean: " + mean + " normal deviation:"
+ + Math.sqrt(deviation / countsMap.size()));
+
+ System.out.println("Max count: " + maxCount + " min count:" + minCount);
+ /*
+ * int steps = 10; int stepLen = (maxCount - minCount)/steps; List<Integer>
+ * histogram = new ArrayList<Integer>((maxCount - minCount)/stepLen + 1);
+ *
+ * for(int i = 0; i< (maxCount - minCount)/stepLen + 1; i++) {
+ * histogram.add(0); } for(String k :countsMap.keySet()) { int count =
+ * countsMap.get(k); int stepNo = (count - minCount)/stepLen;
+ * histogram.set(stepNo, histogram.get(stepNo) +1); }
+ * System.out.println("histogram:"); for(Integer x : histogram) {
+ * System.out.print(x+" "); }
+ */
+ }
+
+ public static void printHashRingStat(int[] hashRing)
+ {
+ double sum = 0, mean = 0, deviation = 0;
+ Map<Integer, Integer> countsMap = new TreeMap<Integer, Integer>();
+ for (int i = 0; i < hashRing.length; i++)
+ {
+ if (!countsMap.containsKey(hashRing[i]))
+ {
+ countsMap.put(hashRing[i], 0);
+ }
+ countsMap.put(hashRing[i], countsMap.get(hashRing[i]).intValue() + 1);
+ }
+ int maxCount = Integer.MIN_VALUE;
+ int minCount = Integer.MAX_VALUE;
+ for (Integer k : countsMap.keySet())
+ {
+ int count = countsMap.get(k);
+ sum += count;
+ if (maxCount < count)
+ {
+ maxCount = count;
+ }
+ if (minCount > count)
+ {
+ minCount = count;
+ }
+ }
+ mean = sum / countsMap.size();
+ for (Integer k : countsMap.keySet())
+ {
+ int count = countsMap.get(k);
+ deviation += (count - mean) * (count - mean);
+ }
+ System.out.println("hashring Mean: " + mean + " normal deviation:"
+ + Math.sqrt(deviation / countsMap.size()));
+
+ }
+
+ static int[] getFnvHashArray(List<String> strings)
+ {
+ int[] result = new int[strings.size()];
+ int i = 0;
+ IdealCalculatorByConsistentHashing.FnvHash hashfunc = new IdealCalculatorByConsistentHashing.FnvHash();
+ for (String s : strings)
+ {
+ int val = hashfunc.getHashValue(s) % 65536;
+ if (val < 0)
+ val += 65536;
+ result[i++] = val;
+ }
+ return result;
+ }
+
+ static void printArrayStat(int[] vals)
+ {
+ double sum = 0, mean = 0, deviation = 0;
+
+ for (int i = 0; i < vals.length; i++)
+ {
+ sum += vals[i];
+ }
+ mean = sum / vals.length;
+ for (int i = 0; i < vals.length; i++)
+ {
+ deviation += (mean - vals[i]) * (mean - vals[i]);
+ }
+ System.out.println("normalied deviation: "
+ + Math.sqrt(deviation / vals.length) / mean);
+ }
+
+ public static void main(String args[]) throws Exception
+ {
+ // Test the hash ring generation
+ List<String> instanceNames = new ArrayList<String>();
+ for (int i = 0; i < 10; i++)
+ {
+ instanceNames.add("localhost_123" + i);
+ }
+
+ // int[] ring1 =
+ // IdealCalculatorByConsistentHashing.generateEvenHashRing(instanceNames,
+ // 65535);
+ // printHashRingStat(ring1);
+ // int[] ring1 = getFnvHashArray(instanceNames);
+ // printArrayStat(ring1);
+
+ int partitions = 200, replicas = 2;
+ String dbName = "espressoDB1";
+
+ ZNRecord result = IdealCalculatorByConsistentHashing.calculateIdealState(
+ instanceNames, partitions, replicas, dbName,
+ new IdealCalculatorByConsistentHashing.FnvHash());
+ System.out.println("\nMaster :");
+ printIdealStateStats(result, "MASTER");
+
+ System.out.println("\nSlave :");
+ printIdealStateStats(result, "SLAVE");
+
+ System.out.println("\nTotal :");
+ printIdealStateStats(result, "");
+
+ printNodeOfflineOverhead(result);
+ /*
+ * ZNRecordSerializer serializer = new ZNRecordSerializer(); byte[] bytes;
+ * bytes = serializer.serialize(result); // System.out.println(new
+ * String(bytes));
+ *
+ * List<String> instanceNames2 = new ArrayList<String>(); for(int i = 0;i <
+ * 40; i++) { instanceNames2.add("localhost_123"+i); }
+ *
+ * ZNRecord result2 =
+ * IdealCalculatorByConsistentHashing.calculateIdealState( instanceNames2,
+ * partitions, replicas, dbName, new
+ * IdealCalculatorByConsistentHashing.FnvHash());
+ *
+ * printDiff(result, result2);
+ *
+ * //IdealCalculatorByConsistentHashing.printIdealStateStats(result2);
+ *
+ *
+ *
+ * int[] ring2 =
+ * IdealCalculatorByConsistentHashing.generateHashRing(instanceNames2,
+ * 30000);
+ *
+ * IdealCalculatorByConsistentHashing.compareHashrings(ring1, ring2);
+ * //printNodeStats(result); //printNodeStats(result2); bytes =
+ * serializer.serialize(result2); printHashRingStat(ring2); //
+ * System.out.println(new String(bytes));
+ */
+
+ }
+}
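
For reference, a minimal sketch of computing an ideal state with the consistent-hashing calculator above (not part of this commit); the instance names, partition count, and replica count are illustrative assumptions:

import java.util.ArrayList;
import java.util.List;

import org.apache.helix.ZNRecord;
import org.apache.helix.tools.IdealCalculatorByConsistentHashing;

public class ConsistentHashingSketch
{
  public static void main(String[] args)
  {
    List<String> instances = new ArrayList<String>();
    for (int i = 0; i < 10; i++)
    {
      instances.add("localhost_123" + i); // hypothetical instances
    }

    // 200 partitions, 2 slave replicas each, default 65536-slot hash ring
    ZNRecord idealState =
        IdealCalculatorByConsistentHashing.calculateIdealState(instances,
                                                               200,
                                                               2,
                                                               "espressoDB1",
                                                               new IdealCalculatorByConsistentHashing.FnvHash());

    // each map field is partitionName -> {instanceName -> MASTER|SLAVE}
    IdealCalculatorByConsistentHashing.printIdealStateStats(idealState, "MASTER");
  }
}
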
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java
new file mode 100644
index 0000000..d1c7170
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByRush.java
@@ -0,0 +1,326 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+public class IdealStateCalculatorByRush
+{
+ /**
+ * Build the config map for RUSH algorithm. The input of RUSH algorithm groups
+ * nodes into "cluster"s, and different clusters can be assigned with
+ * different weights.
+ *
+ * @param numClusters
+ * number of node clusters
+ * @param instancesPerCluster
+ * List of clusters, each contain a list of node name strings.
+ * @param replicationDegree
+ * the replication degree
+ * @param clusterWeights
+ * the weight for each node cluster
+ * @return the config map structure for the RUSH algorithm.
+ */
+ static HashMap<String, Object> buildRushConfig(int numClusters,
+ List<List<String>> instancesPerCluster, int replicationDegree,
+ List<Integer> clusterWeights)
+ {
+ HashMap<String, Object> config = new HashMap<String, Object>();
+ config.put("replicationDegree", replicationDegree);
+
+ HashMap[] clusterList = new HashMap[numClusters];
+ config.put("subClusters", clusterList);
+
+ HashMap[] nodes;
+ HashMap<String, String> node;
+ HashMap<String, Object> clusterData;
+ for (int n = 0; n < numClusters; n++)
+ {
+ int numNodes = instancesPerCluster.get(n).size();
+ nodes = new HashMap[numNodes];
+ for (int i = 0; i < numNodes; i++)
+ {
+ node = new HashMap<String, String>();
+ node.put("partition", instancesPerCluster.get(n).get(i));
+ nodes[i] = node;
+ }
+ clusterData = new HashMap<String, Object>();
+ clusterData.put("weight", clusterWeights.get(n));
+ clusterData.put("nodes", nodes);
+ clusterList[n] = clusterData;
+ }
+ return config;
+ }
+
+ /**
+ * Calculate the ideal state for a list of instance clusters.
+ *
+ * @param instanceClusters
+ * List of clusters, each containing a list of node name strings.
+ * @param instanceClusterWeights
+ * the weight for each instance cluster
+ * @param partitions
+ * the partition number of the database
+ * @param replicas
+ * the replication degree
+ * @param dbName
+ * the name of the database
+ * @return The ZNRecord that contains the ideal state
+ */
+ public static ZNRecord calculateIdealState(
+ List<List<String>> instanceClusters,
+ List<Integer> instanceClusterWeights, int partitions, int replicas,
+ String dbName) throws Exception
+ {
+ ZNRecord result = new ZNRecord(dbName);
+
+ int numberOfClusters = instanceClusters.size();
+ List<List<String>> nodesInClusters = instanceClusters;
+ List<Integer> clusterWeights = instanceClusterWeights;
+
+ HashMap<String, Object> rushConfig = buildRushConfig(numberOfClusters,
+ nodesInClusters, replicas + 1, clusterWeights);
+ RUSHrHash rushHash = new RUSHrHash(rushConfig);
+
+ Random r = new Random(0);
+ for (int i = 0; i < partitions; i++)
+ {
+ int partitionId = i;
+ String partitionName = dbName + ".partition-" + partitionId;
+
+ ArrayList<HashMap> partitionAssignmentResult = rushHash
+ .findNode(i);
+ List<String> nodeNames = new ArrayList<String>();
+ for (HashMap p : partitionAssignmentResult)
+ {
+ for (Object key : p.keySet())
+ {
+ if (p.get(key) instanceof String)
+ {
+ nodeNames.add(p.get(key).toString());
+ }
+ }
+ }
+ Map<String, String> partitionAssignment = new TreeMap<String, String>();
+
+ for (int j = 0; j < nodeNames.size(); j++)
+ {
+ partitionAssignment.put(nodeNames.get(j), "SLAVE");
+ }
+ int master = r.nextInt(nodeNames.size());
+ //master = nodeNames.size()/2;
+ partitionAssignment.put(nodeNames.get(master), "MASTER");
+
+
+ result.setMapField(partitionName, partitionAssignment);
+ }
+ result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+ return result;
+ }
+
+ public static ZNRecord calculateIdealState(
+ List<String> instanceClusters,
+ int instanceClusterWeight, int partitions, int replicas,
+ String dbName) throws Exception
+ {
+ List<List<String>> instanceClustersList = new ArrayList<List<String>>();
+ instanceClustersList.add(instanceClusters);
+
+ List<Integer> instanceClusterWeightList = new ArrayList<Integer>();
+ instanceClusterWeightList.add(instanceClusterWeight);
+
+ return calculateIdealState(
+ instanceClustersList,
+ instanceClusterWeightList, partitions, replicas,
+ dbName);
+ }
+ /**
+ * Helper function to see how many partitions are mapped to different
+ * instances in two ideal states
+ * */
+ public static void printDiff(ZNRecord record1, ZNRecord record2)
+ {
+ int diffCount = 0;
+ int diffCountMaster = 0;
+ for (String key : record1.getMapFields().keySet())
+ {
+ Map<String, String> map1 = record1.getMapField(key);
+ Map<String, String> map2 = record2.getMapField(key);
+
+ for (String k : map1.keySet())
+ {
+ if (!map2.containsKey(k))
+ {
+ diffCount++;
+ }
+ else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
+ {
+ diffCountMaster++;
+ }
+ }
+ }
+ System.out.println("\ndiff count = " + diffCount);
+ System.out.println("\nmaster diff count:"+ diffCountMaster);
+ }
+
+ /**
+ * Helper function to calculate and print the standard deviation of the
+ * partition assignment ideal state.
+ * */
+ public static void printIdealStateStats(ZNRecord record)
+ {
+ Map<String, Integer> countsMap = new TreeMap<String, Integer>();
+ Map<String, Integer> masterCountsMap = new TreeMap<String, Integer>();
+ for (String key : record.getMapFields().keySet())
+ {
+ Map<String, String> map1 = record.getMapField(key);
+ for (String k : map1.keySet())
+ {
+ // count every occurrence; the first sighting must count as 1, not 0
+ if (!countsMap.containsKey(k))
+ {
+ countsMap.put(k, 0);
+ }
+ countsMap.put(k, countsMap.get(k) + 1);
+ if (!masterCountsMap.containsKey(k))
+ {
+ masterCountsMap.put(k, 0);
+ }
+ if (map1.get(k).equalsIgnoreCase("MASTER"))
+ {
+ masterCountsMap.put(k, masterCountsMap.get(k) + 1);
+ }
+ }
+ }
+ double sum = 0;
+ int maxCount = 0;
+ int minCount = Integer.MAX_VALUE;
+ for (String k : countsMap.keySet())
+ {
+ int count = countsMap.get(k);
+ sum += count;
+ if (maxCount < count)
+ {
+ maxCount = count;
+ }
+ if (minCount > count)
+ {
+ minCount = count;
+ }
+ System.out.print(count + " ");
+ }
+ System.out.println("\nMax count: " + maxCount + " min count:" + minCount);
+ System.out.println("\n master:");
+ double sumMaster = 0;
+ int maxCountMaster = 0;
+ int minCountMaster = Integer.MAX_VALUE;
+ for (String k : masterCountsMap.keySet())
+ {
+ int count = masterCountsMap.get(k);
+ sumMaster += count;
+ if (maxCountMaster < count)
+ {
+ maxCountMaster = count;
+ }
+ if (minCountMaster > count)
+ {
+ minCountMaster = count;
+ }
+ System.out.print(count + " ");
+ }
+ System.out.println("\nMean master: "+ 1.0*sumMaster/countsMap.size());
+ System.out.println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
+ double mean = sum / (countsMap.size());
+ // calculate the deviation of the node distribution
+ double deviation = 0;
+ for (String k : countsMap.keySet())
+ {
+ double count = countsMap.get(k);
+ deviation += (count - mean) * (count - mean);
+ }
+ System.out.println("Mean: " + mean + " normal deviation:"
+ + Math.sqrt(deviation / countsMap.size()) / mean);
+
+ //System.out.println("Max count: " + maxCount + " min count:" + minCount);
+ int steps = 10;
+ int stepLen = (maxCount - minCount) / steps;
+ if(stepLen == 0) return;
+ List<Integer> histogram = new ArrayList<Integer>((maxCount - minCount)
+ / stepLen + 1);
+
+ for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++)
+ {
+ histogram.add(0);
+ }
+ for (String k : countsMap.keySet())
+ {
+ int count = countsMap.get(k);
+ int stepNo = (count - minCount) / stepLen;
+ histogram.set(stepNo, histogram.get(stepNo) + 1);
+ }
+ System.out.println("histogram:");
+ for (Integer x : histogram)
+ {
+ System.out.print(x + " ");
+ }
+ }
+
+ public static void main(String args[]) throws Exception
+ {
+ int partitions = 4096, replicas = 2;
+ String dbName = "espressoDB1";
+ List<String> instanceNames = new ArrayList<String>();
+ List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
+ for (int i = 0; i < 20; i++)
+ {
+ instanceNames.add("local"+i+"host_123" + i);
+ }
+ instanceCluster1.add(instanceNames);
+ List<Integer> weights1 = new ArrayList<Integer>();
+ weights1.add(1);
+ ZNRecord result = IdealStateCalculatorByRush.calculateIdealState(
+ instanceCluster1, weights1, partitions, replicas, dbName);
+
+ printIdealStateStats(result);
+
+ List<String> instanceNames2 = new ArrayList<String>();
+ for (int i = 400; i < 405; i++)
+ {
+ instanceNames2.add("localhost_123" + i);
+ }
+ instanceCluster1.add(instanceNames2);
+ weights1.add(1);
+ ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(
+ instanceCluster1, weights1, partitions, replicas, dbName);
+
+ printDiff(result, result2);
+ printIdealStateStats(result2);
+ }
+}
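For reference, a minimal sketch of consuming the ideal state ZNRecord produced by
calculateIdealState above, counting masters per instance in the same way that
printIdealStateStats walks the record. It uses only the ZNRecord accessors already
exercised in this file (getMapFields, getMapField); the class and method names are
hypothetical placeholders:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.helix.ZNRecord;

    public class MasterCountSketch
    {
      public static Map<String, Integer> countMasters(ZNRecord idealState)
      {
        Map<String, Integer> masters = new HashMap<String, Integer>();
        // one map field per partition: node name -> "MASTER" or "SLAVE"
        for (String partition : idealState.getMapFields().keySet())
        {
          Map<String, String> assignment = idealState.getMapField(partition);
          for (Map.Entry<String, String> e : assignment.entrySet())
          {
            if ("MASTER".equalsIgnoreCase(e.getValue()))
            {
              Integer c = masters.get(e.getKey());
              masters.put(e.getKey(), c == null ? 1 : c + 1);
            }
          }
        }
        return masters;
      }
    }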
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java
new file mode 100644
index 0000000..39bd5d2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorByShuffling.java
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+
+
+/*
+ * Ideal state calculator for cluster manager V1. The ideal state is
+ * calculated by randomly assigning master partitions to storage nodes.
+ *
+ * Note that the following code is a naive strategy and is for cluster manager V1 only. We will
+ * use a different algorithm to calculate the ideal state in future milestones.
+ */
+
+public class IdealStateCalculatorByShuffling
+{
+ /*
+ * Given the number of nodes, partitions and replicas, calculate the
+ * ideal state in the following manner. For the master partition assignment:
+ * 1. construct an ArrayList partitionList, with partitionList[i] = i;
+ * 2. shuffle the partition list;
+ * 3. scan the shuffled list, assigning partitionList[i]'s master to node (i % nodes).
+ *
+ * For the slave partitions, place each replica at a fixed offset from the
+ * master node, probing forward past nodes that already hold the partition.
+ *
+ * The result of the method is a ZNRecord that contains one map per partition; each
+ * map is from node name to either "MASTER" or "SLAVE".
+ */
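+ /*
+ * Worked micro-example (hypothetical sizes): with 4 nodes {n0..n3}, 8 partitions
+ * and replicas = 1, a shuffled order beginning [3, 5, 0, ...] places partition 3's
+ * master on n0 and partition 5's master on n1; each slave then lands on the next
+ * node (mod 4) that does not already hold that partition.
+ */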
+
+
+ public static ZNRecord calculateIdealState(List<String> instanceNames,
+ int partitions, int replicas, String dbName, long randomSeed)
+ {
+ return calculateIdealState(instanceNames, partitions, replicas, dbName, randomSeed, "MASTER", "SLAVE");
+ }
+ public static ZNRecord calculateIdealState(List<String> instanceNames,
+ int partitions, int replicas, String dbName, long randomSeed, String masterValue, String slaveValue)
+ {
+ if (instanceNames.size() <= replicas)
+ {
+ throw new IllegalArgumentException(
+ "Replicas must be less than number of storage nodes");
+ }
+
+ Collections.sort(instanceNames);
+
+ ZNRecord result = new ZNRecord(dbName);
+
+ List<Integer> partitionList = new ArrayList<Integer>(partitions);
+ for (int i = 0; i < partitions; i++)
+ {
+ partitionList.add(i);
+ }
+ Random rand = new Random(randomSeed);
+ // Shuffle the partition list
+ Collections.shuffle(partitionList, rand);
+
+ for (int i = 0; i < partitionList.size(); i++)
+ {
+ int partitionId = partitionList.get(i);
+ Map<String, String> partitionAssignment = new TreeMap<String, String>();
+ int masterNode = i % instanceNames.size();
+ // the first in the list is the node that contains the master
+ partitionAssignment.put(instanceNames.get(masterNode), masterValue);
+
+ // for the j-th replica, start from node (masterNode + j * partitions) % nodes,
+ // then probe forward past any node that already holds this partition
+ for (int j = 1; j <= replicas; j++)
+ {
+ int index = (masterNode + j * partitionList.size()) % instanceNames.size();
+ while (partitionAssignment.containsKey(instanceNames.get(index)))
+ {
+ index = (index + 1) % instanceNames.size();
+ }
+ partitionAssignment.put(instanceNames.get(index), slaveValue);
+ }
+ String partitionName = dbName + "_" + partitionId;
+ result.setMapField(partitionName, partitionAssignment);
+ }
+ result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
+ return result;
+ }
+
+ public static ZNRecord calculateIdealState(List<String> instanceNames,
+ int partitions, int replicas, String dbName)
+ {
+ long randomSeed = 888997632;
+ // the seed is a constant so that the shuffle always gives the same result
+ return calculateIdealState(instanceNames, partitions, replicas, dbName,
+ randomSeed);
+ }
+}
\ No newline at end of file
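A minimal usage sketch for the shuffling calculator; the instance names, counts, and
DB name below are hypothetical, and the default-seed overload above makes the output
deterministic:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.helix.ZNRecord;
    import org.apache.helix.tools.IdealStateCalculatorByShuffling;

    public class ShufflingSketch
    {
      public static void main(String[] args)
      {
        List<String> instances = new ArrayList<String>();
        for (int i = 0; i < 5; i++)
        {
          instances.add("localhost_1234" + i);
        }
        // 8 partitions, 2 slave replicas per partition; 5 nodes > 2 replicas as required
        ZNRecord idealState =
            IdealStateCalculatorByShuffling.calculateIdealState(instances, 8, 2, "testDB");
        // each partition should map one node to MASTER and two nodes to SLAVE
        for (String partition : idealState.getMapFields().keySet())
        {
          System.out.println(partition + " -> " + idealState.getMapField(partition));
        }
      }
    }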
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java
new file mode 100644
index 0000000..6e84439
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/IdealStateCalculatorForEspressoRelay.java
@@ -0,0 +1,101 @@
+package org.apache.helix.tools;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+
+public class IdealStateCalculatorForEspressoRelay
+{
+ public static IdealState calculateRelayIdealState(List<String> partitions, List<String> instances,
+ String resultRecordName, int replica, String firstValue, String restValue, String stateModelName)
+ {
+ Collections.sort(partitions);
+ Collections.sort(instances);
+ if (instances.size() % replica != 0)
+ {
+ throw new HelixException("Number of instances must be divisible by the replica count");
+ }
+
+ IdealState result = new IdealState(resultRecordName);
+ result.setNumPartitions(partitions.size());
+ result.setReplicas("" + replica);
+ result.setStateModelDefRef(stateModelName);
+
+ int groups = instances.size() / replica;
+ int remainder = instances.size() % replica; // always 0 after the divisibility check above
+
+ int remainder2 = partitions.size() % groups;
+ int storageNodeGroupSize = partitions.size() / groups;
+
+ for(int i = 0; i < groups; i++)
+ {
+ int relayStart = 0, relayEnd = 0, storageNodeStart = 0, storageNodeEnd = 0;
+ if(i < remainder)
+ {
+ relayStart = (replica + 1) * i;
+ relayEnd = (replica + 1) *(i + 1);
+ }
+ else
+ {
+ relayStart = (replica + 1) * remainder + replica * (i - remainder);
+ relayEnd = relayStart + replica;
+ }
+ //System.out.println("relay start :" + relayStart + " relayEnd:" + relayEnd);
+ if(i < remainder2)
+ {
+ storageNodeStart = (storageNodeGroupSize + 1) * i;
+ storageNodeEnd = (storageNodeGroupSize + 1) *(i + 1);
+ }
+ else
+ {
+ storageNodeStart = (storageNodeGroupSize + 1) * remainder2 + storageNodeGroupSize * (i - remainder2);
+ storageNodeEnd = storageNodeStart + storageNodeGroupSize;
+ }
+
+ //System.out.println("storageNodeStart :" + storageNodeStart + " storageNodeEnd:" + storageNodeEnd);
+ List<String> snBatch = partitions.subList(storageNodeStart, storageNodeEnd);
+ List<String> relayBatch = instances.subList(relayStart, relayEnd);
+
+ Map<String, List<String>> sublistFields = calculateSubIdealState(snBatch, relayBatch, replica);
+
+ result.getRecord().getListFields().putAll(sublistFields);
+ }
+
+ for(String snName : result.getRecord().getListFields().keySet())
+ {
+ Map<String, String> mapField = new TreeMap<String, String>();
+ List<String> relayCandidates = result.getRecord().getListField(snName);
+ mapField.put(relayCandidates.get(0), firstValue);
+ for(int i = 1; i < relayCandidates.size(); i++)
+ {
+ mapField.put(relayCandidates.get(i), restValue);
+ }
+ result.getRecord().getMapFields().put(snName, mapField);
+ }
+ return result;
+ }
+
+ private static Map<String, List<String>> calculateSubIdealState(
+ List<String> snBatch, List<String> relayBatch, int replica)
+ {
+ Map<String, List<String>> result = new HashMap<String, List<String>>();
+ for(int i = 0; i < snBatch.size(); i++)
+ {
+ String snName = snBatch.get(i);
+ result.put(snName, new ArrayList<String>());
+ for(int j = 0; j < replica; j++)
+ {
+ result.get(snName).add(relayBatch.get((j + i) % (relayBatch.size())));
+ }
+ }
+ return result;
+ }
+}
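A minimal usage sketch for the relay calculator; the partition and relay names, the
"ONLINE"/"STANDBY" state values, and the "OnlineOffline" state model reference are
hypothetical placeholders. Six relays with replica = 3 satisfies the divisibility
check, yielding two relay groups:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.helix.model.IdealState;
    import org.apache.helix.tools.IdealStateCalculatorForEspressoRelay;

    public class RelaySketch
    {
      public static void main(String[] args)
      {
        List<String> partitions = new ArrayList<String>();
        for (int i = 0; i < 12; i++)
        {
          partitions.add("espressoDB1_" + i);
        }
        List<String> relays = new ArrayList<String>();
        for (int i = 0; i < 6; i++)
        {
          relays.add("relay_" + i);
        }
        IdealState idealState = IdealStateCalculatorForEspressoRelay.calculateRelayIdealState(
            partitions, relays, "relayIdealState", 3, "ONLINE", "STANDBY", "OnlineOffline");
        // list fields hold the relay preference list per storage-node partition
        System.out.println(idealState.getRecord().getListFields());
      }
    }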