You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[27/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
new file mode 100644
index 0000000..0f53724
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -0,0 +1,1089 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.file.FileDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.store.file.FilePropertyStore;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+
+
+public class TestHelper
+{
+ private static final Logger LOG = Logger.getLogger(TestHelper.class);
+
+ /**
+  * Starts a ZooKeeper server at the given address with an empty list of root namespaces.
+  *
+  * @param zkAddress server address; the port is taken from the text after the last ':'
+  * @return the started server
+  */
+ static public ZkServer startZkSever(final String zkAddress) throws Exception
+ {
+   return TestHelper.startZkSever(zkAddress, Collections.<String> emptyList());
+ }
+
+ /**
+  * Starts a ZooKeeper server at the given address, passing a single root namespace
+  * on to the list-based overload.
+  *
+  * @param zkAddress server address; the port is taken from the text after the last ':'
+  * @param rootNamespace the only root namespace handed to the list-based overload
+  * @return the started server
+  */
+ static public ZkServer startZkSever(final String zkAddress, final String rootNamespace) throws Exception
+ {
+   final List<String> singleNamespace = Collections.singletonList(rootNamespace);
+   return TestHelper.startZkSever(zkAddress, singleNamespace);
+ }
+
+ static public ZkServer startZkSever(final String zkAddress,
+ final List<String> rootNamespaces) throws Exception
+ {
+ System.out.println("Start zookeeper at " + zkAddress + " in thread "
+ + Thread.currentThread().getName());
+
+ String zkDir = zkAddress.replace(':', '_');
+ final String logDir = "/tmp/" + zkDir + "/logs";
+ final String dataDir = "/tmp/" + zkDir + "/dataDir";
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+ ZKClientPool.reset();
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
+ {
+ @Override
+ public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient)
+ {
+ for (String rootNamespace : rootNamespaces)
+ {
+ try
+ {
+ zkClient.deleteRecursive(rootNamespace);
+ }
+ catch (Exception e)
+ {
+ LOG.error("fail to deleteRecursive path:" + rootNamespace + "\nexception:"
+ + e);
+ }
+ }
+ }
+ };
+
+ int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
+ ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+ zkServer.start();
+
+ return zkServer;
+ }
+
+ /**
+  * Shuts the given server down and prints a confirmation line; does nothing for {@code null}.
+  *
+  * @param zkServer server to stop, may be null
+  */
+ static public void stopZkServer(ZkServer zkServer)
+ {
+   if (zkServer == null)
+   {
+     return;
+   }
+
+   zkServer.shutdown();
+   final String threadName = Thread.currentThread().getName();
+   System.out.println("Shut down zookeeper at port " + zkServer.getPort()
+       + " in thread " + threadName);
+ }
+
+ /**
+  * Creates a PARTICIPANT HelixManager for the instance and starts a new thread running a
+  * DummyProcessThread built from that manager.
+  *
+  * @param zkAddr ZooKeeper address handed to the manager factory
+  * @param clusterName cluster the participant belongs to
+  * @param instanceName participant instance name
+  * @return holder with the manager and the already-started thread
+  */
+ public static StartCMResult startDummyProcess(final String zkAddr,
+     final String clusterName,
+     final String instanceName) throws Exception
+ {
+   final StartCMResult started = new StartCMResult();
+   final HelixManager participant =
+       HelixManagerFactory.getZKHelixManager(clusterName,
+           instanceName,
+           InstanceType.PARTICIPANT,
+           zkAddr);
+   final Thread worker = new Thread(new DummyProcessThread(participant, instanceName));
+
+   started._manager = participant;
+   started._thread = worker;
+   worker.start();
+   return started;
+ }
+
+ // TODO refactor this
+ public static StartCMResult startController(final String clusterName,
+ final String controllerName,
+ final String zkConnectString,
+ final String controllerMode) throws Exception
+ {
+ final StartCMResult result = new StartCMResult();
+ final HelixManager manager =
+ HelixControllerMain.startHelixController(zkConnectString,
+ clusterName,
+ controllerName,
+ controllerMode);
+ result._manager = manager;
+
+ Thread thread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // ClusterManager manager = null;
+
+ try
+ {
+
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException e)
+ {
+ String msg =
+ "controller:" + controllerName + ", " + Thread.currentThread().getName()
+ + " interrupted";
+ LOG.info(msg);
+ // System.err.println(msg);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ thread.start();
+ result._thread = thread;
+ return result;
+ }
+
+ /**
+  * Plain holder returned by the start helpers: the HelixManager that was created and the
+  * thread that was started alongside it.
+  */
+ public static class StartCMResult
+ {
+   /** manager created by the start helper */
+   public HelixManager _manager;
+
+   /** thread started by the start helper */
+   public Thread _thread;
+ }
+
+ /**
+  * Adds a cluster with the given name through a ZKHelixAdmin built on {@code zkClient};
+  * the second argument to addCluster is {@code true}.
+  *
+  * @param zkClient client used to construct the admin
+  * @param clusterName name of the cluster to add
+  */
+ public static void setupEmptyCluster(ZkClient zkClient, String clusterName)
+ {
+   new ZKHelixAdmin(zkClient).addCluster(clusterName, true);
+ }
+
+ /**
+  * Collects the given elements into a new {@link HashSet}; duplicates collapse.
+  *
+  * @param s elements to put in the set
+  * @return a new mutable set holding the distinct elements of {@code s}
+  */
+ public static <T> Set<T> setOf(T... s)
+ {
+   final Set<T> elements = new HashSet<T>();
+   Collections.addAll(elements, s);
+   return elements;
+ }
+
+ /**
+  * Runs the named verifier with a timeout of 30 * 1000 ms.
+  *
+  * @param verifierName name of the verifier method to look up on this class
+  * @param args arguments forwarded to the verifier
+  */
+ public static void verifyWithTimeout(String verifierName, Object... args)
+ {
+   final long defaultTimeoutMs = 30 * 1000;
+   verifyWithTimeout(verifierName, defaultTimeoutMs, args);
+ }
+
+ /**
+  * Generic verification with a timeout: sleeps one second, invokes the named public static
+  * verifier of this class, and repeats until it returns true or the attempts are used up.
+  * The final result is then asserted.
+  * <p>
+  * A verifier that cannot be found, throws, or is interrupted now fails the assertion
+  * instead of being printed and ignored, so a check that never ran cannot pass silently.
+  *
+  * @param verifierName name of a public static method of TestHelper returning a boolean
+  * @param timeout total wait budget in ms; (timeout / 1000) + 1 attempts are made
+  * @param args arguments forwarded to the verifier
+  */
+ public static void verifyWithTimeout(String verifierName, long timeout, Object... args)
+ {
+   final long sleepInterval = 1000; // in ms
+   final int loop = (int) (timeout / sleepInterval) + 1;
+
+   boolean result = false;
+   int attempts = 0;
+   try
+   {
+     // verifier should be static method
+     final Method verifier = TestHelper.getMethod(verifierName);
+     if (verifier == null)
+     {
+       Assert.fail("no verifier method named " + verifierName + " in TestHelper");
+     }
+
+     while (attempts < loop && !result)
+     {
+       Thread.sleep(sleepInterval);
+       attempts++;
+       result = (Boolean) verifier.invoke(null, args);
+     }
+   }
+   catch (InterruptedException e)
+   {
+     // keep the interrupt visible to the test runner
+     Thread.currentThread().interrupt();
+     Assert.fail(verifierName + " interrupted while waiting to verify", e);
+   }
+   catch (Exception e)
+   {
+     Assert.fail(verifierName + " could not be executed: " + e, e);
+   }
+
+   // attempts counts the sleeps actually performed, so this is the real time waited
+   System.err.println(verifierName + ": wait " + (attempts * sleepInterval) + "ms to verify "
+       + " (" + result + ")");
+   LOG.debug("args:" + Arrays.toString(args));
+
+   if (!result)
+   {
+     LOG.error(verifierName + " fails");
+     LOG.error("args:" + Arrays.toString(args));
+   }
+
+   Assert.assertTrue(result);
+ }
+
+ private static Method getMethod(String name)
+ {
+ Method[] methods = TestHelper.class.getMethods();
+ for (Method method : methods)
+ {
+ if (name.equals(method.getName()))
+ {
+ return method;
+ }
+ }
+ return null;
+ }
+
+ // for file-based cluster manager
+ /**
+  * Checks, through a FileDataAccessor, that no listed instance has a current state with
+  * map fields for the resource under any of its session ids.
+  *
+  * @param clusterName cluster to inspect
+  * @param resourceName resource whose current states are checked
+  * @param instanceNames instances to inspect
+  * @param filePropertyStore store backing the accessor and used for the existence check
+  * @return false as soon as a non-empty current state is found, true otherwise
+  */
+ public static boolean verifyEmptyCurStateFile(String clusterName,
+     String resourceName,
+     Set<String> instanceNames,
+     FilePropertyStore<ZNRecord> filePropertyStore)
+ {
+   final DataAccessor fileAccessor = new FileDataAccessor(filePropertyStore, clusterName);
+
+   for (String instance : instanceNames)
+   {
+     final String currentStatesPath =
+         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, instance);
+     final List<String> sessionIds =
+         fileAccessor.getChildNames(PropertyType.CURRENTSTATES, instance);
+
+     for (String sessionId : sessionIds)
+     {
+       final String resourcePath = currentStatesPath + "/" + sessionId + "/" + resourceName;
+       if (!filePropertyStore.exists(resourcePath))
+       {
+         continue;
+       }
+
+       final CurrentState sessionState =
+           fileAccessor.getProperty(CurrentState.class,
+               PropertyType.CURRENTSTATES,
+               instance,
+               sessionId,
+               resourceName);
+       if (!sessionState.getRecord().getMapFields().isEmpty())
+       {
+         return false;
+       }
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Checks over a fresh ZooKeeper connection that, for every listed instance and each of its
+  * sessions, the resource's current state has no map fields, and that the resource's
+  * external view has none either. The connection is always closed before returning.
+  *
+  * @param clusterName cluster to inspect
+  * @param resourceName resource whose states are checked
+  * @param instanceNames instances to inspect
+  * @param zkAddr ZooKeeper address to connect to
+  * @return false as soon as a non-empty current state or external view is seen, else true
+  */
+ public static boolean verifyEmptyCurStateAndExtView(String clusterName,
+     String resourceName,
+     Set<String> instanceNames,
+     String zkAddr)
+ {
+   final ZkClient client = new ZkClient(zkAddr);
+   client.setZkSerializer(new ZNRecordSerializer());
+
+   try
+   {
+     final ZKHelixDataAccessor dataAccessor =
+         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
+     final Builder keys = dataAccessor.keyBuilder();
+
+     for (String instance : instanceNames)
+     {
+       final List<String> sessions = dataAccessor.getChildNames(keys.sessions(instance));
+       for (String session : sessions)
+       {
+         final CurrentState current =
+             dataAccessor.getProperty(keys.currentState(instance, session, resourceName));
+         if (current != null && !current.getRecord().getMapFields().isEmpty())
+         {
+           return false;
+         }
+       }
+
+       final ExternalView view = dataAccessor.getProperty(keys.externalView(resourceName));
+       if (view != null && !view.getRecord().getMapFields().isEmpty())
+       {
+         return false;
+       }
+     }
+
+     return true;
+   }
+   finally
+   {
+     client.close();
+   }
+ }
+
+ /**
+  * Verifier reporting whether the manager is currently disconnected.
+  *
+  * @param manager manager to query
+  * @return true when {@code manager.isConnected()} is false
+  */
+ public static boolean verifyNotConnected(HelixManager manager)
+ {
+   final boolean connected = manager.isConnected();
+   return !connected;
+ }
+
+ /**
+  * Sets up a cluster using {@link IdealStateModeProperty#AUTO} as the ideal-state mode;
+  * all other arguments are forwarded unchanged to the mode-aware overload.
+  */
+ public static void setupCluster(String clusterName,
+     String zkAddr,
+     int startPort,
+     String participantNamePrefix,
+     String resourceNamePrefix,
+     int resourceNb,
+     int partitionNb,
+     int nodesNb,
+     int replica,
+     String stateModelDef,
+     boolean doRebalance) throws Exception
+ {
+   final IdealStateModeProperty defaultMode = IdealStateModeProperty.AUTO;
+   TestHelper.setupCluster(clusterName, zkAddr, startPort,
+       participantNamePrefix, resourceNamePrefix,
+       resourceNb, partitionNb, nodesNb, replica,
+       stateModelDef, defaultMode, doRebalance);
+ }
+
+ /**
+  * (Re)creates a cluster: deletes "/" + clusterName if it already exists, adds the cluster,
+  * adds {@code nodesNb} instances named prefix:port (ports counting up from
+  * {@code startPort}), then adds {@code resourceNb} resources named prefix + index and
+  * optionally rebalances each one.
+  * <p>
+  * The ZkClient opened for the existence check is closed in a finally block, so it is no
+  * longer leaked when any setup step throws.
+  *
+  * @param clusterName cluster to create
+  * @param ZkAddr ZooKeeper address
+  * @param startPort port of the first instance
+  * @param participantNamePrefix host part of each instance name
+  * @param resourceNamePrefix prefix of each resource name
+  * @param resourceNb number of resources to add
+  * @param partitionNb number of partitions per resource
+  * @param nodesNb number of instances to add
+  * @param replica replica count used when rebalancing
+  * @param stateModelDef state model definition name for the resources
+  * @param mode ideal-state mode; its toString() is passed to the setup tool
+  * @param doRebalance whether to rebalance each resource after adding it
+  */
+ public static void setupCluster(String clusterName,
+     String ZkAddr,
+     int startPort,
+     String participantNamePrefix,
+     String resourceNamePrefix,
+     int resourceNb,
+     int partitionNb,
+     int nodesNb,
+     int replica,
+     String stateModelDef,
+     IdealStateModeProperty mode,
+     boolean doRebalance) throws Exception
+ {
+   ZkClient zkClient = new ZkClient(ZkAddr);
+   try
+   {
+     if (zkClient.exists("/" + clusterName))
+     {
+       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+       zkClient.deleteRecursive("/" + clusterName);
+     }
+
+     ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+     setupTool.addCluster(clusterName, true);
+
+     for (int i = 0; i < nodesNb; i++)
+     {
+       int port = startPort + i;
+       setupTool.addInstanceToCluster(clusterName, participantNamePrefix + ":" + port);
+     }
+
+     for (int i = 0; i < resourceNb; i++)
+     {
+       String dbName = resourceNamePrefix + i;
+       setupTool.addResourceToCluster(clusterName, dbName, partitionNb, stateModelDef, mode.toString());
+       if (doRebalance)
+       {
+         setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+       }
+     }
+   }
+   finally
+   {
+     // release the connection even when a setup step fails
+     zkClient.close();
+   }
+ }
+
+ /**
+  * Asserts that every listed instance holds the expected state in the external view.
+  * A missing external view or partition now fails with a descriptive assertion instead of
+  * a NullPointerException. The ZooKeeper connection is always closed.
+  *
+  * @param clusterName cluster whose external views are read
+  * @param zkAddr ZooKeeper address to connect to
+  * @param stateMap
+  *          : "ResourceName/partitionKey" -> setOf(instances)
+  * @param state
+  *          : MASTER|SLAVE|ERROR...
+  */
+ public static void verifyState(String clusterName,
+     String zkAddr,
+     Map<String, Set<String>> stateMap,
+     String state)
+ {
+   ZkClient zkClient = new ZkClient(zkAddr);
+   zkClient.setZkSerializer(new ZNRecordSerializer());
+
+   try
+   {
+     ZKHelixDataAccessor accessor =
+         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+     Builder keyBuilder = accessor.keyBuilder();
+
+     for (Map.Entry<String, Set<String>> expected : stateMap.entrySet())
+     {
+       Map<String, String> retMap = getResourceAndPartitionKey(expected.getKey());
+       String resGroup = retMap.get("RESOURCE");
+       String partitionKey = retMap.get("PARTITION");
+
+       ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
+       for (String instance : expected.getValue())
+       {
+         // checked per instance so an empty instance set still passes untouched
+         Assert.assertNotNull(extView, "externalView doesn't exist for " + resGroup);
+         Map<String, String> partitionStates = extView.getStateMap(partitionKey);
+         Assert.assertNotNull(partitionStates, "externalView doesn't contain partition "
+             + resGroup + "/" + partitionKey);
+
+         String actualState = partitionStates.get(instance);
+         Assert.assertNotNull(actualState, "externalView doesn't contain state for "
+             + resGroup + "/" + partitionKey + " on " + instance + " (expect " + state
+             + ")");
+
+         Assert.assertEquals(actualState, state, "externalView for " + resGroup + "/"
+             + partitionKey + " on " + instance + " is " + actualState + " (expect "
+             + state + ")");
+       }
+     }
+   }
+   finally
+   {
+     zkClient.close();
+   }
+ }
+
+ /**
+  * Splits a combined key into resource and partition names.
+  * <ul>
+  * <li>"resource/partitionKey": split at the first '/'.</li>
+  * <li>"resource_x": the resource is the text before the last '_' and the partition is
+  * the whole key.</li>
+  * </ul>
+  *
+  * @param resourcePartition
+  *          : key is in form of "resource/partitionKey" or "resource_x"
+  *
+  * @return map with the entries "RESOURCE" and "PARTITION"
+  * @throws IllegalArgumentException if the key contains neither '/' nor '_'
+  */
+ private static Map<String, String> getResourceAndPartitionKey(String resourcePartition)
+ {
+   String resourceName;
+   String partitionName;
+   int idx = resourcePartition.indexOf('/');
+   if (idx > -1)
+   {
+     resourceName = resourcePartition.substring(0, idx);
+     partitionName = resourcePartition.substring(idx + 1);
+   }
+   else
+   {
+     idx = resourcePartition.lastIndexOf('_');
+     if (idx < 0)
+     {
+       // previously surfaced as an opaque StringIndexOutOfBoundsException from substring
+       throw new IllegalArgumentException("key must look like resource/partitionKey or resource_x: "
+           + resourcePartition);
+     }
+     resourceName = resourcePartition.substring(0, idx);
+     partitionName = resourcePartition;
+   }
+
+   Map<String, String> retMap = new HashMap<String, String>();
+   retMap.put("RESOURCE", resourceName);
+   retMap.put("PARTITION", partitionName);
+   return retMap;
+ }
+
+ /**
+  * Runs the same callable on {@code nrThreads} threads released together and collects the
+  * non-null results.
+  * <p>
+  * Delegates to the list-based overload with {@code nrThreads} references to
+  * {@code method} instead of duplicating its latch and thread handling.
+  *
+  * @param nrThreads number of threads to start
+  * @param method callable executed by every thread
+  * @param timeout wait budget, in seconds, for each latch
+  * @return map from "thread_" + thread id to that thread's non-null result
+  */
+ public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads,
+     final Callable<T> method,
+     final long timeout)
+ {
+   final List<Callable<T>> sameMethodForAll = Collections.nCopies(nrThreads, method);
+   return startThreadsConcurrently(sameMethodForAll, timeout);
+ }
+
+ /**
+  * Builds a STATE_TRANSITION message populated from the arguments, with the state model
+  * definition fixed to "MasterSlave".
+  *
+  * @param msgId message id
+  * @param fromState state to transition from
+  * @param toState state to transition to
+  * @param tgtName target name set on the message
+  * @param resourceName resource name set on the message
+  * @param partitionName partition name set on the message
+  * @return the populated message
+  */
+ public static Message createMessage(String msgId,
+     String fromState,
+     String toState,
+     String tgtName,
+     String resourceName,
+     String partitionName)
+ {
+   final Message transition = new Message(MessageType.STATE_TRANSITION, msgId);
+
+   transition.setFromState(fromState);
+   transition.setToState(toState);
+
+   transition.setTgtName(tgtName);
+   transition.setResourceName(resourceName);
+   transition.setPartitionName(partitionName);
+
+   transition.setStateModelDef("MasterSlave");
+   return transition;
+ }
+
+ /**
+  * Sends "wchp" to the server at {@code zkAddr} over a plain socket and counts the lines
+  * starting with "\t0x" that directly follow the line equal to {@code path}.
+  * NOTE(review): "wchp" is presumably ZooKeeper's watches-by-path four-letter command - confirm.
+  * <p>
+  * The socket is closed in a finally block, so it is no longer leaked when reading or
+  * parsing fails.
+  *
+  * @param zkAddr address of the form host:port
+  * @param path path line to look for in the response
+  * @return number of "\t0x" lines found after the matching path line(s)
+  */
+ public static int numberOfListeners(String zkAddr, String path) throws Exception
+ {
+   String splits[] = zkAddr.split(":");
+   Socket sock = new Socket(splits[0], Integer.parseInt(splits[1]));
+   try
+   {
+     PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
+     BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+
+     out.println("wchp");
+
+     int count = 0;
+     String line = in.readLine();
+     while (line != null)
+     {
+       if (line.equals(path))
+       {
+         String nextLine = in.readLine();
+         while (nextLine != null && nextLine.startsWith("\t0x"))
+         {
+           count++;
+           nextLine = in.readLine();
+         }
+       }
+       line = in.readLine();
+     }
+     return count;
+   }
+   finally
+   {
+     sock.close();
+   }
+ }
+
+ /**
+  * Returns the name of the method that called this helper, read from index 2 of the
+  * current thread's stack trace.
+  *
+  * @return method name of the stack frame at index 2
+  */
+ public static String getTestMethodName()
+ {
+   // the stack trace must be captured in this frame: the index depends on call depth
+   final StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+   return caller.getMethodName();
+ }
+
+ /**
+  * Returns the simple (package-less) class name of the caller, read from index 2 of the
+  * current thread's stack trace.
+  *
+  * @return class name of the stack frame at index 2 with everything up to the last '.' removed
+  */
+ public static String getTestClassName()
+ {
+   // the stack trace must be captured in this frame: the index depends on call depth
+   final String qualifiedName = Thread.currentThread().getStackTrace()[2].getClassName();
+   final int simpleNameStart = qualifiedName.lastIndexOf('.') + 1;
+   return qualifiedName.substring(simpleNameStart);
+ }
+
+ /**
+  * Runs each callable on its own thread, releasing all threads together through a start
+  * latch, and collects the non-null results.
+  * <p>
+  * Each worker counts down the finish latch in a finally block, so a worker that dies with
+  * an Error no longer makes the caller wait for the full timeout. If the caller is
+  * interrupted while waiting, its interrupt status is restored.
+  *
+  * @param methods callables to execute, one thread each
+  * @param timeout wait budget, in seconds, for the start latch (per worker) and for the
+  *          finish latch (caller); on finish timeout all workers are interrupted
+  * @return map from "thread_" + thread id to that thread's non-null result
+  */
+ public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods,
+     final long timeout)
+ {
+   final int nrThreads = methods.size();
+   final CountDownLatch startLatch = new CountDownLatch(1);
+   final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+   final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+   final List<Thread> threadList = new ArrayList<Thread>(nrThreads);
+
+   for (final Callable<T> method : methods)
+   {
+     Thread thread = new Thread()
+     {
+       @Override
+       public void run()
+       {
+         try
+         {
+           boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+           if (isTimeout)
+           {
+             LOG.error("Timeout while waiting for start latch");
+           }
+         }
+         catch (InterruptedException ex)
+         {
+           LOG.error("Interrupted while waiting for start latch");
+         }
+
+         try
+         {
+           T result = method.call();
+           if (result != null)
+           {
+             resultsMap.put("thread_" + this.getId(), result);
+           }
+           LOG.debug("result=" + result);
+         }
+         catch (Exception e)
+         {
+           LOG.error("Exeption in executing " + method.getClass().getName(), e);
+         }
+         finally
+         {
+           // always signal completion, even if call() ends with an Error
+           finishCounter.countDown();
+         }
+       }
+     };
+     threadList.add(thread);
+     thread.start();
+   }
+   startLatch.countDown();
+
+   // wait for all thread to complete
+   try
+   {
+     boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+     if (isTimeout)
+     {
+       LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
+       for (Thread thread : threadList)
+       {
+         thread.interrupt();
+       }
+     }
+   }
+   catch (InterruptedException e)
+   {
+     LOG.error("Interrupted while waiting for finish latch", e);
+     // keep the interrupt visible to the caller
+     Thread.currentThread().interrupt();
+   }
+
+   return resultsMap;
+ }
+
+ /**
+  * Prints every cache entry to stdout in key order, between START and END marker lines.
+  * Each entry shows the node data, its sorted children, and its stat (or "null").
+  *
+  * @param cache path-to-node map to print
+  */
+ public static void printCache(Map<String, ZNode> cache)
+ {
+   System.out.println("START:Print cache");
+
+   final Map<String, ZNode> sortedByPath = new TreeMap<String, ZNode>();
+   sortedByPath.putAll(cache);
+
+   for (Map.Entry<String, ZNode> entry : sortedByPath.entrySet())
+   {
+     final ZNode node = entry.getValue();
+     final Set<String> sortedChildren = new TreeSet<String>();
+     sortedChildren.addAll(node.getChildSet());
+
+     // only the null case adds its own newline; a non-null stat is printed as-is
+     final Object statText = (node.getStat() == null) ? "null\n" : node.getStat();
+     System.out.print(entry.getKey() + "=" + node.getData() + ", " + sortedChildren + ", "
+         + statText);
+   }
+
+   System.out.println("END:Print cache");
+ }
+
+ /**
+  * Reads the node at {@code path} and all of its descendants through the ZkClient,
+  * storing one ZNode per path in {@code map}. A path that does not exist is skipped.
+  *
+  * @param path root path to read
+  * @param map receives path -> node for everything that was read
+  * @param zkclient client used for the reads
+  */
+ public static void readZkRecursive(String path,
+     Map<String, ZNode> map,
+     ZkClient zkclient)
+ {
+   List<String> children;
+   try
+   {
+     final Stat nodeStat = new Stat();
+     final ZNRecord data = zkclient.readData(path, nodeStat);
+     children = zkclient.getChildren(path);
+
+     final ZNode node = new ZNode(path, data, nodeStat);
+     node.addChildren(children);
+     map.put(path, node);
+   }
+   catch (ZkNoNodeException e)
+   {
+     // OK: nothing at this path
+     return;
+   }
+
+   for (String child : children)
+   {
+     readZkRecursive(path + "/" + child, map, zkclient);
+   }
+ }
+
+ /**
+  * Reads the node at {@code path} and all of its descendants through the data accessor
+  * (options value 0), storing one ZNode per path in {@code map}. A path that does not
+  * exist is skipped.
+  *
+  * @param path root path to read
+  * @param map receives path -> node for everything that was read
+  * @param zkAccessor accessor used for the reads
+  */
+ public static void readZkRecursive(String path,
+     Map<String, ZNode> map,
+     BaseDataAccessor<ZNRecord> zkAccessor)
+ {
+   List<String> children;
+   try
+   {
+     final Stat nodeStat = new Stat();
+     final ZNRecord data = zkAccessor.get(path, nodeStat, 0);
+     children = zkAccessor.getChildNames(path, 0);
+
+     final ZNode node = new ZNode(path, data, nodeStat);
+     node.addChildren(children);
+     map.put(path, node);
+   }
+   catch (ZkNoNodeException e)
+   {
+     // OK: nothing at this path
+     return;
+   }
+
+   if (children == null)
+   {
+     return;
+   }
+   for (String child : children)
+   {
+     readZkRecursive(path + "/" + child, map, zkAccessor);
+   }
+ }
+
+ /**
+  * Reads every path recursively twice - once through the ZkClient and once through the
+  * accessor - and compares the two snapshots with the map-based overload, passing
+  * {@code null} as the stat-exclusion list.
+  *
+  * @param paths root paths to read
+  * @param zkAccessor accessor whose view is treated as the cache
+  * @param zkclient client used to read the reference copy
+  * @param needVerifyStat forwarded to the map-based overload
+  * @return result of the map-based comparison
+  */
+ public static boolean verifyZkCache(List<String> paths,
+     BaseDataAccessor<ZNRecord> zkAccessor,
+     ZkClient zkclient,
+     boolean needVerifyStat)
+ {
+   final Map<String, ZNode> viaClient = new HashMap<String, ZNode>();
+   final Map<String, ZNode> viaAccessor = new HashMap<String, ZNode>();
+
+   for (String root : paths)
+   {
+     readZkRecursive(root, viaClient, zkclient);
+     readZkRecursive(root, viaAccessor, zkAccessor);
+   }
+
+   return verifyZkCache(paths, null, viaAccessor, viaClient, needVerifyStat);
+ }
+
+ /**
+  * Compares a cache snapshot against ZooKeeper, passing {@code null} as the
+  * stat-exclusion list to the five-argument overload.
+  *
+  * @param paths root paths to read from ZooKeeper
+  * @param cache cache snapshot to check
+  * @param zkclient client used to read the reference copy
+  * @param needVerifyStat forwarded to the five-argument overload
+  * @return result of the comparison
+  */
+ public static boolean verifyZkCache(List<String> paths,
+     Map<String, ZNode> cache,
+     ZkClient zkclient,
+     boolean needVerifyStat)
+ {
+   final List<String> pathsExcludeForStat = null;
+   return verifyZkCache(paths, pathsExcludeForStat, cache, zkclient, needVerifyStat);
+ }
+
+ /**
+  * Reads everything under {@code paths} from ZooKeeper and compares it with the cache
+  * snapshot using the map-based overload.
+  *
+  * @param paths root paths to read from ZooKeeper
+  * @param pathsExcludeForStat forwarded to the map-based overload
+  * @param cache cache snapshot to check
+  * @param zkclient client used to read the reference copy
+  * @param needVerifyStat forwarded to the map-based overload
+  * @return result of the map-based comparison
+  */
+ public static boolean verifyZkCache(List<String> paths,
+     List<String> pathsExcludeForStat,
+     Map<String, ZNode> cache,
+     ZkClient zkclient,
+     boolean needVerifyStat)
+ {
+   final Map<String, ZNode> onZk = new HashMap<String, ZNode>();
+   for (String root : paths)
+   {
+     readZkRecursive(root, onZk, zkclient);
+   }
+
+   return verifyZkCache(paths, pathsExcludeForStat, cache, onZk, needVerifyStat);
+ }
+
+ /**
+  * Compares a cache snapshot with a ZooKeeper snapshot path by path: sizes must match and,
+  * for every cached path, data and child set must be equal. Stats are compared as well
+  * when {@code needVerifyStat} is true and the path is not excluded.
+  * <p>
+  * A {@code null} exclusion list now means "exclude nothing". Previously a null list
+  * (which the four-argument overloads pass) silently disabled stat verification even
+  * when {@code needVerifyStat} was true.
+  *
+  * @param paths root paths the snapshots were taken from (not read here)
+  * @param pathsExcludeForStat paths whose stat is not compared; may be null
+  * @param cache cache snapshot
+  * @param zkMap ZooKeeper snapshot
+  * @param needVerifyStat whether stats are compared at all
+  * @return true when the snapshots agree; the first mismatch is printed to stderr
+  */
+ public static boolean verifyZkCache(List<String> paths,
+     List<String> pathsExcludeForStat,
+     Map<String, ZNode> cache,
+     Map<String, ZNode> zkMap,
+     boolean needVerifyStat)
+ {
+   // equal size
+   if (zkMap.size() != cache.size())
+   {
+     System.err.println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: "
+         + zkMap.size());
+     System.out.println("cache: (" + cache.size() + ")");
+     TestHelper.printCache(cache);
+
+     System.out.println("zkMap: (" + zkMap.size() + ")");
+     TestHelper.printCache(zkMap);
+
+     return false;
+   }
+
+   // everything in cache is also in map
+   for (Map.Entry<String, ZNode> entry : cache.entrySet())
+   {
+     String path = entry.getKey();
+     ZNode cacheNode = entry.getValue();
+     ZNode zkNode = zkMap.get(path);
+
+     if (zkNode == null)
+     {
+       // in cache but not on zk
+       System.err.println("path: " + path + " in cache but not on zk: inCacheNode: "
+           + cacheNode);
+       return false;
+     }
+
+     if (!bothNullOrEqual(zkNode.getData(), cacheNode.getData()))
+     {
+       // data not equal
+       System.err.println("data mismatch on path: " + path + ", inCache: "
+           + cacheNode.getData() + ", onZk: " + zkNode.getData());
+       return false;
+     }
+
+     if (!bothNullOrEqual(zkNode.getChildSet(), cacheNode.getChildSet()))
+     {
+       // childSet not equal
+       System.err.println("childSet mismatch on path: " + path + ", inCache: "
+           + cacheNode.getChildSet() + ", onZk: " + zkNode.getChildSet());
+       return false;
+     }
+
+     boolean statExcluded = pathsExcludeForStat != null && pathsExcludeForStat.contains(path);
+     if (needVerifyStat && !statExcluded)
+     {
+       // a missing cached stat is a mismatch; comparing from the cache side
+       // also avoids an NPE if the zk stat is null
+       if (cacheNode.getStat() == null || !cacheNode.getStat().equals(zkNode.getStat()))
+       {
+         // stat not equal
+         System.err.println("Stat mismatch on path: " + path + ", inCache: "
+             + cacheNode.getStat() + ", onZk: " + zkNode.getStat());
+         return false;
+       }
+     }
+   }
+
+   return true;
+ }
+
+ /** Null-safe equality: true when both are null, or both are non-null and equal. */
+ private static boolean bothNullOrEqual(Object left, Object right)
+ {
+   if (left == null || right == null)
+   {
+     return left == right;
+   }
+   return left.equals(right);
+ }
+
+ public static StateModelDefinition generateStateModelDefForBootstrap()
+ {
+ ZNRecord record = new ZNRecord("Bootstrap");
+ record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "IDLE");
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add("ONLINE");
+ statePriorityList.add("BOOTSTRAP");
+ statePriorityList.add("OFFLINE");
+ statePriorityList.add("IDLE");
+ statePriorityList.add("DROPPED");
+ statePriorityList.add("ERROR");
+ record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList)
+ {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ if (state.equals("ONLINE"))
+ {
+ metadata.put("count", "R");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("BOOTSTRAP"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("OFFLINE"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("IDLE"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("DROPPED"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("ERROR"))
+ {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ }
+
+ for (String state : statePriorityList)
+ {
+ String key = state + ".next";
+ if (state.equals("ONLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("BOOTSTRAP", "OFFLINE");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ metadata.put("IDLE", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("BOOTSTRAP"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "ONLINE");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ metadata.put("IDLE", "OFFLINE");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("OFFLINE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "BOOTSTRAP");
+ metadata.put("BOOTSTRAP", "BOOTSTRAP");
+ metadata.put("DROPPED", "IDLE");
+ metadata.put("IDLE", "IDLE");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("IDLE"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "OFFLINE");
+ metadata.put("BOOTSTRAP", "OFFLINE");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "DROPPED");
+ record.setMapField(key, metadata);
+ }
+ else if (state.equals("ERROR"))
+ {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("IDLE", "IDLE");
+ record.setMapField(key, metadata);
+ }
+ }
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add("ONLINE-OFFLINE");
+ stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
+ stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
+ stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
+ stateTransitionPriorityList.add("OFFLINE-IDLE");
+ stateTransitionPriorityList.add("IDLE-OFFLINE");
+ stateTransitionPriorityList.add("IDLE-DROPPED");
+ stateTransitionPriorityList.add("ERROR-IDLED");
+ record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+ return new StateModelDefinition(record);
+ }
+
+ /**
+  * Renders a record as multi-line text: the id, then the simple fields (only if the map
+  * is non-null), the list fields and the map fields, each under its own heading.
+  *
+  * @param record record to render
+  * @return the textual form of the record
+  */
+ public static String znrecordToString(ZNRecord record)
+ {
+   final StringBuilder text = new StringBuilder();
+   text.append(record.getId()).append("\n");
+
+   final Map<String, String> simpleFields = record.getSimpleFields();
+   if (simpleFields != null)
+   {
+     text.append("simpleFields\n");
+     for (Map.Entry<String, String> field : simpleFields.entrySet())
+     {
+       text.append(" ").append(field.getKey()).append("\t: ").append(field.getValue()).append("\n");
+     }
+   }
+
+   text.append("listFields\n");
+   for (Map.Entry<String, List<String>> field : record.getListFields().entrySet())
+   {
+     text.append(" ").append(field.getKey()).append("\t: ");
+     for (String item : field.getValue())
+     {
+       text.append(item).append(", ");
+     }
+     text.append("\n");
+   }
+
+   text.append("mapFields\n");
+   for (Map.Entry<String, Map<String, String>> field : record.getMapFields().entrySet())
+   {
+     text.append(" ").append(field.getKey()).append("\t: \n");
+     for (Map.Entry<String, String> inner : field.getValue().entrySet())
+     {
+       text.append(" ").append(inner.getKey()).append("\t: ").append(inner.getValue()).append("\n");
+     }
+   }
+
+   return text.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
new file mode 100644
index 0000000..81ea6bc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.HierarchicalDataHolder;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import java.io.FileFilter;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Exercises HierarchicalDataHolder.refreshData() against ZooKeeper: refresh must report
+ * a change after nodes are added or rewritten and no change otherwise.
+ */
+public class TestHierarchicalDataStore extends ZkUnitTestBase
+{
+ protected static ZkClient _zkClientString = null;
+
+ @Test (groups = {"unitTest"})
+ public void testHierarchicalDataStore()
+ {
+   _zkClientString = new ZkClient(ZK_ADDR, 1000, 3000);
+   try
+   {
+     String path = "/tmp/testHierarchicalDataStore";
+     FileFilter filter = null;
+     // _zkClient.setZkSerializer(new ZNRecordSerializer());
+
+     _zkClientString.deleteRecursive(path);
+     HierarchicalDataHolder<ZNRecord> dataHolder = new HierarchicalDataHolder<ZNRecord>(
+         _zkClientString, path, filter);
+     dataHolder.print();
+     AssertJUnit.assertFalse(dataHolder.refreshData());
+
+     // write data
+     add(path, "root data");
+     AssertJUnit.assertTrue(dataHolder.refreshData());
+     dataHolder.print();
+
+     // add some children
+     add(path + "/child1", "child 1 data");
+     add(path + "/child2", "child 2 data");
+     AssertJUnit.assertTrue(dataHolder.refreshData());
+     dataHolder.print();
+
+     // add some grandchildren
+     add(path + "/child1" + "/grandchild1", "grand child 1 data");
+     add(path + "/child1" + "/grandchild2", "grand child 2 data");
+     AssertJUnit.assertTrue(dataHolder.refreshData());
+     dataHolder.print();
+
+     // nothing changed since the last refresh
+     AssertJUnit.assertFalse(dataHolder.refreshData());
+
+     set(path + "/child1", "new child 1 data");
+     AssertJUnit.assertTrue(dataHolder.refreshData());
+     dataHolder.print();
+   }
+   finally
+   {
+     // the connection used to stay open after the test, pass or fail
+     _zkClientString.close();
+   }
+ }
+
+ /** Overwrites the data of an existing node. */
+ private void set(String path, String data)
+ {
+   _zkClientString.writeData(path, data);
+ }
+
+ /** Creates the node (second argument to createPersistent is true), then writes its data. */
+ private void add(String path, String data)
+ {
+   _zkClientString.createPersistent(path, true);
+   _zkClientString.writeData(path, data);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
new file mode 100644
index 0000000..406f92f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.Mocks.MockHealthReportProvider;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.healthcheck.*;
+import org.testng.annotations.Test;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+
+public class TestParticipantHealthReportCollectorImpl {
+
+ protected ParticipantHealthReportCollectorImpl _providerImpl;
+ protected HelixManager _manager;
+ protected MockHealthReportProvider _mockProvider;
+
+ @BeforeMethod (groups = {"unitTest"})
+ public void setup()
+ {
+ _providerImpl = new ParticipantHealthReportCollectorImpl(new MockManager(), "instance_123");
+ _mockProvider = new MockHealthReportProvider();
+ }
+
+ @Test (groups = {"unitTest"})
+ public void testStart() throws Exception
+ {
+ _providerImpl.start();
+ _providerImpl.start();
+ }
+
+ @Test (groups = {"unitTest"})
+ public void testStop() throws Exception
+ {
+ _providerImpl.stop();
+ _providerImpl.stop();
+ }
+
+ @Test (groups = {"unitTest"})
+ public void testAddProvider() throws Exception
+ {
+ _providerImpl.removeHealthReportProvider(_mockProvider);
+ _providerImpl.addHealthReportProvider(_mockProvider);
+ _providerImpl.addHealthReportProvider(_mockProvider);
+ }
+
+ @Test (groups = {"unitTest"})
+ public void testRemoveProvider() throws Exception
+ {
+ _providerImpl.addHealthReportProvider(_mockProvider);
+ _providerImpl.removeHealthReportProvider(_mockProvider);
+ _providerImpl.removeHealthReportProvider(_mockProvider);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
new file mode 100644
index 0000000..54e96cf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestPerfCounters.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+
+import org.apache.helix.healthcheck.DefaultPerfCounters;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+public class TestPerfCounters {
+
+ final String INSTANCE_NAME = "instance_123";
+ final long AVAILABLE_CPUS = 1;
+ final long FREE_PHYSICAL_MEMORY = 2;
+ final long FREE_JVM_MEMORY = 3;
+ final long TOTAL_JVM_MEMORY = 4;
+ final double AVERAGE_SYSTEM_LOAD = 5;
+
+ DefaultPerfCounters _perfCounters;
+
+ @BeforeTest ()
+ public void setup()
+ {
+ _perfCounters = new DefaultPerfCounters(INSTANCE_NAME, AVAILABLE_CPUS,
+ FREE_PHYSICAL_MEMORY, FREE_JVM_MEMORY, TOTAL_JVM_MEMORY,
+ AVERAGE_SYSTEM_LOAD);
+ }
+
+ @Test ()
+ public void testGetAvailableCpus()
+ {
+ AssertJUnit.assertEquals(AVAILABLE_CPUS,_perfCounters.getAvailableCpus());
+ }
+
+ @Test ()
+ public void testGetAverageSystemLoad()
+ {
+ AssertJUnit.assertEquals(AVERAGE_SYSTEM_LOAD,_perfCounters.getAverageSystemLoad());
+ }
+
+ @Test ()
+ public void testGetTotalJvmMemory()
+ {
+ AssertJUnit.assertEquals(TOTAL_JVM_MEMORY,_perfCounters.getTotalJvmMemory());
+ }
+
+ @Test ()
+ public void testGetFreeJvmMemory()
+ {
+ AssertJUnit.assertEquals(FREE_JVM_MEMORY,_perfCounters.getFreeJvmMemory());
+ }
+
+ @Test ()
+ public void testGetFreePhysicalMemory()
+ {
+ AssertJUnit.assertEquals(FREE_PHYSICAL_MEMORY,_perfCounters.getFreePhysicalMemory());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java b/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java
new file mode 100644
index 0000000..f0ea906
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestPerformanceHealthReportProvider.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Mocks.MockManager;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.PerformanceHealthReportProvider;
+import org.apache.helix.model.HealthStat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link PerformanceHealthReportProvider}: submitting,
+ * incrementing, reading back and resetting per-partition stats, and writing
+ * the partition report through the data accessor of a {@link MockManager}.
+ */
+public class TestPerformanceHealthReportProvider {
+
+  protected static final String CLUSTER_NAME = "TestCluster";
+  protected final String STAT_NAME = "Stat_123";
+  protected final String PARTITION_NAME = "Partition_456";
+  // Names that are never submitted; used for the "not found" lookups.
+  protected final String FAKE_STAT_NAME = "Stat_ABC";
+  protected final String FAKE_PARTITION_NAME = "Partition_DEF";
+  protected final String STORED_STAT = "789";
+  protected final String INSTANCE_NAME = "instance:1";
+
+  PerformanceHealthReportProvider _healthProvider;
+  MockManager _helixManager;
+
+  /**
+   * Helper: replaces {@code _helixManager} with a new mock and increments
+   * {@code STAT_NAME} for {@code PARTITION_NAME} once.
+   */
+  public void incrementPartitionStat() throws Exception
+  {
+    _helixManager = new MockManager(CLUSTER_NAME);
+    _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+  }
+
+  /**
+   * Helper: replaces {@code _helixManager} with a new mock, wraps the
+   * provider's recent partition report in a {@link ZNRecord} named after the
+   * report, and stores it as a {@link HealthStat} under the health-report key
+   * of {@code INSTANCE_NAME}.
+   */
+  public void transmitReport() throws Exception
+  {
+    _helixManager = new MockManager(CLUSTER_NAME);
+    Map<String, Map<String, String>> partitionReport = _healthProvider
+        .getRecentPartitionHealthReport();
+    ZNRecord record = new ZNRecord(_healthProvider.getReportName());
+    // Only attach map fields when the provider actually returned a report.
+    if (partitionReport != null) {
+      record.setMapFields(partitionReport);
+    }
+    HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+
+    Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.healthReport(INSTANCE_NAME, record.getId()), new HealthStat(record));
+  }
+
+  /** Creates a fresh provider before every test method. */
+  @BeforeMethod ()
+  public void setup()
+  {
+    _healthProvider = new PerformanceHealthReportProvider();
+  }
+
+  /** Smoke test: both report getters can be called on a fresh provider. */
+  @Test ()
+  public void testGetRecentHealthReports() throws Exception
+  {
+    _healthProvider.getRecentHealthReport();
+    _healthProvider.getRecentPartitionHealthReport();
+  }
+
+  /**
+   * Incrementing a missing stat twice must yield 2.0; incrementing after an
+   * explicit submit of {@code STORED_STAT} must yield that value plus one.
+   */
+  @Test ()
+  public void testIncrementPartitionStat() throws Exception
+  {
+    //stat does not exist yet
+    _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+    transmitReport();
+    //stat does exist
+    _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+    transmitReport();
+    String retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+    AssertJUnit.assertEquals(2.0, Double.parseDouble(retrievedStat));
+
+    //set to some other value
+    _healthProvider.submitPartitionStat(STAT_NAME, PARTITION_NAME, STORED_STAT);
+    transmitReport();
+    _healthProvider.incrementPartitionStat(STAT_NAME, PARTITION_NAME);
+    transmitReport();
+    retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+    // AssertJUnit takes (expected, actual); keep that order so a failure
+    // message reports the right side as "expected".
+    AssertJUnit.assertEquals(Double.parseDouble(STORED_STAT) + 1, Double.parseDouble(retrievedStat));
+  }
+
+  /**
+   * A submitted stat must be returned verbatim for its own stat/partition
+   * pair, and {@code null} for any pair involving an unknown stat or
+   * partition name.
+   */
+  @Test ()
+  public void testSetGetPartitionStat() throws Exception
+  {
+    _healthProvider.submitPartitionStat(STAT_NAME, PARTITION_NAME, STORED_STAT);
+    transmitReport();
+    String retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+    //check on correct retrieval for real stat, real partition
+    AssertJUnit.assertEquals(STORED_STAT, retrievedStat);
+
+    //real stat, fake partition
+    retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, FAKE_PARTITION_NAME);
+    AssertJUnit.assertNull(retrievedStat);
+
+    //fake stat, real partition
+    retrievedStat = _healthProvider.getPartitionStat(FAKE_STAT_NAME, PARTITION_NAME);
+    AssertJUnit.assertNull(retrievedStat);
+
+    //fake stat, fake partition
+    retrievedStat = _healthProvider.getPartitionStat(FAKE_STAT_NAME, FAKE_PARTITION_NAME);
+    AssertJUnit.assertNull(retrievedStat);
+  }
+
+  /**
+   * The partition report must be empty on a fresh provider and, after one
+   * stat has been submitted, contain exactly that stat with exactly that
+   * partition and value.
+   */
+  @Test ()
+  public void testGetPartitionHealthReport() throws Exception
+  {
+    //test empty map case
+    Map<String, Map<String, String>> resultMap = _healthProvider.getRecentPartitionHealthReport();
+    AssertJUnit.assertEquals(0, resultMap.size());
+
+    //test non-empty case (reuses the submit/lookup scenario to populate the provider)
+    testSetGetPartitionStat();
+    resultMap = _healthProvider.getRecentPartitionHealthReport();
+    //check contains 1 stat
+    AssertJUnit.assertEquals(1, resultMap.size());
+    //check contains STAT_NAME STAT
+    AssertJUnit.assertTrue(resultMap.keySet().contains(STAT_NAME));
+    Map<String, String> statMap = resultMap.get(STAT_NAME);
+    //check statMap has size 1
+    AssertJUnit.assertEquals(1, statMap.size());
+    //check contains PARTITION_NAME
+    AssertJUnit.assertTrue(statMap.keySet().contains(PARTITION_NAME));
+    //check stored val
+    String statVal = statMap.get(PARTITION_NAME);
+    AssertJUnit.assertEquals(STORED_STAT, statVal);
+  }
+
+  /**
+   * After one increment the stat must read 1.0; after {@code resetStats()} the
+   * same lookup must return {@code null}.
+   */
+  @Test ()
+  public void testPartitionStatReset() throws Exception
+  {
+    incrementPartitionStat();
+    //ensure stat appears
+    String retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+    AssertJUnit.assertEquals(1.0, Double.parseDouble(retrievedStat));
+    //reset partition stats
+    _healthProvider.resetStats();
+    transmitReport();
+    retrievedStat = _healthProvider.getPartitionStat(STAT_NAME, PARTITION_NAME);
+    AssertJUnit.assertNull(retrievedStat);
+  }
+
+  /** Smoke test: {@code getReportName()} can be called on a fresh provider. */
+  @Test ()
+  public void testGetReportName() throws Exception
+  {
+    _healthProvider.getReportName();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java
new file mode 100644
index 0000000..b585b34
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.testng.annotations.Test;
+import org.testng.AssertJUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+
+/**
+ * Verifies the ZooKeeper paths that {@link PropertyPathConfig#getPath} builds
+ * for a handful of {@link PropertyType} values and key combinations.
+ */
+@Test
+public class TestPropertyPathConfig
+{
+  /**
+   * Compares the generated path with the expected literal for each property
+   * type. Assertions use AssertJUnit's (expected, actual) argument order so
+   * that a failure message labels the two values correctly.
+   */
+  @Test
+  public void testGetPath()
+  {
+    String actual;
+
+    // Ideal states: cluster level and single resource.
+    actual = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, "test_cluster");
+    AssertJUnit.assertEquals("/test_cluster/IDEALSTATES", actual);
+    actual = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, "test_cluster","resource");
+    AssertJUnit.assertEquals("/test_cluster/IDEALSTATES/resource", actual);
+
+    // Instance node.
+    actual = PropertyPathConfig.getPath(PropertyType.INSTANCES, "test_cluster","instanceName1");
+    AssertJUnit.assertEquals("/test_cluster/INSTANCES/instanceName1", actual);
+
+    // Current states: per instance and per session.
+    actual = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, "test_cluster","instanceName1");
+    AssertJUnit.assertEquals("/test_cluster/INSTANCES/instanceName1/CURRENTSTATES", actual);
+    actual = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, "test_cluster","instanceName1","sessionId");
+    AssertJUnit.assertEquals("/test_cluster/INSTANCES/instanceName1/CURRENTSTATES/sessionId", actual);
+
+    // Controller node and its message queue.
+    actual = PropertyPathConfig.getPath(PropertyType.CONTROLLER, "test_cluster");
+    AssertJUnit.assertEquals("/test_cluster/CONTROLLER", actual);
+    actual = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, "test_cluster");
+    AssertJUnit.assertEquals("/test_cluster/CONTROLLER/MESSAGES", actual);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java b/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
new file mode 100644
index 0000000..73850f2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestRelayIdealStateCalculator.java
@@ -0,0 +1,70 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.IdealStateCalculatorForEspressoRelay;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Checks structural properties of the ideal state produced by
+ * {@link IdealStateCalculatorForEspressoRelay#calculateRelayIdealState}.
+ */
+public class TestRelayIdealStateCalculator
+{
+  /** Runs the parameterised check below for four fixed size combinations. */
+  @Test ()
+  public void testEspressoStorageClusterIdealState() throws Exception
+  {
+    // Each row is { partitions, nodes, replica }.
+    final int[][] scenarios = { { 15, 9, 3 }, { 15, 6, 3 }, { 15, 6, 2 }, { 6, 4, 2 } };
+    for (int[] scenario : scenarios)
+    {
+      testEspressoStorageClusterIdealState(scenario[0], scenario[1], scenario[2]);
+    }
+  }
+
+  /**
+   * Builds {@code partitions} names of the form {@code localhost:123<i>} and
+   * {@code nodes} names of the form {@code relay:123<i>}, calculates the relay
+   * ideal state for them and asserts that:
+   * <ul>
+   * <li>the record has as many list fields as map fields;</li>
+   * <li>for every list-field key, the list and the map stored under that key
+   * have the same size, the list has exactly {@code replica} entries, and each
+   * list entry is a key of the map;</li>
+   * <li>no value occurs in more than
+   * {@code partitions * replica / nodes + 1} lists.</li>
+   * </ul>
+   *
+   * @param partitions number of {@code localhost:123<i>} names to generate
+   * @param nodes number of {@code relay:123<i>} names to generate
+   * @param replica replica count passed to the calculator
+   */
+  public void testEspressoStorageClusterIdealState(int partitions, int nodes, int replica) throws Exception
+  {
+    List<String> storageNodes = new ArrayList<String>();
+    for (int idx = 0; idx < partitions; idx++)
+    {
+      storageNodes.add("localhost:123" + idx);
+    }
+
+    List<String> relays = new ArrayList<String>();
+    for (int idx = 0; idx < nodes; idx++)
+    {
+      relays.add("relay:123" + idx);
+    }
+
+    IdealState idealstate =
+        IdealStateCalculatorForEspressoRelay.calculateRelayIdealState(storageNodes,
+                                                                      relays,
+                                                                      "TEST",
+                                                                      replica,
+                                                                      "Leader",
+                                                                      "Standby",
+                                                                      "LeaderStandby");
+
+    Map<String, List<String>> listFields = idealstate.getRecord().getListFields();
+    Map<String, Map<String, String>> mapFields = idealstate.getRecord().getMapFields();
+    Assert.assertEquals(listFields.size(), mapFields.size());
+
+    // Number of lists each value appears in.
+    Map<String, Integer> countMap = new TreeMap<String, Integer>();
+    for (Map.Entry<String, List<String>> entry : listFields.entrySet())
+    {
+      List<String> members = entry.getValue();
+      Map<String, String> memberStates = mapFields.get(entry.getKey());
+      Assert.assertEquals(members.size(), memberStates.size());
+      Assert.assertEquals(members.size(), replica);
+
+      for (String member : members)
+      {
+        Integer seenSoFar = countMap.get(member);
+        countMap.put(member, seenSoFar == null ? 1 : seenSoFar + 1);
+        Assert.assertTrue(memberStates.containsKey(member));
+      }
+    }
+
+    for (Integer occurrences : countMap.values())
+    {
+      Assert.assertTrue(occurrences <= partitions * replica / nodes + 1);
+    }
+    System.out.println();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
new file mode 100644
index 0000000..35eb5cb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
@@ -0,0 +1,289 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.Mocks.MockAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link RoutingTableProvider}: feeds it external views built from
+ * hand-made {@link ZNRecord}s and checks the instances it returns per
+ * resource, partition and state.
+ */
+public class TestRoutingTable
+{
+  // Notification context handed to every onExternalViewChange() call; created in setup().
+  NotificationContext changeContext = null;
+
+  /**
+   * Builds a {@link NotificationContext} around a mock manager whose data
+   * accessor answers {@code getChildValues} for participant configs with one
+   * {@link InstanceConfig} per entry of a fixed two-instance array, and with
+   * an empty list for every other key.
+   */
+  @BeforeClass()
+  public synchronized void setup()
+  {
+
+    final String[] array = new String[] { "localhost_8900", "localhost_8901" };
+    HelixManager manager = new Mocks.MockManager() {
+      // Created on first use and then reused.
+      private MockAccessor _mockAccessor;
+
+      @Override
+      public HelixDataAccessor getHelixDataAccessor()
+      {
+        if (_mockAccessor == null)
+        {
+          _mockAccessor = new Mocks.MockAccessor() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
+            {
+              PropertyType type = key.getType();
+              String[] keys = key.getParams();
+              if (type == PropertyType.CONFIGS && keys != null && keys.length > 1
+                  && keys[1].equalsIgnoreCase(ConfigScopeProperty.PARTICIPANT.toString()))
+              {
+                List<InstanceConfig> configs = new ArrayList<InstanceConfig>();
+                for (String instanceName : array)
+                {
+                  // Instance names have the form <host>_<port>.
+                  InstanceConfig config = new InstanceConfig(instanceName);
+                  String[] splits = instanceName.split("_");
+                  config.setHostName(splits[0]);
+                  config.setPort(splits[1]);
+                  configs.add(config);
+                }
+                return (List<T>) configs;
+              }
+              return Collections.emptyList();
+            }
+          };
+        }
+        return _mockAccessor;
+      }
+    };
+    changeContext = new NotificationContext(manager);
+  }
+
+  /** A null list and an empty list of external views must both be accepted without an exception. */
+  @Test()
+  public void testNullAndEmpty()
+  {
+
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    routingTable.onExternalViewChange(null, changeContext);
+    List<ExternalView> list = Collections.emptyList();
+    routingTable.onExternalViewChange(list, changeContext);
+
+  }
+
+  /**
+   * Publishes the same record three times (initial, after additions, after
+   * changing one instance's state) and checks the instance count returned
+   * for the affected partition/state pairs after each publication.
+   */
+  @Test()
+  public void testSimple()
+  {
+    List<InstanceConfig> instances;
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    ZNRecord record = new ZNRecord("TESTDB");
+
+    // one master
+    add(record, "TESTDB_0", "localhost_8900", "MASTER");
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(1, instances.size());
+
+    // additions
+    add(record, "TESTDB_0", "localhost_8901", "MASTER");
+    add(record, "TESTDB_1", "localhost_8900", "SLAVE");
+
+    externalViewList = new ArrayList<ExternalView>();
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(2, instances.size());
+
+    instances = routingTable.getInstances("TESTDB", "TESTDB_1", "SLAVE");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(1, instances.size());
+
+    // updates
+    add(record, "TESTDB_0", "localhost_8901", "SLAVE");
+    externalViewList = new ArrayList<ExternalView>();
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "SLAVE");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(1, instances.size());
+  }
+
+  /**
+   * After an external view has been published and then replaced by an empty
+   * list, a lookup for the old partition must return a non-null, empty list.
+   */
+  @Test()
+  public void testStateUnitGroupDeletion()
+  {
+    List<InstanceConfig> instances;
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    ZNRecord record = new ZNRecord("TESTDB");
+
+    // one master
+    add(record, "TESTDB_0", "localhost_8900", "MASTER");
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(1, instances.size());
+
+    externalViewList.clear();
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instances);
+    AssertJUnit.assertEquals(0, instances.size());
+  }
+
+  /**
+   * With six partitions spread over two instances (each instance MASTER for
+   * three and SLAVE for the other three), the resource-wide lookups for
+   * MASTER and for SLAVE must each return both instances.
+   */
+  @Test()
+  public void testGetInstanceForAllStateUnits()
+  {
+    List<InstanceConfig> instancesList;
+    Set<InstanceConfig> instancesSet;
+    InstanceConfig instancesArray[];
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    ZNRecord record = new ZNRecord("TESTDB");
+
+    // localhost_8900: master of partitions 0-2, slave of partitions 3-5
+    add(record, "TESTDB_0", "localhost_8900", "MASTER");
+    add(record, "TESTDB_1", "localhost_8900", "MASTER");
+    add(record, "TESTDB_2", "localhost_8900", "MASTER");
+    add(record, "TESTDB_3", "localhost_8900", "SLAVE");
+    add(record, "TESTDB_4", "localhost_8900", "SLAVE");
+    add(record, "TESTDB_5", "localhost_8900", "SLAVE");
+
+    // localhost_8901: the mirror image
+    add(record, "TESTDB_0", "localhost_8901", "SLAVE");
+    add(record, "TESTDB_1", "localhost_8901", "SLAVE");
+    add(record, "TESTDB_2", "localhost_8901", "SLAVE");
+    add(record, "TESTDB_3", "localhost_8901", "MASTER");
+    add(record, "TESTDB_4", "localhost_8901", "MASTER");
+    add(record, "TESTDB_5", "localhost_8901", "MASTER");
+
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    instancesList = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
+    AssertJUnit.assertNotNull(instancesList);
+    AssertJUnit.assertEquals(1, instancesList.size());
+    instancesSet = routingTable.getInstances("TESTDB", "MASTER");
+    AssertJUnit.assertNotNull(instancesSet);
+    AssertJUnit.assertEquals(2, instancesSet.size());
+    instancesSet = routingTable.getInstances("TESTDB", "SLAVE");
+    AssertJUnit.assertNotNull(instancesSet);
+    AssertJUnit.assertEquals(2, instancesSet.size());
+    instancesArray = new InstanceConfig[instancesSet.size()];
+    instancesSet.toArray(instancesArray);
+    // NOTE(review): the index-based checks below rely on the iteration order
+    // of the returned Set (8900 before 8901) - confirm the provider
+    // guarantees that order.
+    AssertJUnit.assertEquals("localhost", instancesArray[0].getHostName());
+    AssertJUnit.assertEquals("8900", instancesArray[0].getPort());
+    AssertJUnit.assertEquals("localhost", instancesArray[1].getHostName());
+    AssertJUnit.assertEquals("8901", instancesArray[1].getPort());
+  }
+
+  /**
+   * Reads the routing table 100 times from a background task while the test
+   * thread re-publishes the same external view 10 times, and requires every
+   * read to see exactly one MASTER for {@code TESTDB_0}.
+   */
+  @Test()
+  public void testMultiThread() throws Exception
+  {
+    final RoutingTableProvider routingTable = new RoutingTableProvider();
+    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    ZNRecord record = new ZNRecord("TESTDB");
+    for (int i = 0; i < 1000; i++)
+    {
+      add(record, "TESTDB_" + i, "localhost_8900", "MASTER");
+    }
+    externalViewList.add(new ExternalView(record));
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    Callable<Boolean> runnable = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception
+      {
+
+        try
+        {
+          int count = 0;
+          while (count < 100)
+          {
+            List<InstanceConfig> instancesList = routingTable.getInstances("TESTDB", "TESTDB_0",
+                "MASTER");
+            // A failure here surfaces in the test thread through Future.get().
+            AssertJUnit.assertEquals(1, instancesList.size());
+
+            Thread.sleep(5);
+
+            count++;
+          }
+        } catch (InterruptedException e)
+        {
+          // Interruption just ends the read loop early; the task still reports success.
+        }
+        return true;
+      }
+    };
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    try
+    {
+      Future<Boolean> submit = executor.submit(runnable);
+      int count = 0;
+      while (count < 10)
+      {
+        try
+        {
+          Thread.sleep(10);
+        } catch (InterruptedException e)
+        {
+          e.printStackTrace();
+        }
+        routingTable.onExternalViewChange(externalViewList, changeContext);
+        count++;
+      }
+
+      Boolean result = submit.get(60, TimeUnit.SECONDS);
+      AssertJUnit.assertEquals(Boolean.TRUE, result);
+    }
+    finally
+    {
+      // Always release the worker thread, including when get() times out or
+      // an assertion fails; otherwise the executor's thread outlives the test.
+      executor.shutdownNow();
+    }
+
+  }
+
+  /**
+   * Records in {@code record} that {@code instanceName} holds {@code state}
+   * for {@code stateUnitKey}, creating the map field for that key when it
+   * does not exist yet.
+   */
+  private void add(ZNRecord record, String stateUnitKey, String instanceName, String state)
+  {
+    Map<String, String> stateUnitKeyMap = record.getMapField(stateUnitKey);
+    if (stateUnitKeyMap == null)
+    {
+      stateUnitKeyMap = new HashMap<String, String>();
+    }
+    stateUnitKeyMap.put(instanceName, state);
+    // Stored unconditionally, so a newly created map is attached here as well.
+    record.setMapField(stateUnitKey, stateUnitKeyMap);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestSample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestSample.java b/helix-core/src/test/java/org/apache/helix/TestSample.java
new file mode 100644
index 0000000..0fe0099
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestSample.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Minimal TestNG sample: two lifecycle hooks and three test methods that do
+ * nothing beyond printing a line to standard output (or, for
+ * {@link #testCallbackHandler()}, declaring a few locals).
+ *
+ * @author kgopalak
+ *
+ */
+
+public class TestSample
+{
+
+  /** Printed before every test method. */
+  @BeforeMethod()
+  public void asd()
+  {
+    System.out.println("In Set up");
+  }
+
+  /** Printed after every test method. */
+  @AfterMethod()
+  public void sfds()
+  {
+    System.out.println("In tear down");
+  }
+
+  /** Placeholder: only declares null locals and asserts nothing. */
+  @Test()
+  public final void testCallbackHandler()
+  {
+    String path = null;
+    Object listener = null;
+    EventType[] eventTypes = null;
+  }
+
+  /** Prints a marker line for test B. */
+  @Test()
+  public void testB()
+  {
+    System.out.println("In method testB");
+  }
+
+  /** Prints a marker line for test A. */
+  @Test()
+  public void testA()
+  {
+    System.out.println("In method testA");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java b/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java
new file mode 100644
index 0000000..f0d1309
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestShuffledIdealState.java
@@ -0,0 +1,246 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.tools.IdealCalculatorByConsistentHashing;
+import org.apache.helix.tools.IdealStateCalculatorByRush;
+import org.apache.helix.tools.IdealStateCalculatorByShuffling;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for the ideal-state calculators: the ZNRecords they produce must survive
+ * a JSON round trip, and the shuffling calculator must keep the per-host MASTER
+ * and SLAVE counts within a small spread.
+ */
+public class TestShuffledIdealState
+{
+  /** Database name passed to every calculator call in this class. */
+  private static final String DB_NAME = "espressoDB1";
+
+  /** Map-field value that marks the master replica of a partition. */
+  private static final String MASTER = "MASTER";
+
+  /** State name used when printing slave statistics. */
+  private static final String SLAVE = "SLAVE";
+
+  /** Numeric suffix of the first generated instance name. */
+  private static final int FIRST_INSTANCE_SUFFIX = 1231;
+
+  /**
+   * Runs the shuffling, Rush and consistent-hashing calculators for the same
+   * four instances and checks that each resulting ZNRecord has the same
+   * toString() (ignoring case) after a JSON round trip through ObjectMapper.
+   *
+   * @throws Exception if a calculator or the JSON round trip fails. Such errors
+   *           propagate and fail the test; catching and printing them would
+   *           let a broken round trip pass silently.
+   */
+  @Test ()
+  public void testInvocation() throws Exception
+  {
+    int partitions = 6, replicas = 2;
+    List<String> instanceNames = buildInstanceNames(4);
+
+    ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(
+        instanceNames, partitions, replicas, DB_NAME);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, MASTER);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, SLAVE);
+
+    ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(
+        instanceNames, 1, partitions, replicas, DB_NAME);
+
+    ZNRecord result3 = IdealCalculatorByConsistentHashing.calculateIdealState(
+        instanceNames, partitions, replicas, DB_NAME,
+        new IdealCalculatorByConsistentHashing.FnvHash());
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result3, MASTER);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result3, SLAVE);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result3, "");
+    IdealCalculatorByConsistentHashing.printNodeOfflineOverhead(result3);
+
+    ObjectMapper mapper = new ObjectMapper();
+    assertJsonRoundTrip(mapper, result);
+    assertJsonRoundTrip(mapper, result2);
+    assertJsonRoundTrip(mapper, result3);
+  }
+
+  /**
+   * Checks the balance of the shuffling calculator's ideal state for several
+   * combinations of partition, replica and instance counts.
+   */
+  @Test
+  public void testShuffledIdealState()
+  {
+    // more partitions than nodes
+    assertShuffledStateBalanced(6, 2, 4);
+
+    // fewer partitions than nodes
+    assertShuffledStateBalanced(4, 3, 7);
+
+    // partition count is a multiple of the node count
+    assertShuffledStateBalanced(14, 3, 7);
+
+    // node count is a multiple of the partition count
+    assertShuffledStateBalanced(4, 3, 8);
+    assertShuffledStateBalanced(4, 3, 12);
+
+    // just fits
+    assertShuffledStateBalanced(4, 2, 12);
+  }
+
+  /**
+   * Verifies that an ideal state is balanced across hosts.
+   * <p>
+   * Every map field of the record is read as one partition mapping a host to
+   * its state. A partition may have at most one MASTER; any other state value
+   * is counted as a slave. Among hosts holding at least one master, per-host
+   * master counts may differ by at most 1; among hosts holding at least one
+   * slave, per-host slave counts may differ by at most 2. A record without any
+   * MASTER fails; a record without any slave passes the slave check.
+   *
+   * @param result the ideal state to check
+   * @return true when all checks pass; a failed check throws an AssertionError
+   *         rather than returning false
+   */
+  boolean verify(ZNRecord result)
+  {
+    Map<String, Integer> masterPartitionCounts = new HashMap<String, Integer>();
+    Map<String, Integer> slavePartitionCounts = new HashMap<String, Integer>();
+
+    for (String partition : result.getMapFields().keySet())
+    {
+      boolean masterSeen = false;
+      for (Map.Entry<String, String> assignment : result.getMapField(partition).entrySet())
+      {
+        if (MASTER.equals(assignment.getValue()))
+        {
+          Assert.assertFalse(masterSeen, "more than one MASTER for " + partition);
+          masterSeen = true;
+          increment(masterPartitionCounts, assignment.getKey());
+        }
+        else
+        {
+          increment(slavePartitionCounts, assignment.getKey());
+        }
+      }
+    }
+
+    // A record with no master has no counts to compare; fail it explicitly.
+    Assert.assertFalse(masterPartitionCounts.isEmpty(), "no MASTER assignment found");
+    Assert.assertTrue(spread(masterPartitionCounts) <= 1,
+        "unbalanced master counts: " + masterPartitionCounts);
+    Assert.assertTrue(spread(slavePartitionCounts) <= 2,
+        "unbalanced slave counts: " + slavePartitionCounts);
+    return true;
+  }
+
+  /**
+   * Computes the shuffled ideal state for the given cluster shape, prints its
+   * MASTER and SLAVE statistics and asserts that verify(ZNRecord) accepts it.
+   *
+   * @param partitions partition count passed to the calculator
+   * @param replicas replica count passed to the calculator
+   * @param instances number of instance names to generate
+   */
+  private void assertShuffledStateBalanced(int partitions, int replicas, int instances)
+  {
+    ZNRecord result = IdealStateCalculatorByShuffling.calculateIdealState(
+        buildInstanceNames(instances), partitions, replicas, DB_NAME);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, MASTER);
+    IdealCalculatorByConsistentHashing.printIdealStateStats(result, SLAVE);
+    Assert.assertTrue(verify(result));
+  }
+
+  /**
+   * Serializes the record to JSON, reads it back and asserts that the copy's
+   * toString() equals the original's, ignoring case. Both strings are printed.
+   *
+   * @param mapper the mapper used for writing and reading
+   * @param record the record to round-trip
+   * @throws IOException if writing or reading the JSON fails
+   */
+  private static void assertJsonRoundTrip(ObjectMapper mapper, ZNRecord record)
+      throws IOException
+  {
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, record);
+    ZNRecord copy = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+
+    System.out.println(record.toString());
+    System.out.println(copy.toString());
+    AssertJUnit.assertTrue(copy.toString().equalsIgnoreCase(record.toString()));
+    System.out.println();
+  }
+
+  /**
+   * Builds the instance names localhost_1231, localhost_1232, ... in order.
+   *
+   * @param count number of names to generate
+   * @return a new mutable list holding count names
+   */
+  private static List<String> buildInstanceNames(int count)
+  {
+    List<String> names = new ArrayList<String>();
+    for (int i = 0; i < count; i++)
+    {
+      names.add("localhost_" + (FIRST_INSTANCE_SUFFIX + i));
+    }
+    return names;
+  }
+
+  /**
+   * Adds one to the counter stored for host. A host seen for the first time
+   * starts at 1, so the map holds the real number of assignments.
+   *
+   * @param counts per-host counters, updated in place
+   * @param host the host whose counter is incremented
+   */
+  private static void increment(Map<String, Integer> counts, String host)
+  {
+    Integer current = counts.get(host);
+    counts.put(host, current == null ? 1 : current + 1);
+  }
+
+  /**
+   * Returns the difference between the largest and the smallest counter.
+   *
+   * @param counts per-host counters
+   * @return max minus min of the values, or 0 when the map is empty
+   */
+  private static int spread(Map<String, Integer> counts)
+  {
+    if (counts.isEmpty())
+    {
+      return 0;
+    }
+    return Collections.max(counts.values()) - Collections.min(counts.values());
+  }
+}