You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[32/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
new file mode 100644
index 0000000..5f93f8f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -0,0 +1,227 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestAddClusterV2 extends ZkIntegrationTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestAddClusterV2.class);
+
+ protected static final int CLUSTER_NR = 10;
+ protected static final int NODE_NR = 5;
+ protected static final int START_PORT = 12918;
+ protected static final String STATE_MODEL = "MasterSlave";
+ protected ClusterSetup _setupTool = null;
+ protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ protected static final String TEST_DB = "TestDB";
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ String namespace = "/" + CONTROLLER_CLUSTER;
+ if (_gZkClient.exists(namespace))
+ {
+ _gZkClient.deleteRecursive(namespace);
+ }
+
+ for (int i = 0; i < CLUSTER_NR; i++)
+ {
+ namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
+ if (_gZkClient.exists(namespace))
+ {
+ _gZkClient.deleteRecursive(namespace);
+ }
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+
+
+ // setup CONTROLLER_CLUSTER
+ _setupTool.addCluster(CONTROLLER_CLUSTER, true);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String controllerName = CONTROLLER_PREFIX + ":" + i;
+ _setupTool.addInstanceToCluster(CONTROLLER_CLUSTER, controllerName);
+ }
+
+
+ // setup cluster of clusters
+ for (int i = 0; i < CLUSTER_NR; i++)
+ {
+ String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
+ _setupTool.addCluster(clusterName, true);
+ _setupTool.activateCluster(clusterName, CONTROLLER_CLUSTER, true);
+ }
+
+ final String firstCluster = CLUSTER_PREFIX + "_" + CLASS_NAME + "_0";
+ setupStorageCluster(_setupTool, firstCluster, TEST_DB, 20, PARTICIPANT_PREFIX,
+ START_PORT, "MasterSlave", 3, true);
+
+ // start dummy participants for the first cluster
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ if (_startCMResultMap.get(instanceName) != null)
+ {
+ LOG.error("fail to start participant:" + instanceName
+ + "(participant with the same name already running");
+ }
+ else
+ {
+ StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster,
+ instanceName);
+ _startCMResultMap.put(instanceName, result);
+ }
+ }
+
+ // start distributed cluster controllers
+ for (int i = 0; i < 5; i++)
+ {
+ String controllerName = CONTROLLER_PREFIX + "_" + i;
+ if (_startCMResultMap.get(controllerName) != null)
+ {
+ LOG.error("fail to start controller:" + controllerName
+ + "(controller with the same name already running");
+ }
+ else
+ {
+ StartCMResult result = TestHelper.startController(CONTROLLER_CLUSTER,
+ controllerName,
+ ZK_ADDR,
+ HelixControllerMain.DISTRIBUTED);
+ _startCMResultMap.put(controllerName, result);
+ }
+ }
+
+ verifyClusters();
+ }
+
+ @Test
+ public void Test()
+ {
+
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception
+ {
+ System.out.println("AFTERCLASS " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ /**
+ * shutdown order:
+ * 1) pause the leader (optional)
+ * 2) disconnect all controllers
+ * 3) disconnect leader/disconnect participant
+ */
+ String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
+ // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
+
+ StartCMResult result;
+
+ Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
+
+ while (it.hasNext())
+ {
+ String instanceName = it.next().getKey();
+ if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX))
+ {
+ result = _startCMResultMap.get(instanceName);
+ result._manager.disconnect();
+ result._thread.interrupt();
+ it.remove();
+ }
+ verifyClusters();
+ }
+
+ result = _startCMResultMap.remove(leader);
+ result._manager.disconnect();
+ result._thread.interrupt();
+
+ it = _startCMResultMap.entrySet().iterator();
+ while (it.hasNext())
+ {
+ String instanceName = it.next().getKey();
+ result = _startCMResultMap.get(instanceName);
+ result._manager.disconnect();
+ result._thread.interrupt();
+ it.remove();
+ }
+
+ System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+ }
+
+
+ /**
+ * verify the external view (against the best possible state)
+ * in the controller cluster and the first cluster
+ */
+ protected void verifyClusters()
+ {
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CONTROLLER_CLUSTER));
+ Assert.assertTrue(result);
+
+ result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_PREFIX + "_" + CLASS_NAME + "_0"));
+ Assert.assertTrue(result);
+ }
+
+ protected void pauseController(DataAccessor clusterDataAccessor)
+ {
+ clusterDataAccessor.setProperty(PropertyType.PAUSE, new PauseSignal("pause"));
+ }
+
+ protected void setupStorageCluster(ClusterSetup setupTool, String clusterName,
+ String dbName, int partitionNr, String prefix, int startPort, String stateModel, int replica, boolean rebalance)
+ {
+ setupTool.addResourceToCluster(clusterName, dbName, partitionNr, stateModel);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String instanceName = prefix + ":" + (startPort + i);
+ setupTool.addInstanceToCluster(clusterName, instanceName);
+ }
+ if(rebalance)
+ {
+ setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
new file mode 100644
index 0000000..524cc1d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -0,0 +1,238 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase
+{
+ private static Logger LOG =
+ Logger.getLogger(TestAddNodeAfterControllerStart.class);
+ final String className = getShortClassName();
+
+ class ZkClusterManagerWithGetHandlers extends ZKHelixManager
+ {
+ public ZkClusterManagerWithGetHandlers(String clusterName,
+ String instanceName,
+ InstanceType instanceType,
+ String zkConnectString) throws Exception
+ {
+ super(clusterName, instanceName, instanceType, zkConnectString);
+ }
+
+ @Override
+ public List<CallbackHandler> getHandlers()
+ {
+ return super.getHandlers();
+ }
+
+ }
+
+ @Test
+ public void testStandalone() throws Exception
+ {
+ String clusterName = className + "_standalone";
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ final int nodeNr = 5;
+
+ TestHelper.setupCluster(clusterName,
+ ZK_ADDR,
+ 12918,
+ "localhost",
+ "TestDB",
+ 1,
+ 20,
+ nodeNr - 1,
+ 3,
+ "MasterSlave",
+ true);
+
+ MockParticipant[] participants = new MockParticipant[nodeNr];
+ for (int i = 0; i < nodeNr - 1; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ new Thread(participants[i]).start();
+ }
+
+ ZkClusterManagerWithGetHandlers controller =
+ new ZkClusterManagerWithGetHandlers(clusterName,
+ "controller_0",
+ InstanceType.CONTROLLER,
+ ZK_ADDR);
+ controller.connect();
+ boolean result;
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
+ result = checkHandlers(controller.getHandlers(), msgPath);
+ Assert.assertTrue(result);
+
+ _gSetupTool.addInstanceToCluster(clusterName, "localhost:12922");
+ _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
+
+ participants[nodeNr - 1] =
+ new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
+ new Thread(participants[nodeNr - 1]).start();
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12922");
+ result = checkHandlers(controller.getHandlers(), msgPath);
+ Assert.assertTrue(result);
+
+ // clean up
+ controller.disconnect();
+ for (int i = 0; i < nodeNr; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDistributed() throws Exception
+ {
+ String clusterName = className + "_distributed";
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ // setup grand cluster
+ TestHelper.setupCluster("GRAND_" + clusterName,
+ ZK_ADDR,
+ 0,
+ "controller",
+ null,
+ 0,
+ 0,
+ 1,
+ 0,
+ null,
+ true);
+
+ TestHelper.startController("GRAND_" + clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.DISTRIBUTED);
+
+ // setup cluster
+ _gSetupTool.addCluster(clusterName, true);
+ _gSetupTool.activateCluster(clusterName, "GRAND_" + clusterName, true); // addCluster2
+
+ boolean result;
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ "GRAND_" + clusterName));
+ Assert.assertTrue(result);
+
+ // add node/resource, and do rebalance
+ final int nodeNr = 2;
+ for (int i = 0; i < nodeNr - 1; i++)
+ {
+ int port = 12918 + i;
+ _gSetupTool.addInstanceToCluster(clusterName, "localhost:" + port);
+ }
+
+ _gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
+ _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
+
+ MockParticipant[] participants = new MockParticipant[nodeNr];
+ for (int i = 0; i < nodeNr - 1; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+ participants[i].syncStart();
+// new Thread(participants[i]).start();
+ }
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // check if controller_0 has message listener for localhost_12918
+ String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
+ int numberOfListeners = TestHelper.numberOfListeners(ZK_ADDR, msgPath);
+ // System.out.println("numberOfListeners(" + msgPath + "): " + numberOfListeners);
+ Assert.assertEquals(numberOfListeners, 2); // 1 of participant, and 1 of controller
+
+ _gSetupTool.addInstanceToCluster(clusterName, "localhost:12919");
+ _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
+
+ participants[nodeNr - 1] =
+ new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
+ participants[nodeNr - 1].syncStart();
+// new Thread(participants[nodeNr - 1]).start();
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ // check if controller_0 has message listener for localhost_12919
+ msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12919");
+ numberOfListeners = TestHelper.numberOfListeners(ZK_ADDR, msgPath);
+ // System.out.println("numberOfListeners(" + msgPath + "): " + numberOfListeners);
+ Assert.assertEquals(numberOfListeners, 2); // 1 of participant, and 1 of controller
+
+ // clean up
+ for (int i = 0; i < nodeNr; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ boolean checkHandlers(List<CallbackHandler> handlers, String path)
+ {
+// System.out.println(handlers.size() + " handlers: ");
+ for (CallbackHandler handler : handlers)
+ {
+// System.out.println(handler.getPath());
+ if (handler.getPath().equals(path))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // TODO: need to add a test case for ParticipantCodeRunner
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
new file mode 100644
index 0000000..a9b46fc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -0,0 +1,126 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockParticipant.MockMSModelFactory;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
+{
+ @Test
+ public void testAddStateModelFactoryAfterConnect() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // add a new idealState without registering message handling factory
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addResourceToCluster(clusterName, "TestDB1", 16, "MasterSlave");
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB1"));
+ idealState.setStateModelFactoryName("TestDB1_Factory");
+ accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
+ setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
+
+ // external view for TestDB1 should be empty
+ ExternalView extView = null;
+ long start = System.currentTimeMillis();
+ while (extView == null)
+ {
+ Thread.sleep(50);
+ extView = accessor.getProperty(keyBuilder.externalView("TestDB1"));
+
+ long now = System.currentTimeMillis();
+ if (now - start > 5000)
+ {
+ Assert.fail("Timeout waiting for an empty external view of TestDB1");
+ }
+ }
+ Assert.assertEquals(extView.getRecord().getMapFields().size(),
+ 0,
+ "External view for TestDB1 should be empty since TestDB1 is added without a state model factory");
+
+ // register "TestDB1_Factory" state model factory
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ for (int i = 0; i < n; i++)
+ {
+ participants[i].getManager()
+ .getStateMachineEngine()
+ .registerStateModelFactory("MasterSlave",
+ new MockMSModelFactory(),
+ "TestDB1_Factory");
+ }
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
new file mode 100644
index 0000000..ccedeac
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
@@ -0,0 +1,87 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase
+{
+ @Test
+ public void testAutoIsWithEmptyMap() throws Exception
+ {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "LeaderStandby", false); // do not rebalance
+
+ // calculate and set custom ideal state
+ String idealPath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
+ ZNRecord curIdealState = _gZkClient.readData(idealPath);
+
+ List<String> instanceNames = new ArrayList<String>(5);
+ for (int i = 0; i < 5; i++)
+ {
+ int port = 12918 + i;
+ instanceNames.add("localhost_" + port);
+ }
+ ZNRecord idealState = IdealStateCalculatorForStorageNode.calculateIdealState(instanceNames, 10,
+ 2, "TestDB0", "LEADER", "STANDBY");
+ // System.out.println(idealState);
+ // curIdealState.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.toString(),
+ // "CUSTOMIZED");
+ curIdealState.setSimpleField(IdealState.IdealStateProperty.REPLICAS.toString(), "3");
+
+ curIdealState.setListFields(idealState.getListFields());
+ _gZkClient.writeData(idealPath, curIdealState);
+
+ // start controller
+ TestHelper
+ .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[5];
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(
+ ZK_ADDR, clusterName));
+ Assert.assertTrue(result);
+
+ // clean up
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
new file mode 100644
index 0000000..520daa3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -0,0 +1,213 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.manager.zk.ZKDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ System.out.println("START " + CLASS_NAME + " at "
+ + new Date(System.currentTimeMillis()));
+
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ String namespace = "/" + CLUSTER_NAME;
+ if (_zkClient.exists(namespace))
+ {
+ _zkClient.deleteRecursive(namespace);
+ }
+ _setupTool = new ClusterSetup(ZK_ADDR);
+
+ // setup storage cluster
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL, IdealStateModeProperty.AUTO_REBALANCE+"");
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+ // start dummy participants
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ if (_startCMResultMap.get(instanceName) != null)
+ {
+ LOG.error("fail to start particpant:" + instanceName
+ + "(participant with same name already exists)");
+ }
+ else
+ {
+ StartCMResult result =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _startCMResultMap.put(instanceName, result);
+ }
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ StartCMResult startResult =
+ TestHelper.startController(CLUSTER_NAME,
+ controllerName,
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ _startCMResultMap.put(controllerName, startResult);
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+
+ Assert.assertTrue(result);
+ }
+
+ @Test()
+ public void testAutoRebalance() throws Exception
+ {
+
+ // kill 1 node
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+ _startCMResultMap.get(instanceName)._manager.disconnect();
+ Thread.currentThread().sleep(1000);
+ _startCMResultMap.get(instanceName)._thread.interrupt();
+
+ //verifyBalanceExternalView();
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+
+ // add 2 nodes
+ for (int i = 0; i < 2; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (1000 + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+ StartCMResult resultx =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+ _startCMResultMap.put(storageNodeName, resultx);
+ }
+ Thread.sleep(1000);
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+ CLUSTER_NAME, TEST_DB));
+ Assert.assertTrue(result);
+ }
+
+ static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances)
+ {
+ Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
+ for(String partitionName : externalView.getMapFields().keySet())
+ {
+ Map<String, String> assignmentMap = externalView.getMapField(partitionName);
+ //Assert.assertTrue(assignmentMap.size() >= replica);
+ for(String instance : assignmentMap.keySet())
+ {
+ if(assignmentMap.get(instance).equals(masterState))
+ {
+ if(!masterPartitionsCountMap.containsKey(instance))
+ {
+ masterPartitionsCountMap.put(instance, 0);
+ }
+ masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
+ }
+ }
+ }
+
+ int perInstancePartition = partitionCount / instances;
+
+ int totalCount = 0;
+ for(String instanceName : masterPartitionsCountMap.keySet())
+ {
+ int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+ totalCount += instancePartitionCount;
+ if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1 ))
+ {
+ return false;
+ }
+ if(instancePartitionCount == perInstancePartition +1)
+ {
+ if(partitionCount % instances == 0)
+ {
+ return false;
+ }
+ }
+ }
+ if(partitionCount != totalCount)
+ {
+ return false;
+ }
+ return true;
+
+ }
+
+ public static class ExternalViewBalancedVerifier implements ZkVerifier
+ {
+ ZkClient _client;
+ String _clusterName;
+ String _resourceName;
+
+ public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
+ {
+ _client = client;
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ }
+ @Override
+ public boolean verify()
+ {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor( _clusterName, new ZkBaseDataAccessor(_client));
+ Builder keyBuilder = accessor.keyBuilder();
+ int numberOfPartitions = accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields().size();
+ ClusterDataCache cache = new ClusterDataCache();
+ cache.refresh(accessor);
+ String masterValue = cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()).getStatesPriorityList().get(0);
+ int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+ return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(), numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size());
+ }
+
+ @Override
+ public ZkClient getZkClient()
+ {
+ return _client;
+ }
+
+ @Override
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
new file mode 100644
index 0000000..f817a39
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
@@ -0,0 +1,61 @@
+package org.apache.helix.integration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestBasicSpectator extends ZkStandAloneCMTestBase implements ExternalViewChangeListener
+{
+ Map<String, Integer> _externalViewChanges = new HashMap<String, Integer>();
+
+ @Test
+ public void TestSpectator() throws Exception
+ {
+ HelixManager relayHelixManager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+ null,
+ InstanceType.SPECTATOR,
+ ZK_ADDR);
+
+ relayHelixManager.connect();
+ relayHelixManager.addExternalViewChangeListener(this);
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ Assert.assertTrue(_externalViewChanges.containsKey("NextDB"));
+ Assert.assertTrue(_externalViewChanges.containsKey(TEST_DB));
+
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+ for(ExternalView view : externalViewList)
+ {
+ if(!_externalViewChanges.containsKey(view.getResourceName()))
+ {
+ _externalViewChanges.put(view.getResourceName(), 1);
+ }
+ else
+ {
+ _externalViewChanges.put(view.getResourceName(), _externalViewChanges.get(view.getResourceName())+ 1);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
new file mode 100644
index 0000000..605ce63
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -0,0 +1,87 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestBucketizedResource extends ZkIntegrationTestBase
+{
+ @Test()
+ public void testBucketizedResource() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+
+ MockParticipant[] participants = new MockParticipant[5];
+// ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ // String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, "TestDB0");
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setBucketSize(1);
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ // start participants
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // clean up
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
new file mode 100644
index 0000000..d523867
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.testng.annotations.Test;
+
+public class TestCMWithFailParticipant extends ZkIntegrationTestBase
+{
+ // ZkClient _zkClient;
+ //
+ // @BeforeClass ()
+ // public void beforeClass() throws Exception
+ // {
+ // _zkClient = new ZkClient(ZK_ADDR);
+ // _zkClient.setZkSerializer(new ZNRecordSerializer());
+ // }
+ //
+ //
+ // @AfterClass
+ // public void afterClass()
+ // {
+ // _zkClient.close();
+ // }
+
+ @Test()
+ public void testCMWithFailParticipant() throws Exception
+ {
+ int numResources = 1;
+ int numPartitionsPerResource = 10;
+ int numInstance = 5;
+ int replica = 3;
+
+ String uniqClusterName = "TestFail_" + "rg" + numResources + "_p" + numPartitionsPerResource
+ + "_n" + numInstance + "_r" + replica;
+ System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestDriver.setupCluster(uniqClusterName, ZK_ADDR, numResources, numPartitionsPerResource,
+ numInstance, replica);
+
+ for (int i = 0; i < numInstance; i++)
+ {
+ TestDriver.startDummyParticipant(uniqClusterName, i);
+ }
+ TestDriver.startController(uniqClusterName);
+
+ TestDriver.stopDummyParticipant(uniqClusterName, 2000, 0);
+ TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000);
+ TestDriver.stopCluster(uniqClusterName);
+
+ System.out.println("END " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
new file mode 100644
index 0000000..5321183
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestCarryOverBadCurState extends ZkIntegrationTestBase
+{
+ @Test
+ public void testCarryOverBadCurState() throws Exception
+ {
+ System.out.println("START testCarryOverBadCurState at "
+ + new Date(System.currentTimeMillis()));
+
+ String clusterName = getShortClassName();
+ MockParticipant[] participants = new MockParticipant[5];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // add a bad current state
+ ZNRecord badCurState = new ZNRecord("TestDB0");
+ String path = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName, "localhost_12918", "session_0", "TestDB0");
+ _gZkClient.createPersistent(path, true);
+ _gZkClient.writeData(path, badCurState);
+
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ // start participants
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ System.out.println("END testCarryOverBadCurState at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
new file mode 100644
index 0000000..c6613f4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.HelixUtil;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestClusterStartsup extends ZkStandAloneCMTestBase
+{
+ void setupCluster() throws HelixException
+ {
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ String namespace = "/" + CLUSTER_NAME;
+ if (_zkClient.exists(namespace))
+ {
+ _zkClient.deleteRecursive(namespace);
+ }
+ _setupTool = new ClusterSetup(ZK_ADDR);
+
+ // setup storage cluster
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String storageNodeName = "localhost:" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+ }
+
+
+ @Override
+ @BeforeClass()
+ public void beforeClass() throws Exception
+ {
+ _zkClient = new ZkClient(ZK_ADDR);
+ }
+
+ @Override
+ @AfterClass()
+ public void afterClass()
+ {
+ _zkClient.close();
+ }
+
+ @Test()
+ public void testParticipantStartUp() throws Exception
+ {
+ setupCluster();
+ String controllerMsgPath = HelixUtil.getControllerPropertyPath(CLUSTER_NAME, PropertyType.MESSAGES_CONTROLLER);
+ _zkClient.deleteRecursive(controllerMsgPath);
+ HelixManager manager = null;;
+
+ try
+ {
+ manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+ "localhost_" + (START_PORT + 1),
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ manager.connect();
+ Assert.fail("Should fail on connect() since cluster structure is not set up");
+ }
+ catch(HelixException e)
+ {
+ // OK
+ }
+
+ if(manager != null)
+ {
+ AssertJUnit.assertFalse(manager.isConnected());
+ }
+
+ try
+ {
+ manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+ "localhost_" + (START_PORT + 3),
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ manager.connect();
+ Assert.fail("Should fail on connect() since cluster structure is not set up");
+ }
+ catch(HelixException e)
+ {
+ // OK
+ }
+
+ if(manager != null)
+ {
+ AssertJUnit.assertFalse(manager.isConnected());
+ }
+
+ setupCluster();
+ String stateModelPath = HelixUtil.getStateModelDefinitionPath(CLUSTER_NAME);
+ _zkClient.deleteRecursive(stateModelPath);
+
+ try
+ {
+ manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+ "localhost_" + (START_PORT + 1),
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ manager.connect();
+ Assert.fail("Should fail on connect() since cluster structure is not set up");
+ }
+ catch(HelixException e)
+ {
+ // OK
+ }
+ if(manager != null)
+ {
+ AssertJUnit.assertFalse(manager.isConnected());
+ }
+
+ setupCluster();
+ String instanceStatusUpdatePath = HelixUtil.getInstancePropertyPath(CLUSTER_NAME, "localhost_" + (START_PORT + 1), PropertyType.STATUSUPDATES);
+ _zkClient.deleteRecursive(instanceStatusUpdatePath);
+
+ try
+ {
+ manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+ "localhost_" + (START_PORT + 1),
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ manager.connect();
+ Assert.fail("Should fail on connect() since cluster structure is not set up");
+ }
+ catch(HelixException e)
+ {
+ // OK
+ }
+ if(manager != null)
+ {
+ AssertJUnit.assertFalse(manager.isConnected());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
new file mode 100644
index 0000000..90f72a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
@@ -0,0 +1,155 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestCustomIdealState extends ZkIntegrationTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestCustomIdealState.class);
+ ZkClient _zkClient;
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ _zkClient.close();
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+
+ int numResources = 2;
+ int numPartitionsPerResource = 100;
+ int numInstance = 5;
+ int replica = 3;
+
+ String uniqClusterName = "TestCustomIS_" + "rg" + numResources + "_p" + numPartitionsPerResource
+ + "_n" + numInstance + "_r" + replica + "_basic";
+ System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, numResources,
+ numPartitionsPerResource, numInstance, replica);
+
+ for (int i = 0; i < numInstance; i++)
+ {
+ TestDriver.startDummyParticipant(uniqClusterName, i);
+ }
+ TestDriver.startController(uniqClusterName);
+
+ TestDriver.setIdealState(uniqClusterName, 2000, 50);
+ TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000);
+
+ TestDriver.stopCluster(uniqClusterName);
+
+ System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testNonAliveInstances() throws Exception
+ {
+ int numResources = 2;
+ int numPartitionsPerResource = 50;
+ int numInstance = 5;
+ int replica = 3;
+
+ String uniqClusterName = "TestCustomIS_" + "rg" + numResources + "_p" + numPartitionsPerResource
+ + "_n" + numInstance + "_r" + replica + "_nonalive";
+ System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, numResources,
+ numPartitionsPerResource, numInstance, replica);
+
+ for (int i = 0; i < numInstance / 2; i++)
+ {
+ TestDriver.startDummyParticipant(uniqClusterName, i);
+ }
+
+ TestDriver.startController(uniqClusterName);
+ TestDriver.setIdealState(uniqClusterName, 0, 100);
+
+ // wait some time for customized ideal state being populated
+ Thread.sleep(1000);
+
+ // start the rest of participants after ideal state is set
+ for (int i = numInstance / 2; i < numInstance; i++)
+ {
+ TestDriver.startDummyParticipant(uniqClusterName, i);
+ }
+
+ TestDriver.verifyCluster(uniqClusterName, 4000, 50 * 1000);
+
+ TestDriver.stopCluster(uniqClusterName);
+
+ System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+
+ }
+
+ @Test()
+ public void testDrop() throws Exception
+ {
+ int numResources = 2;
+ int numPartitionsPerResource = 50;
+ int numInstance = 5;
+ int replica = 3;
+
+ String uniqClusterName = "TestCustomIS_" + "rg" + numResources + "_p" + numPartitionsPerResource
+ + "_n" + numInstance + "_r" + replica + "_drop";
+
+ System.out.println("START " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+ TestDriver.setupClusterWithoutRebalance(uniqClusterName, ZK_ADDR, numResources,
+ numPartitionsPerResource, numInstance, replica);
+
+ for (int i = 0; i < numInstance; i++)
+ {
+ TestDriver.startDummyParticipant(uniqClusterName, i);
+ }
+ TestDriver.startController(uniqClusterName);
+ TestDriver.setIdealState(uniqClusterName, 2000, 50);
+ TestDriver.verifyCluster(uniqClusterName, 3000, 50 * 1000);
+
+ // drop resource group
+ ClusterSetup setup = new ClusterSetup(ZK_ADDR);
+ setup.dropResourceFromCluster(uniqClusterName, "TestDB0");
+
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", uniqClusterName, "TestDB0",
+ TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",
+ "localhost_12921", "localhost_12922"), ZK_ADDR);
+
+ TestDriver.stopCluster(uniqClusterName);
+ System.out.println("STOP " + uniqClusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ // TODO add a test case that verify (in case of node failure) best possible
+ // state is a subset of ideal state
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
new file mode 100644
index 0000000..3a29560
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
@@ -0,0 +1,403 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+
+public class TestDisable extends ZkIntegrationTestBase
+{
+
+ @Test
+ public void testDisableNodeCustomIS() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+ String disableNode = "localhost_12918";
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // set ideal state to customized mode
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disable localhost_12918
+ String command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName +
+ " " + disableNode + " false";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12918 is in OFFLINE state
+ Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+ Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+ expectInstanceStateMap.put(disableNode, "OFFLINE");
+ expectStateMap.put(".*", expectInstanceStateMap);
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+ Assert.assertTrue(result, disableNode + " should be in OFFLINE");
+
+ // re-enable localhost_12918
+ command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName +
+ " " + disableNode + " true";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12918 is NOT in OFFLINE state
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+ Assert.assertTrue(result, disableNode + " should NOT be in OFFLINE");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisableNodeAutoIS() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+ String disableNode = "localhost_12919";
+
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disable localhost_12919
+ String command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName +
+ " " + disableNode + " false";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12919 is in OFFLINE state
+ Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+ Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+ expectInstanceStateMap.put(disableNode, "OFFLINE");
+ expectStateMap.put(".*", expectInstanceStateMap);
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+ Assert.assertTrue(result, disableNode + " should be in OFFLINE");
+
+ // re-enable localhost_12919
+ command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName +
+ " " + disableNode + " true";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12919 is NOT in OFFLINE state
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+ Assert.assertTrue(result, disableNode + " should NOT be in OFFLINE");
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisablePartitionCustomIS() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // set ideal state to customized mode
+ ZkBaseDataAccessor<ZNRecord> baseAccessor =
+ new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ Builder keyBuilder = accessor.keyBuilder();
+ IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+ idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disable [TestDB0_0, TestDB0_5] on localhost_12919
+ String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + clusterName +
+ " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12918 is in OFFLINE state for [TestDB0_0, TestDB0_5]
+ Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+ Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+ expectInstanceStateMap.put("localhost_12919", "OFFLINE");
+ expectStateMap.put("TestDB0_0", expectInstanceStateMap);
+ expectStateMap.put("TestDB0_5", expectInstanceStateMap);
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+ Assert.assertTrue(result, "localhost_12919" + " should be in OFFLINE for [TestDB0_0, TestDB0_5]");
+
+
+ // re-enable localhost_12919 for [TestDB0_0, TestDB0_5]
+ command = "--zkSvr " + ZK_ADDR +" --enablePartition true " + clusterName +
+ " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12919 is NOT in OFFLINE state for [TestDB0_0, TestDB0_5]
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+ Assert.assertTrue(result, "localhost_12919" + " should NOT be in OFFLINE");
+
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDisablePartitionAutoIS() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // disable [TestDB0_0, TestDB0_5] on localhost_12919
+ String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + clusterName +
+ " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12918 is in OFFLINE state for [TestDB0_0, TestDB0_5]
+ Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
+ Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
+ expectInstanceStateMap.put("localhost_12919", "OFFLINE");
+ expectStateMap.put("TestDB0_0", expectInstanceStateMap);
+ expectStateMap.put("TestDB0_5", expectInstanceStateMap);
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+ Assert.assertTrue(result, "localhost_12919" + " should be in OFFLINE for [TestDB0_0, TestDB0_5]");
+
+
+ // re-enable localhost_12919 for [TestDB0_0, TestDB0_5]
+ command = "--zkSvr " + ZK_ADDR +" --enablePartition true " + clusterName +
+ " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure localhost_12919 is NOT in OFFLINE state for [TestDB0_0, TestDB0_5]
+ result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
+ Assert.assertTrue(result, "localhost_12919" + " should NOT be in OFFLINE");
+
+
+ // clean up
+ // wait for all zk callbacks done
+ Thread.sleep(1000);
+ controller.syncStop();
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
new file mode 100644
index 0000000..ff0c535
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+
+ @Test()
+ public void testDisableNode() throws Exception
+ {
+ String command = "-zkSvr " + ZK_ADDR +" -enableInstance " + CLUSTER_NAME + " "+
+ PARTICIPANT_PREFIX + "_12918" + " TestDB TestDB_0 false";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ tool.enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_12918", true);
+
+ result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
new file mode 100644
index 0000000..0ce5ce0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
+
+ @Test()
+ public void testDisablePartition() throws Exception
+ {
+ LOG.info("START testDisablePartition() at " + new Date(System.currentTimeMillis()));
+
+ // localhost_12919 is MASTER for TestDB_0
+ String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + CLUSTER_NAME +
+ " localhost_12919 TestDB TestDB_0 TestDB_9";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ Map<String, Set<String>> map = new HashMap<String, Set<String>>();
+ map.put("TestDB_0", TestHelper.setOf("localhost_12919"));
+ map.put("TestDB_9", TestHelper.setOf("localhost_12919"));
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "OFFLINE");
+
+ ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ tool.enablePartition(true, CLUSTER_NAME, "localhost_12919", "TestDB", Arrays.asList("TestDB_9"));
+
+ result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ map.clear();
+ map.put("TestDB_0", TestHelper.setOf("localhost_12919"));
+ TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "OFFLINE");
+
+ map.clear();
+ map.put("TestDB_9", TestHelper.setOf("localhost_12919"));
+ TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "MASTER");
+
+ LOG.info("STOP testDisablePartition() at " + new Date(System.currentTimeMillis()));
+
+ }
+
+}