You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[33/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
new file mode 100644
index 0000000..627f3d5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
@@ -0,0 +1,179 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDistributedCMMain extends ZkIntegrationTestBase
+{
+
+ @Test
+ public void testDistributedCMMain() throws Exception
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterNamePrefix = className + "_" + methodName;
+ final int n = 5;
+ final int clusterNb = 10;
+
+ System.out.println("START " + clusterNamePrefix + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // setup 10 clusters
+ for (int i = 0; i < clusterNb; i++)
+ {
+ String clusterName = clusterNamePrefix + "0_" + i;
+ String participantName = "localhost" + i;
+ String resourceName = "TestDB" + i;
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ participantName, // participant name prefix
+ resourceName, // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+ }
+
+ // setup controller cluster
+ final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+ TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0, // controller
+ // port
+ "controller", // participant name prefix
+ clusterNamePrefix, // resource name prefix
+ 1, // resources
+ clusterNb, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "LeaderStandby",
+ true); // do rebalance
+
+ // start distributed cluster controllers
+ ClusterController[] controllers = new ClusterController[n + n];
+ for (int i = 0; i < n; i++)
+ {
+ controllers[i] =
+ new ClusterController(controllerClusterName,
+ "controller_" + i,
+ ZK_ADDR,
+ HelixControllerMain.DISTRIBUTED.toString());
+ controllers[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName),
+ 30000);
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // start first cluster
+ MockParticipant[] participants = new MockParticipant[n];
+ final String firstClusterName = clusterNamePrefix + "0_0";
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost0_" + (12918 + i);
+ participants[i] =
+ new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+
+ // add more controllers to controller cluster
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ for (int i = 0; i < n; i++)
+ {
+ String controller = "controller:" + (n + i);
+ setupTool.addInstanceToCluster(controllerClusterName, controller);
+ }
+ setupTool.rebalanceStorageCluster(controllerClusterName, clusterNamePrefix + "0", 6);
+ for (int i = n; i < 2 * n; i++)
+ {
+ controllers[i] =
+ new ClusterController(controllerClusterName,
+ "controller_" + i,
+ ZK_ADDR,
+ HelixControllerMain.DISTRIBUTED.toString());
+ controllers[i].syncStart();
+ }
+
+
+ // verify controller cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName));
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // verify first cluster
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+
+ // stop controller_0-5
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ for (int i = 0; i < n; i++)
+ {
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ String leaderName = leader.getId();
+ int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+ controllers[j].syncStop();
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName));
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+ }
+
+
+ // clean up
+ // wait for all zk callbacks done
+ System.out.println("Cleaning up...");
+ Thread.sleep(2000);
+ for (int i = 0; i < 5; i++)
+ {
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName));
+ controllers[i].syncStop();
+ }
+
+ // Thread.sleep(2000);
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterNamePrefix + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
new file mode 100644
index 0000000..ffb17b0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
@@ -0,0 +1,148 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDistributedClusterController extends ZkIntegrationTestBase
+{
+
+ @Test
+ public void testDistributedClusterController() throws Exception
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterNamePrefix = className + "_" + methodName;
+ final int n = 5;
+ final int clusterNb = 10;
+
+ System.out.println("START " + clusterNamePrefix + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // setup 10 clusters
+ for (int i = 0; i < clusterNb; i++)
+ {
+ String clusterName = clusterNamePrefix + "0_" + i;
+ String participantName = "localhost" + i;
+ String resourceName = "TestDB" + i;
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ participantName, // participant name prefix
+ resourceName, // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+ }
+
+ // setup controller cluster
+ final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
+ TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0, // controller
+ // port
+ "controller", // participant name prefix
+ clusterNamePrefix, // resource name prefix
+ 1, // resources
+ clusterNb, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "LeaderStandby",
+ true); // do rebalance
+
+ // start distributed cluster controllers
+ ClusterController[] controllers = new ClusterController[n];
+ for (int i = 0; i < n; i++)
+ {
+ controllers[i] =
+ new ClusterController(controllerClusterName,
+ "controller_" + i,
+ ZK_ADDR,
+ HelixControllerMain.DISTRIBUTED.toString());
+ controllers[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName),
+ 30000);
+ Assert.assertTrue(result, "Controller cluster NOT in ideal state");
+
+ // start first cluster
+ MockParticipant[] participants = new MockParticipant[n];
+ final String firstClusterName = clusterNamePrefix + "0_0";
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost0_" + (12918 + i);
+ participants[i] =
+ new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ firstClusterName));
+ Assert.assertTrue(result, "first cluster NOT in ideal state");
+
+
+ // stop current leader in controller cluster
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ String leaderName = leader.getId();
+ int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
+ controllers[j].syncStop();
+
+
+ // setup the second cluster
+ MockParticipant[] participants2 = new MockParticipant[n];
+ final String secondClusterName = clusterNamePrefix + "0_1";
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost1_" + (12918 + i);
+ participants2[i] =
+ new MockParticipant(secondClusterName, instanceName, ZK_ADDR, null);
+ participants2[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ secondClusterName));
+ Assert.assertTrue(result, "second cluster NOT in ideal state");
+
+ // clean up
+ // wait for all zk callbacks done
+ System.out.println("Cleaning up...");
+ Thread.sleep(1000);
+ for (int i = 0; i < 5; i++)
+ {
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ controllerClusterName));
+ controllers[i].syncStop();
+ }
+
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterNamePrefix + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
new file mode 100644
index 0000000..cd09d92
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
@@ -0,0 +1,519 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.store.PropertyJsonSerializer;
+import org.apache.helix.store.PropertyStoreException;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.tools.TestCommand;
+import org.apache.helix.tools.TestExecutor;
+import org.apache.helix.tools.TestTrigger;
+import org.apache.helix.tools.ZnodeOpArg;
+import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.helix.tools.TestCommand.NodeOpArg;
+import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+
+public class TestDriver
+{
+ private static Logger LOG = Logger.getLogger(TestDriver.class);
+ private static final String ZK_ADDR = ZkIntegrationTestBase.ZK_ADDR;
+
+ // private static final String CLUSTER_PREFIX = "TestDriver";
+ private static final String STATE_MODEL = "MasterSlave";
+ private static final String TEST_DB_PREFIX = "TestDB";
+ private static final int START_PORT = 12918;
+ private static final String CONTROLLER_PREFIX = "controller";
+ private static final String PARTICIPANT_PREFIX = "localhost";
+ private static final Random RANDOM = new Random();
+ private static final PropertyJsonSerializer<ZNRecord> SERIALIZER = new PropertyJsonSerializer<ZNRecord>(
+ ZNRecord.class);
+
+ private static final Map<String, TestInfo> _testInfoMap = new ConcurrentHashMap<String, TestInfo>();
+
+ public static class TestInfo
+ {
+ public final ZkClient _zkClient;
+ public final String _clusterName;
+ public final int _numDb;
+ public final int _numPartitionsPerDb;
+ public final int _numNode;
+ public final int _replica;
+
+ // public final Map<String, ZNRecord> _idealStateMap = new
+ // ConcurrentHashMap<String, ZNRecord>();
+ public final Map<String, StartCMResult> _startCMResultMap = new ConcurrentHashMap<String, StartCMResult>();
+
+ public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
+ int numNode, int replica)
+ {
+ this._clusterName = clusterName;
+ this._zkClient = zkClient;
+ this._numDb = numDb;
+ this._numPartitionsPerDb = numPartitionsPerDb;
+ this._numNode = numNode;
+ this._replica = replica;
+ }
+ }
+
+ public static TestInfo getTestInfo(String uniqClusterName)
+ {
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+ String errMsg = "Cluster hasn't been setup for " + uniqClusterName;
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+ return testInfo;
+ }
+
+ public static void setupClusterWithoutRebalance(String uniqClusterName, String zkAddr,
+ int numResources, int numPartitionsPerResource, int numInstances, int replica)
+ throws Exception
+ {
+ setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
+ replica, false);
+ }
+
+ public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
+ int numPartitionsPerResource, int numInstances, int replica) throws Exception
+ {
+ setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
+ replica, true);
+ }
+
+ // public static void setupCluster(String uniqTestName, ZkClient zkClient, int
+ // numDb,
+ // int numPartitionPerDb, int numNodes, int replica, boolean doRebalance)
+ // throws Exception
+ public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
+ int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
+ throws Exception
+ {
+ ZkClient zkClient = new ZkClient(zkAddr);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // String clusterName = CLUSTER_PREFIX + "_" + uniqClusterName;
+ String clusterName = uniqClusterName;
+ if (zkClient.exists("/" + clusterName))
+ {
+ LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName
+ + " is not unique or test has been run without cleaning up zk; deleting it");
+ zkClient.deleteRecursive("/" + clusterName);
+ }
+
+ if (_testInfoMap.containsKey(uniqClusterName))
+ {
+ LOG.warn("test info already exists:" + uniqClusterName
+ + " is not unique or test has been run without cleaning up test info map; removing it");
+ _testInfoMap.remove(uniqClusterName);
+ }
+ TestInfo testInfo = new TestInfo(clusterName, zkClient, numResources, numPartitionsPerResource,
+ numInstances, replica);
+ _testInfoMap.put(uniqClusterName, testInfo);
+
+ ClusterSetup setupTool = new ClusterSetup(zkAddr);
+ setupTool.addCluster(clusterName, true);
+
+ for (int i = 0; i < numInstances; i++)
+ {
+ int port = START_PORT + i;
+ setupTool.addInstanceToCluster(clusterName, PARTICIPANT_PREFIX + ":" + port);
+ }
+
+ for (int i = 0; i < numResources; i++)
+ {
+ String dbName = TEST_DB_PREFIX + i;
+ setupTool.addResourceToCluster(clusterName, dbName, numPartitionsPerResource,
+ STATE_MODEL);
+ if (doRebalance)
+ {
+ setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+
+ // String idealStatePath = "/" + clusterName + "/" +
+ // PropertyType.IDEALSTATES.toString() + "/"
+ // + dbName;
+ // ZNRecord idealState = zkClient.<ZNRecord> readData(idealStatePath);
+ // testInfo._idealStateMap.put(dbName, idealState);
+ }
+ }
+ }
+
+ /**
+ * starting a dummy participant with a given id
+ *
+ * @param uniqueTestName
+ * @param instanceId
+ */
+ public static void startDummyParticipant(String uniqClusterName, int instanceId) throws Exception
+ {
+ startDummyParticipants(uniqClusterName, new int[] { instanceId });
+ }
+
+ public static void startDummyParticipants(String uniqClusterName, int[] instanceIds)
+ throws Exception
+ {
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+ String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+ String clusterName = testInfo._clusterName;
+
+ for (int id : instanceIds)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
+
+ if (testInfo._startCMResultMap.containsKey(instanceName))
+ {
+ LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
+ } else
+ {
+ StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
+ testInfo._startCMResultMap.put(instanceName, result);
+ // testInfo._instanceStarted.countDown();
+ }
+ }
+ }
+
+ public static void startController(String uniqClusterName) throws Exception
+ {
+ startController(uniqClusterName, new int[] { 0 });
+ }
+
+ public static void startController(String uniqClusterName, int[] nodeIds) throws Exception
+ {
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+ String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+ String clusterName = testInfo._clusterName;
+
+ for (int id : nodeIds)
+ {
+ String controllerName = CONTROLLER_PREFIX + "_" + id;
+ if (testInfo._startCMResultMap.containsKey(controllerName))
+ {
+ LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
+ } else
+ {
+ StartCMResult result = TestHelper.startController(clusterName, controllerName, ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ testInfo._startCMResultMap.put(controllerName, result);
+ }
+ }
+ }
+
+ public static void verifyCluster(String uniqClusterName, long beginTime, long timeout)
+ throws Exception
+ {
+ Thread.sleep(beginTime);
+
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+ String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+ String clusterName = testInfo._clusterName;
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName), timeout);
+ Assert.assertTrue(result);
+ }
+
+ public static void stopCluster(String uniqClusterName) throws Exception
+ {
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+ String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+ throw new IllegalArgumentException(errMsg);
+ }
+ TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
+
+ // stop controller first
+ for (Iterator<Entry<String, StartCMResult>> it = testInfo._startCMResultMap.entrySet()
+ .iterator(); it.hasNext();)
+ {
+ Map.Entry<String, StartCMResult> entry = it.next();
+ String instanceName = entry.getKey();
+ if (instanceName.startsWith(CONTROLLER_PREFIX))
+ {
+ it.remove();
+ HelixManager manager = entry.getValue()._manager;
+ manager.disconnect();
+ Thread thread = entry.getValue()._thread;
+ thread.interrupt();
+ }
+ }
+
+ Thread.sleep(1000);
+
+ // stop the rest
+ for (Map.Entry<String, StartCMResult> entry : testInfo._startCMResultMap.entrySet())
+ {
+ HelixManager manager = entry.getValue()._manager;
+ manager.disconnect();
+ Thread thread = entry.getValue()._thread;
+ thread.interrupt();
+ }
+
+ testInfo._zkClient.close();
+ }
+
+ public static void stopDummyParticipant(String uniqClusterName, long beginTime, int instanceId)
+ throws Exception
+ {
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+
+ String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+ throw new Exception(errMsg);
+ }
+
+ TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+ // String clusterName = testInfo._clusterName;
+
+ String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
+ StartCMResult result = testInfo._startCMResultMap.remove(failHost);
+
+ // TODO need sync
+ if (result == null || result._manager == null || result._thread == null)
+ {
+ String errMsg = "Dummy participant:" + failHost + " seems not running";
+ LOG.error(errMsg);
+ } else
+ {
+ // System.err.println("try to stop participant: " +
+ // result._manager.getInstanceName());
+ NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
+ TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
+ List<TestCommand> commandList = new ArrayList<TestCommand>();
+ commandList.add(command);
+ TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+ }
+ }
+
+ public static void setIdealState(String uniqClusterName, long beginTime, int percentage)
+ throws Exception
+ {
+ if (!_testInfoMap.containsKey(uniqClusterName))
+ {
+ String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
+ throw new IllegalArgumentException(errMsg);
+ }
+ TestInfo testInfo = _testInfoMap.get(uniqClusterName);
+ String clusterName = testInfo._clusterName;
+ List<String> instanceNames = new ArrayList<String>();
+
+ for (int i = 0; i < testInfo._numNode; i++)
+ {
+ int port = START_PORT + i;
+ instanceNames.add(PARTICIPANT_PREFIX + "_" + port);
+ }
+
+ List<TestCommand> commandList = new ArrayList<TestCommand>();
+ for (int i = 0; i < testInfo._numDb; i++)
+ {
+ String dbName = TEST_DB_PREFIX + i;
+ ZNRecord destIS = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames,
+ testInfo._numPartitionsPerDb, testInfo._replica - 1, dbName, "MASTER", "SLAVE");
+ // destIS.setId(dbName);
+ destIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
+ IdealStateModeProperty.CUSTOMIZED.toString());
+ destIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+ Integer.toString(testInfo._numPartitionsPerDb));
+ destIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
+ destIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
+ // String idealStatePath = "/" + clusterName + "/" +
+ // PropertyType.IDEALSTATES.toString() + "/"
+ // + TEST_DB_PREFIX + i;
+ ZNRecord initIS = new ZNRecord(dbName); // _zkClient.<ZNRecord>
+ // readData(idealStatePath);
+ initIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
+ IdealStateModeProperty.CUSTOMIZED.toString());
+ initIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+ Integer.toString(testInfo._numPartitionsPerDb));
+ initIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
+ initIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
+ int totalStep = calcuateNumTransitions(initIS, destIS);
+ // LOG.info("initIS:" + initIS);
+ // LOG.info("destIS:" + destIS);
+ // LOG.info("totalSteps from initIS to destIS:" + totalStep);
+ // System.out.println("initIS:" + initIS);
+ // System.out.println("destIS:" + destIS);
+
+ ZNRecord nextIS;
+ int step = totalStep * percentage / 100;
+ System.out.println("Resource:" + dbName + ", totalSteps from initIS to destIS:" + totalStep
+ + ", walk " + step + " steps(" + percentage + "%)");
+ nextIS = nextIdealState(initIS, destIS, step);
+ // testInfo._idealStateMap.put(dbName, nextIS);
+ String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName,
+ TEST_DB_PREFIX + i);
+ ZnodeOpArg arg = new ZnodeOpArg(idealStatePath, ZnodePropertyType.ZNODE, "+", nextIS);
+ TestCommand command = new TestCommand(CommandType.MODIFY, new TestTrigger(beginTime), arg);
+ commandList.add(command);
+ }
+
+ TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+
+ }
+
+ private static List<String[]> findAllUnfinishPairs(ZNRecord cur, ZNRecord dest)
+ {
+ // find all (host, resource) pairs that haven't reached destination state
+ List<String[]> list = new ArrayList<String[]>();
+ Map<String, Map<String, String>> map = dest.getMapFields();
+ for (Map.Entry<String, Map<String, String>> entry : map.entrySet())
+ {
+ String partitionName = entry.getKey();
+ Map<String, String> hostMap = entry.getValue();
+ for (Map.Entry<String, String> hostEntry : hostMap.entrySet())
+ {
+ String host = hostEntry.getKey();
+ String destState = hostEntry.getValue();
+ Map<String, String> curHostMap = cur.getMapField(partitionName);
+
+ String curState = null;
+ if (curHostMap != null)
+ {
+ curState = curHostMap.get(host);
+ }
+
+ String[] pair = new String[3];
+ if (curState == null)
+ {
+ if (destState.equalsIgnoreCase("SLAVE"))
+ {
+ pair[0] = new String(partitionName);
+ pair[1] = new String(host);
+ pair[2] = new String("1"); // number of transitions required
+ list.add(pair);
+ } else if (destState.equalsIgnoreCase("MASTER"))
+ {
+ pair[0] = new String(partitionName);
+ pair[1] = new String(host);
+ pair[2] = new String("2"); // number of transitions required
+ list.add(pair);
+ }
+ } else
+ {
+ if (curState.equalsIgnoreCase("SLAVE") && destState.equalsIgnoreCase("MASTER"))
+ {
+ pair[0] = new String(partitionName);
+ pair[1] = new String(host);
+ pair[2] = new String("1"); // number of transitions required
+ list.add(pair);
+ }
+ }
+ }
+ }
+ return list;
+ }
+
+ private static int calcuateNumTransitions(ZNRecord start, ZNRecord end)
+ {
+ int totalSteps = 0;
+ List<String[]> list = findAllUnfinishPairs(start, end);
+ for (String[] pair : list)
+ {
+ totalSteps += Integer.parseInt(pair[2]);
+ }
+ return totalSteps;
+ }
+
+ private static ZNRecord nextIdealState(final ZNRecord cur, final ZNRecord dest, final int steps)
+ throws PropertyStoreException
+ {
+ // get a deep copy
+ ZNRecord next = SERIALIZER.deserialize(SERIALIZER.serialize(cur));
+ List<String[]> list = findAllUnfinishPairs(cur, dest);
+
+ // randomly pick up pairs that haven't reached destination state and
+ // progress
+ for (int i = 0; i < steps; i++)
+ {
+ int randomInt = RANDOM.nextInt(list.size());
+ String[] pair = list.get(randomInt);
+ String curState = null;
+ Map<String, String> curHostMap = next.getMapField(pair[0]);
+ if (curHostMap != null)
+ {
+ curState = curHostMap.get(pair[1]);
+ }
+ final String destState = dest.getMapField(pair[0]).get(pair[1]);
+
+ // TODO generalize it using state-model
+ if (curState == null && destState != null)
+ {
+ Map<String, String> hostMap = next.getMapField(pair[0]);
+ if (hostMap == null)
+ {
+ hostMap = new HashMap<String, String>();
+ }
+ hostMap.put(pair[1], "SLAVE");
+ next.setMapField(pair[0], hostMap);
+ } else if (curState.equalsIgnoreCase("SLAVE") && destState != null
+ && destState.equalsIgnoreCase("MASTER"))
+ {
+ next.getMapField(pair[0]).put(pair[1], "MASTER");
+ } else
+ {
+ LOG.error("fail to calculate the next ideal state");
+ }
+ curState = next.getMapField(pair[0]).get(pair[1]);
+ if (curState != null && curState.equalsIgnoreCase(destState))
+ {
+ list.remove(randomInt);
+ }
+ }
+
+ LOG.info("nextIS:" + next);
+ return next;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
new file mode 100644
index 0000000..942ba28
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -0,0 +1,200 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDrop extends ZkIntegrationTestBase
+{
+ @Test
+ public void testDropErrorPartition() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ Map<String, Set<String>> errTransitions = new HashMap<String, Set<String>>();
+ errTransitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ errTransitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
+
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ if (i == 0)
+ {
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ new ErrTransition(errTransitions));
+ }
+ else
+ {
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ }
+ participants[i].syncStart();
+ }
+
+ Map<String, Map<String, String>> errStateMap =
+ new HashMap<String, Map<String, String>>();
+ errStateMap.put("TestDB0", new HashMap<String, String>());
+ errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+ errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStateMap));
+ Assert.assertTrue(result);
+
+ // drop resource containing error partitions should not change partitions in error state
+ ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZK_ADDR, "--dropResource", clusterName, "TestDB0"});
+
+
+ // make sure TestDB0_4 and TestDB0_8 partitions stay in ERROR state
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStateMap));
+ Assert.assertTrue(result);
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDropSchemataResource() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // add schemata resource group
+ String command = "--zkSvr " + ZK_ADDR +" --addResource " + clusterName +
+ " schemata 1 STORAGE_DEFAULT_SM_SCHEMATA";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ command = "--zkSvr " + ZK_ADDR +" --rebalance " + clusterName +
+ " schemata 1";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // drop schemata resource group
+ System.out.println("Dropping schemata resource group...");
+ command = "--zkSvr " + ZK_ADDR +" --dropResource " + clusterName +
+ " schemata";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure schemata external view is empty
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ ExternalView extView = accessor.getProperty(keyBuilder.externalView("schemata"));
+ Assert.assertEquals(extView.getPartitionSet().size(), 0, "schemata externalView should be empty but was \"" + extView + "\"");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
new file mode 100644
index 0000000..6f91df5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ @Test()
+ public void testDropResource() throws Exception
+ {
+ // add a resource to be dropped
+ _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 6, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 3);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ String command = "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+ CLUSTER_NAME,
+ "MyDB",
+ TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+ "localhost_12920", "localhost_12921",
+ "localhost_12922"),
+ ZK_ADDR);
+ }
+
+ @Test()
+ public void testDropResourceWhileNodeDead() throws Exception
+ {
+ // add a resource to be dropped
+ _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB2", 16, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
+
+ boolean verifyResult = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(verifyResult);
+
+ String hostToKill = "localhost_12920";
+
+ _startCMResultMap.get(hostToKill)._manager.disconnect();
+ Thread.sleep(1000);
+ _startCMResultMap.get(hostToKill)._thread.interrupt();
+
+ String command = "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB2";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+ CLUSTER_NAME,
+ "MyDB2",
+ TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+ /*"localhost_12920",*/ "localhost_12921",
+ "localhost_12922"),
+ ZK_ADDR);
+
+ StartCMResult result =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, hostToKill);
+ _startCMResultMap.put(hostToKill, result);
+
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
+ CLUSTER_NAME,
+ "MyDB2",
+ TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+ "localhost_12920", "localhost_12921",
+ "localhost_12922"),
+ ZK_ADDR);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java b/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java
new file mode 100644
index 0000000..c56bdc2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDynamicFileClusterManager.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDynamicFileClusterManager extends FileCMTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestDynamicFileClusterManager.class);
+
+ @Test()
+ public void testDynamicFileClusterManager()
+ throws Exception
+ {
+ System.out.println("RUN testDynamicFileClusterManager() at " + new Date(System.currentTimeMillis()));
+
+ // add a new db
+ _mgmtTool.addResource(CLUSTER_NAME, "MyDB", 6, STATE_MODEL);
+ rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewFileVerifier(ROOT_PATH, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ // drop db
+ _mgmtTool.dropResource(CLUSTER_NAME, "MyDB");
+
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateFile",
+ CLUSTER_NAME,
+ "MyDB",
+ TestHelper.<String>setOf("localhost_12918", "localhost_12919",
+ "localhost_12920", "localhost_12921",
+ "localhost_12922"),
+ _fileStore);
+
+ System.out.println("STOP testDynamicFileClusterManager() at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
new file mode 100644
index 0000000..926d0d5
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
@@ -0,0 +1,159 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase
+{
+ static
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ }
+
+ class EnablePartitionTransition extends MockTransition
+ {
+ int slaveToOfflineCnt = 0;
+ int offlineToSlave = 0;
+
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ HelixManager manager = context.getManager();
+ String clusterName = manager.getClusterName();
+
+ String instance = message.getTgtName();
+ String partitionName = message.getPartitionName();
+ String fromState = message.getFromState();
+ String toState = message.getToState();
+ if (instance.equals("localhost_12919") && partitionName.equals("TestDB0_0"))
+ {
+ if (fromState.equals("SLAVE") && toState.equals("OFFLINE"))
+ {
+ slaveToOfflineCnt++;
+
+ try
+ {
+ String command =
+ "--zkSvr " + ZK_ADDR + " --enablePartition true " + clusterName
+ + " localhost_12919 TestDB0 TestDB0_0";
+
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ else if (slaveToOfflineCnt > 0 && fromState.equals("OFFLINE") && toState.equals("SLAVE"))
+ {
+ offlineToSlave++;
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testEnablePartitionDuringDisable() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ EnablePartitionTransition transition = new EnablePartitionTransition();
+ MockParticipant[] participants = new MockParticipant[5];
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ if (instanceName.equals("localhost_12919"))
+ {
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ transition);
+ }
+ else
+ {
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ }
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disable partitions
+ String command =
+ "--zkSvr " + ZK_ADDR + " --enablePartition false " + clusterName
+ + " localhost_12919 TestDB0 TestDB0_0";
+
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+ // ensure we get 1 slaveToOffline and 1 offlineToSlave after disable partition
+ long startT = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startT < 1000) // retry in 1s
+ {
+ if (transition.slaveToOfflineCnt > 0 && transition.offlineToSlave > 0)
+ {
+ break;
+ }
+
+ Thread.sleep(10);
+ }
+ long endT = System.currentTimeMillis();
+ System.out.println("1 disable and re-enable took: " + (endT - startT) + "ms");
+ Assert.assertEquals(transition.slaveToOfflineCnt, 1, "should get 1 slaveToOffline transition");
+ Assert.assertEquals(transition.offlineToSlave, 1, "should get 1 offlineToSlave transition");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
new file mode 100644
index 0000000..3eb01dd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.ErrTransition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestErrorPartition extends ZkIntegrationTestBase
+{
+ @Test()
+ public void testErrorPartition() throws Exception
+ {
+ String clusterName = getShortClassName();
+ MockParticipant[] participants = new MockParticipant[5];
+
+ System.out.println("START testErrorPartition() at "
+ + new Date(System.currentTimeMillis()));
+ ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+
+ TestHelper.setupCluster(clusterName,
+ ZK_ADDR,
+ 12918,
+ "localhost",
+ "TestDB",
+ 1,
+ 10,
+ 5,
+ 3,
+ "MasterSlave",
+ true);
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ if (i == 0)
+ {
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
+ {
+ {
+ put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+ }
+ };
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ new ErrTransition(errPartitions));
+ }
+ else
+ {
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ }
+ participants[i].syncStart();
+ // new Thread(participants[i]).start();
+ }
+
+ Map<String, Map<String, String>> errStates =
+ new HashMap<String, Map<String, String>>();
+ errStates.put("TestDB0", new HashMap<String, String>());
+ errStates.get("TestDB0").put("TestDB0_4", "localhost_12918");
+ boolean result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStates));
+ Assert.assertTrue(result);
+
+ Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>()
+ {
+ {
+ put("TestDB0_4", TestHelper.setOf("localhost_12918"));
+ }
+ };
+
+ // verify "TestDB0_0", "localhost_12918" is in ERROR state
+ TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
+
+ // disable a partition on a node with error state
+ tool.enablePartition(false, clusterName, "localhost_12918", "TestDB0", Arrays.asList("TestDB0_4"));
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStates));
+ Assert.assertTrue(result);
+
+ TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
+
+ // disable a node with error state
+ tool.enableInstance(clusterName, "localhost_12918", false);
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName,
+ errStates));
+ Assert.assertTrue(result);
+
+ // make sure after restart stale ERROR state is gone
+ tool.enablePartition(true, clusterName, "localhost_12918", "TestDB0", Arrays.asList("TestDB0_4"));
+ tool.enableInstance(clusterName, "localhost_12918", true);
+
+ participants[0].syncStop();
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR);
+ new Thread(participants[0]).start();
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ System.out.println("END testErrorPartition() at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java
new file mode 100644
index 0000000..a43cd34
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExpandCluster.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration;
+
+import java.util.Map;
+
+import org.apache.helix.TestEspressoStorageClusterIdealState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestExpandCluster extends ZkStandAloneCMTestBase
+{
+@Test
+ public void testExpandCluster() throws Exception
+ {
+ String DB2 = "TestDB2";
+ int partitions = 100;
+ int replica = 3;
+ _setupTool.addResourceToCluster(CLUSTER_NAME, DB2, partitions, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, DB2, replica, "keyX");
+
+ String DB3 = "TestDB3";
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, DB3, partitions, STATE_MODEL);
+
+ IdealState testDB0 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ IdealState testDB2 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB2);
+ IdealState testDB3 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB3);
+
+ for (int i = 0; i < 5; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (27960 + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+ String command = "-zkSvr localhost:2183 -expandCluster " + CLUSTER_NAME;
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+
+ IdealState testDB0_1 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ IdealState testDB2_1 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB2);
+ IdealState testDB3_1 = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB3);
+
+ Map<String, Object> resultOld2 = ClusterSetup.buildInternalIdealState(testDB2);
+ Map<String, Object> result2 = ClusterSetup.buildInternalIdealState(testDB2_1);
+
+ TestEspressoStorageClusterIdealState.Verify(result2, partitions, replica - 1);
+
+ Double masterKeepRatio = 0.0, slaveKeepRatio = 0.0;
+ double[] result = TestEspressoStorageClusterIdealState.compareResult(resultOld2, result2);
+ masterKeepRatio = result[0];
+ slaveKeepRatio = result[1];
+ Assert.assertTrue(masterKeepRatio > 0.49 && masterKeepRatio < 0.51);
+
+ Assert.assertTrue(testDB3_1.getRecord().getListFields().size() == 0);
+
+ // partitions should stay as same
+ Assert.assertTrue(testDB0_1.getRecord().getListFields().keySet().containsAll(testDB0.getRecord().getListFields().keySet()));
+ Assert.assertTrue(testDB0_1.getRecord().getListFields().size() == testDB0.getRecord().getListFields().size());
+ Assert.assertTrue(testDB2_1.getRecord().getMapFields().keySet().containsAll(testDB2.getRecord().getMapFields().keySet()));
+ Assert.assertTrue(testDB2_1.getRecord().getMapFields().size() == testDB2.getRecord().getMapFields().size());
+ Assert.assertTrue(testDB3_1.getRecord().getMapFields().keySet().containsAll(testDB3.getRecord().getMapFields().keySet()));
+ Assert.assertTrue(testDB3_1.getRecord().getMapFields().size() == testDB3.getRecord().getMapFields().size());
+
+ Map<String, Object> resultOld = ClusterSetup.buildInternalIdealState(testDB0);
+ Map<String, Object> resultNew = ClusterSetup.buildInternalIdealState(testDB0_1);
+
+ result = TestEspressoStorageClusterIdealState.compareResult(resultOld, resultNew);
+ masterKeepRatio = result[0];
+ slaveKeepRatio = result[1];
+ Assert.assertTrue(masterKeepRatio > 0.49 && masterKeepRatio < 0.51);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java b/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java
new file mode 100644
index 0000000..3a4fcbe
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExternalCmd.java
@@ -0,0 +1,36 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.ExternalCommand;
+import org.apache.helix.ScriptTestHelper;
+import org.apache.helix.TestHelper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestExternalCmd
+{
+
+ @Test
+ public void testExternalCmd() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ ExternalCommand cmd = ScriptTestHelper.runCommandLineTest("dummy.sh");
+ String output = cmd.getStringOutput("UTF8");
+ int idx = output.indexOf("this is a dummy test for verify ExternalCommand works");
+ Assert.assertNotSame(idx, -1);
+
+ System.out.println("END " + testName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
new file mode 100644
index 0000000..63bd388
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
@@ -0,0 +1,89 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestExternalViewUpdates extends ZkIntegrationTestBase
+{
+ @Test
+ public void testExternalViewUpdates() throws Exception
+ {
+ System.out.println("START testExternalViewUpdates at "
+ + new Date(System.currentTimeMillis()));
+
+ String clusterName = getShortClassName();
+ MockParticipant[] participants = new MockParticipant[5];
+ int resourceNb = 10;
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ resourceNb, // resources
+ 1, // partitions per resource
+ 5, // number of nodes
+ 1, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ // start participants
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // need to verify that each ExternalView's version number is 2
+ Builder keyBuilder = new Builder(clusterName);
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ String parentPath = keyBuilder.externalViews().getPath();
+ List<String> childNames = accessor.getChildNames(parentPath, 0);
+
+ List<String> paths = new ArrayList<String>();
+ for (String name : childNames)
+ {
+ paths.add(parentPath + "/" + name);
+ }
+
+// Stat[] stats = accessor.getStats(paths);
+ for (String path : paths)
+ {
+ Stat stat = accessor.getStat(path, 0);
+ Assert.assertTrue(stat.getVersion() <= 2, "ExternalView should be updated at most 2 times");
+ }
+
+ // TODO: need stop controller and participants
+
+ System.out.println("END testExternalViewUpdates at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java b/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java
new file mode 100644
index 0000000..4deaf36
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFailOverPerf1kp.java
@@ -0,0 +1,52 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.ExternalCommand;
+import org.apache.helix.ScriptTestHelper;
+import org.apache.helix.TestHelper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestFailOverPerf1kp
+{
+ @Test
+ public void testFailOverPerf1kp() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ ExternalCommand cmd = ScriptTestHelper.runCommandLineTest("helix_random_kill_local_startzk.sh");
+ String output = cmd.getStringOutput("UTF8");
+ int i = getStateTransitionLatency(0, output);
+ int j = output.indexOf("ms", i);
+ long latency = Long.parseLong(output.substring(i, j));
+ System.out.println("startup latency: " + latency);
+
+ i = getStateTransitionLatency(i, output);
+ j = output.indexOf("ms", i);
+ latency = Long.parseLong(output.substring(i, j));
+ System.out.println("failover latency: " + latency);
+ Assert.assertTrue(latency < 800, "failover latency for 1k partition test should < 800ms");
+
+ System.out.println("END " + testName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+ int getStateTransitionLatency(int start, String output)
+ {
+ final String pattern = "state transition latency: ";
+ int i = output.indexOf(pattern, start) + pattern.length();
+// String latencyStr = output.substring(i, j);
+// System.out.println(latencyStr);
+ return i;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java b/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java
new file mode 100644
index 0000000..b1b7aac
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFileBasedHelixManager.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.helix.ClusterView;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.file.StaticFileHelixManager;
+import org.apache.helix.tools.ClusterViewSerializer;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+// TODO remove this test
+public class TestFileBasedHelixManager
+{
+ @Test ()
+ public void testFileBasedHelixManager() throws Exception
+ {
+ List<StaticFileHelixManager.DBParam> dbParams = new ArrayList<StaticFileHelixManager.DBParam>();
+ dbParams.add(new StaticFileHelixManager.DBParam("BizFollow", 1));
+ dbParams.add(new StaticFileHelixManager.DBParam("BizProfile", 1));
+ dbParams.add(new StaticFileHelixManager.DBParam("EspressoDB", 10));
+ dbParams.add(new StaticFileHelixManager.DBParam("MailboxDB", 128));
+ dbParams.add(new StaticFileHelixManager.DBParam("MyDB", 8));
+ dbParams.add(new StaticFileHelixManager.DBParam("schemata", 1));
+ String[] nodesInfo =
+ { "localhost:8900", "localhost:8901", "localhost:8902", "localhost:8903",
+ "localhost:8904" };
+
+ String file = "/tmp/clusterView.json";
+ int replica = 0;
+ // ClusterViewSerializer serializer = new ClusterViewSerializer(file);
+ ClusterView view = StaticFileHelixManager.generateStaticConfigClusterView(nodesInfo, dbParams, replica);
+ view.setExternalView(new LinkedList<ZNRecord>());
+ ClusterViewSerializer.serialize(view, new File(file));
+ ClusterView restoredView = ClusterViewSerializer.deserialize(new File(file));
+
+ verifyClusterViews(view, restoredView);
+ }
+
+ public void verifyClusterViews(ClusterView view1, ClusterView view2)
+ {
+ AssertJUnit.assertEquals(view1.getPropertyLists().size(), view2.getPropertyLists().size());
+ AssertJUnit.assertEquals(view1.getExternalView().size(), view2.getExternalView().size());
+ AssertJUnit.assertEquals(view1.getMemberInstanceMap().size(), view2.getMemberInstanceMap().size());
+ AssertJUnit.assertEquals(view1.getInstances().size(), view2.getInstances().size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
new file mode 100644
index 0000000..f07e29d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestGroupMessage.java
@@ -0,0 +1,194 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestGroupMessage extends ZkIntegrationTestBase
+{
+ class TestZkChildListener implements IZkChildListener
+ {
+ int _maxNbOfChilds = 0;
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
+ {
+ System.out.println(parentPath + " has " + currentChilds.size() + " messages");
+ if (currentChilds.size() > _maxNbOfChilds)
+ {
+ _maxNbOfChilds = currentChilds.size();
+ }
+ }
+
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // enable group message
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setGroupMessageMode(true);
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ // registry a message listener so we know how many message generated
+ TestZkChildListener listener = new TestZkChildListener();
+ _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ // a non-group-message run followed by a group-message-enabled run
+ @Test
+ public void testChangeGroupMessageMode() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // stop all participants
+ Thread.sleep(1000);
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ // enable group message
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setGroupMessageMode(true);
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ // registry a message listener so we know how many message generated
+ TestZkChildListener listener = new TestZkChildListener();
+ _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
+
+ // restart all participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ Assert.assertTrue(listener._maxNbOfChilds <= 2, "Should get no more than 2 messages (O->S and S->M)");
+
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}