You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[39/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java
deleted file mode 100644
index e30ccba..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestResetResource.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.ErrTransition;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestResetResource extends ZkIntegrationTestBase
-{
- @Test
- public void testResetNode() throws Exception
- {
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- final int n = 5;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- n, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // start controller
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
- {
- {
- put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
- put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
- }
- };
-
- // start mock participants
- MockParticipant[] participants = new MockParticipant[n];
- for (int i = 0; i < n; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- if (i == 0)
- {
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- new ErrTransition(errPartitions));
- }
- else
- {
- participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
- }
- participants[i].syncStart();
- }
-
- // verify cluster
- Map<String, Map<String, String>> errStateMap =
- new HashMap<String, Map<String, String>>();
- errStateMap.put("TestDB0", new HashMap<String, String>());
- errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
- errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
- boolean result =
- ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName,
- errStateMap)));
- Assert.assertTrue(result, "Cluster verification fails");
-
- // reset resource "TestDB0"
- participants[0].setTransition(null);
- String command =
- "--zkSvr " + ZK_ADDR + " --resetResource " + clusterName
- + " TestDB0";
- ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-
- result =
- ClusterStateVerifier.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName)));
- Assert.assertTrue(result, "Cluster verification fails");
-
- // clean up
- // wait for all zk callbacks done
- Thread.sleep(1000);
- controller.syncStop();
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java
deleted file mode 100644
index 8ea4e0b..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestRestartParticipant.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestRestartParticipant extends ZkIntegrationTestBase
-{
- public class KillOtherTransition extends MockTransition
- {
- final AtomicReference<MockParticipant> _other;
-
- public KillOtherTransition(MockParticipant other)
- {
- _other = new AtomicReference<MockParticipant>(other);
- }
-
- @Override
- public void doTransition(Message message, NotificationContext context)
- {
- MockParticipant other = _other.getAndSet(null);
- if (other != null)
- {
- System.err.println("Kill " + other.getInstanceName()
- + ". Interrupted exceptions are IGNORABLE");
- other.syncStop();
- }
- }
- }
-
- @Test()
- public void testRestartParticipant() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.INFO);
- System.out.println("START testRestartParticipant at "
- + new Date(System.currentTimeMillis()));
-
- String clusterName = getShortClassName();
- MockParticipant[] participants = new MockParticipant[5];
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- TestHelper.startController(clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- if (i == 4)
- {
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- new KillOtherTransition(participants[0]));
- }
- else
- {
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- null);
-// Thread.sleep(100);
- }
-
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // restart
- Thread.sleep(500);
- MockParticipant participant =
- new MockParticipant(participants[0].getClusterName(),
- participants[0].getInstanceName(),
- ZK_ADDR,
- null);
- System.err.println("Restart " + participant.getInstanceName());
- participant.syncStart();
- result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- System.out.println("START testRestartParticipant at "
- + new Date(System.currentTimeMillis()));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java
deleted file mode 100644
index 36eafff..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchedulerMessage.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.io.StringWriter;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.HelixTaskResult;
-import com.linkedin.helix.messaging.handling.MessageHandler;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.monitoring.ZKPathDataDumpTask;
-import com.linkedin.helix.util.HelixUtil;
-
-public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
-{
- TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
- public static class TestMessagingHandlerFactory implements
- MessageHandlerFactory
- {
- public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- return new TestMessagingHandler(message, context);
- }
-
- @Override
- public String getMessageType()
- {
- return "TestParticipant";
- }
-
- @Override
- public void reset()
- {
- // TODO Auto-generated method stub
-
- }
-
- public class TestMessagingHandler extends MessageHandler
- {
- public TestMessagingHandler(Message message, NotificationContext context)
- {
- super(message, context);
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- HelixTaskResult result = new HelixTaskResult();
- result.setSuccess(true);
- String destName = _message.getTgtName();
- synchronized (_results)
- {
- if (!_results.containsKey(_message.getPartitionName()))
- {
- _results.put(_message.getPartitionName(),
- new ConcurrentSkipListSet<String>());
- }
- }
- _results.get(_message.getPartitionName()).add(destName);
-
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- // TODO Auto-generated method stub
-
- }
- }
- }
-
- @Test()
- public void TestSchedulerMsg() throws Exception
- {
- _factory._results.clear();
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++)
- {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
- .randomUUID().toString());
- schedulerMessage.setTgtSessionId("*");
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
-
- // Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), "Template");
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate",
- msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- helixDataAccessor.createProperty(
- keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
- schedulerMessage);
-
- Thread.sleep(15000);
-
- Assert.assertEquals(_PARTITIONS, _factory._results.size());
- PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
- MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
- .getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
- .get("MessageCount").equals("" + (_PARTITIONS * 3)));
- int messageResultCount = 0;
- for(String key : statusUpdate.getMapFields().keySet())
- {
- if(key.startsWith("MessageResult "))
- {
- messageResultCount ++;
- }
- }
- Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
- int count = 0;
- for (Set<String> val : _factory._results.values())
- {
- count += val.size();
- }
- Assert.assertEquals(count, _PARTITIONS * 3);
-
- // test the ZkPathDataDumpTask
- String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(),
- PropertyType.STATUSUPDATES_CONTROLLER);
- List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
- for(String subPath : subPaths)
- {
- String nextPath = controllerStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
- Assert.assertTrue(subsubPaths.size() > 0);
- }
-
- String instanceStatusPath = HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
- PropertyType.STATUSUPDATES);
-
- subPaths = _zkClient.getChildren(instanceStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
- for(String subPath : subPaths)
- {
- String nextPath = instanceStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
- Assert.assertTrue(subsubPaths.size() > 0);
- for(String subsubPath : subsubPaths)
- {
- String nextnextPath = nextPath + "/" + subsubPath;
- Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
- }
- }
-
- ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
- dumpTask.run();
-
- subPaths = _zkClient.getChildren(controllerStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
- for(String subPath : subPaths)
- {
- String nextPath = controllerStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
- Assert.assertTrue(subsubPaths.size() == 0);
- }
-
- subPaths = _zkClient.getChildren(instanceStatusPath);
- Assert.assertTrue(subPaths.size() > 0);
- for(String subPath : subPaths)
- {
- String nextPath = instanceStatusPath + "/" + subPath;
- List<String> subsubPaths = _zkClient.getChildren(nextPath);
- Assert.assertTrue(subsubPaths.size() > 0);
- for(String subsubPath : subsubPaths)
- {
- String nextnextPath = nextPath + "/" + subsubPath;
- Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
- }
- }
- }
-
-
- @Test()
- public void TestSchedulerMsg2() throws Exception
- {
- _factory._results.clear();
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++)
- {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
- .randomUUID().toString());
- schedulerMessage.setTgtSessionId("*");
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
-
- // Template for the individual message sent to each participant
- Message msg = new Message(_factory.getMessageType(), "Template");
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_%");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate",
- msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
- schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
-
- Criteria cr2 = new Criteria();
- cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
- cr2.setInstanceName("*");
- cr2.setSessionSpecific(false);
-
- class MockAsyncCallback extends AsyncCallback
- {
- Message _message;
- public MockAsyncCallback()
- {
- }
-
- @Override
- public void onTimeOut()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- _message = message;
- }
-
- }
- MockAsyncCallback callback = new MockAsyncCallback();
- manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
- String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
-
- Assert.assertEquals(_PARTITIONS, _factory._results.size());
- PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
- MessageType.SCHEDULER_MSG.toString(), msgId);
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
- .getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
- .get("MessageCount").equals("" + (_PARTITIONS * 3)));
- int messageResultCount = 0;
- for(String key : statusUpdate.getMapFields().keySet())
- {
- if(key.startsWith("MessageResult "))
- {
- messageResultCount ++;
- }
- }
- Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
- int count = 0;
- for (Set<String> val : _factory._results.values())
- {
- count += val.size();
- }
- Assert.assertEquals(count, _PARTITIONS * 3);
- }
-
- @Test()
- public void TestSchedulerZeroMsg() throws Exception
- {
- TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
- HelixManager manager = null;
- for (int i = 0; i < NODE_NR; i++)
- {
- String hostDest = "localhost_" + (START_PORT + i);
- _startCMResultMap.get(hostDest)._manager.getMessagingService()
- .registerMessageHandlerFactory(factory.getMessageType(), factory);
- manager = _startCMResultMap.get(hostDest)._manager;
- }
-
- Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
- .randomUUID().toString());
- schedulerMessage.setTgtSessionId("*");
- schedulerMessage.setTgtName("CONTROLLER");
- // TODO: change it to "ADMIN" ?
- schedulerMessage.setSrcName("CONTROLLER");
-
- // Template for the individual message sent to each participant
- Message msg = new Message(factory.getMessageType(), "Template");
- msg.setTgtSessionId("*");
- msg.setMsgState(MessageState.NEW);
-
- // Criteria to send individual messages
- Criteria cr = new Criteria();
- cr.setInstanceName("localhost_DOESNOTEXIST");
- cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
- cr.setSessionSpecific(false);
- cr.setResource("%");
- cr.setPartition("%");
-
- ObjectMapper mapper = new ObjectMapper();
- SerializationConfig serializationConfig = mapper.getSerializationConfig();
- serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
- StringWriter sw = new StringWriter();
- mapper.writeValue(sw, cr);
-
- String crString = sw.toString();
-
- schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate",
- msg.getRecord().getSimpleFields());
- schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- PropertyKey controllerMessageKey = keyBuilder
- .controllerMessage(schedulerMessage.getMsgId());
- helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
-
- Thread.sleep(3000);
-
- Assert.assertEquals(0, factory._results.size());
- PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
- MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
- ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
- .getRecord();
- Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
- .get("MessageCount").equals("0"));
- int count = 0;
- for (Set<String> val : factory._results.values())
- {
- count += val.size();
- }
- Assert.assertEquals(count, 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java
deleted file mode 100644
index 5be3dab..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSchemataSM.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestSchemataSM extends ZkIntegrationTestBase
-{
- @Test
- public void testSchemataSM() throws Exception
- {
- String testName = "TestSchemataSM";
- String clusterName = testName;
-
- MockParticipant[] participants = new MockParticipant[5];
-// Logger.getRootLogger().setLevel(Level.INFO);
-
- System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
- // port
- "localhost", // participant name prefix
- "TestSchemata", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 1, // replicas
- "STORAGE_DEFAULT_SM_SCHEMATA",
- true); // do rebalance
-
- TestHelper.startController(clusterName,
- "controller_0",
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
-
- participants[i] =
- new MockParticipant(clusterName,
- instanceName,
- ZK_ADDR,
- null);
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java
deleted file mode 100644
index 73d7b7d..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSessionExpiryInTransition.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZkTestHelper;
-import com.linkedin.helix.ZkTestHelper.TestZkHelixManager;
-import com.linkedin.helix.mock.controller.ClusterController;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-
-public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
-{
-
- public class SessionExpiryTransition extends MockTransition
- {
- private final AtomicBoolean _done = new AtomicBoolean();
-
- @Override
- public void doTransition(Message message, NotificationContext context)
- {
- TestZkHelixManager manager = (TestZkHelixManager)context.getManager();
-
- String instance = message.getTgtName();
- String partition = message.getPartitionName();
- if (instance.equals("localhost_12918")
- && partition.equals("TestDB0_1") // TestDB0_1 is SLAVE on localhost_12918
- && _done.getAndSet(true) == false)
- {
- try
- {
- ZkTestHelper.expireSession(manager.getZkClient());
- }
- catch (Exception e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- }
-
- // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
- // when there is pending messages
- // @Test
- public void testSessionExpiryInTransition() throws Exception
- {
- Logger.getRootLogger().setLevel(Level.WARN);
-
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- final String clusterName = className + "_" + methodName;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- MockParticipant[] participants = new MockParticipant[5];
-
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave",
- true); // do rebalance
-
- // start controller
- ClusterController controller =
- new ClusterController(clusterName, "controller_0", ZK_ADDR);
- controller.syncStart();
-
- // start participants
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
- TestZkHelixManager manager =
- new TestZkHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- ZK_ADDR);
- participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
- participants[i].syncStart();
- }
-
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // clean up
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- Thread.sleep(2000);
- controller.syncStop();
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java
deleted file mode 100644
index d205f89..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMMain.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestStandAloneCMMain extends ZkStandAloneCMTestBase
-{
- private static Logger logger = Logger.getLogger(TestStandAloneCMMain.class);
-
- @Test()
- public void testStandAloneCMMain() throws Exception
- {
- logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-
- for (int i = 1; i <= 2; i++)
- {
- String controllerName = "controller_" + i;
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME,
- controllerName,
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
- }
-
- stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
- boolean result = ClusterStateVerifier.verifyByPolling(
- new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
- Assert.assertTrue(result);
-
- logger.info("STOP testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java
deleted file mode 100644
index d0339ea..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStandAloneCMSessionExpiry.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZkTestHelper;
-import com.linkedin.helix.ZkTestHelper.TestZkHelixManager;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase
-{
- private static Logger LOG = Logger.getLogger(TestStandAloneCMSessionExpiry.class);
-
- @Test()
- public void testStandAloneCMSessionExpiry() throws Exception
- {
- // Logger.getRootLogger().setLevel(Level.DEBUG);
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
-
- System.out.println("START " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- TestHelper.setupCluster(clusterName,
- ZK_ADDR,
- 12918,
- PARTICIPANT_PREFIX,
- "TestDB",
- 1,
- 20,
- 5,
- 3,
- "MasterSlave",
- true);
-
- MockParticipant[] participants = new MockParticipant[5];
- for (int i = 0; i < 5; i++)
- {
- String instanceName = "localhost_" + (12918 + i);
- TestZkHelixManager manager =
- new TestZkHelixManager(clusterName,
- instanceName,
- InstanceType.PARTICIPANT,
- ZK_ADDR);
- participants[i] = new MockParticipant(manager, null);
- participants[i].syncStart();
- }
-
- TestZkHelixManager controller =
- new TestZkHelixManager(clusterName,
- "controller_0",
- InstanceType.CONTROLLER,
- ZK_ADDR);
- controller.connect();
-
- boolean result;
- result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // participant session expiry
- TestZkHelixManager participantToExpire = (TestZkHelixManager)participants[1].getManager();
-
- System.out.println("Expire participant session");
- String oldSessionId = participantToExpire.getSessionId();
-
- ZkTestHelper.expireSession(participantToExpire.getZkClient());
- String newSessionId = participantToExpire.getSessionId();
- System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
- Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
-
- ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
- setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
- setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
-
- result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // controller session expiry
- System.out.println("Expire controller session");
- oldSessionId = controller.getSessionId();
- ZkTestHelper.expireSession(controller.getZkClient());
- newSessionId = controller.getSessionId();
- System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
- Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
-
- setupTool.addResourceToCluster(clusterName, "TestDB2", 8, "MasterSlave");
- setupTool.rebalanceStorageCluster(clusterName, "TestDB2", 3);
-
- result =
- ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- clusterName));
- Assert.assertTrue(result);
-
- // clean up
- System.out.println("Clean up ...");
- // Logger.getRootLogger().setLevel(Level.DEBUG);
- controller.disconnect();
- Thread.sleep(100);
- for (int i = 0; i < 5; i++)
- {
- participants[i].syncStop();
- }
-
- System.out.println("END " + clusterName + " at "
- + new Date(System.currentTimeMillis()));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java
deleted file mode 100644
index 9e5f380..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStateTransitionTimeout.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.mock.storage.MockJobIntf;
-import com.linkedin.helix.mock.storage.MockParticipant;
-import com.linkedin.helix.mock.storage.MockParticipant.MockMSStateModel;
-import com.linkedin.helix.mock.storage.MockTransition;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelFactory;
-import com.linkedin.helix.participant.statemachine.StateModelInfo;
-import com.linkedin.helix.participant.statemachine.StateTransitionError;
-import com.linkedin.helix.participant.statemachine.Transition;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-
-public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase
-{
- private static Logger LOG =
- Logger.getLogger(TestStateTransitionTimeout.class);
- /**
-  * Recreates the test cluster: deletes any existing /CLUSTER_NAME znode, adds the
-  * cluster, the TEST_DB resource and NODE_NR instances, rebalances with 3 replicas,
-  * and finally sets the SLAVE-MASTER transition timeout property on the resource.
-  */
- @BeforeClass
- public void beforeClass() throws Exception
- {
-   System.out.println("START " + CLASS_NAME + " at "
-       + new Date(System.currentTimeMillis()));
-
-   _zkClient = new ZkClient(ZK_ADDR);
-   _zkClient.setZkSerializer(new ZNRecordSerializer());
-   String namespace = "/" + CLUSTER_NAME;
-   if (_zkClient.exists(namespace))
-   {
-     _zkClient.deleteRecursive(namespace);
-   }
-   _setupTool = new ClusterSetup(ZK_ADDR);
-
-   // setup storage cluster
-   _setupTool.addCluster(CLUSTER_NAME, true);
-   _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
-
-   for (int i = 0; i < NODE_NR; i++)
-   {
-     String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
-     _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-   }
-   _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
-
-   // Set the timeout value through the command-line tool. A previous revision also
-   // set "300" on a locally fetched IdealState that was never written back; that
-   // dead store is removed, so "200" is (as before) the only value that is applied.
-   // NOTE(review): the unit is presumably milliseconds -- confirm against Helix.
-   String stateTransition = "SLAVE" + "-" + "MASTER" + "_" + Message.Attributes.TIMEOUT;
-
-   String command = "-zkSvr " + ZK_ADDR + " -addResourceProperty "+ CLUSTER_NAME + " " + TEST_DB + " " + stateTransition + " 200";
-   ClusterSetup.processCommandLineArgs(command.split(" "));
- }
-
- @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
- public static class TimeOutStateModel extends MockParticipant.MockMSStateModel
- {
- boolean _sleep = false;
- StateTransitionError _error;
- int _errorCallcount = 0;
- public TimeOutStateModel(MockTransition transition, boolean sleep)
- {
- super(transition);
- _sleep = sleep;
- }
-
- @Transition(to="SLAVE",from="OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
- {
- LOG.info("Become SLAVE from OFFLINE");
-
- }
-
- @Transition(to="MASTER",from="SLAVE")
- public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
- {
- LOG.info("Become MASTER from SLAVE");
- if (_transition != null && _sleep)
- {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to="SLAVE",from="MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
- {
- LOG.info("Become SLAVE from MASTER");
- }
-
- @Transition(to="OFFLINE",from="SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
- {
- LOG.info("Become OFFLINE from SLAVE");
-
- }
-
- @Transition(to="DROPPED",from="OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- {
- LOG.info("Become DROPPED from OFFLINE");
-
- }
-
- public void rollbackOnError(Message message, NotificationContext context,
- StateTransitionError error)
- {
- _error = error;
- _errorCallcount ++;
- }
- }
-
- public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel>
- {
- Set<String> partitionsToSleep = new HashSet<String>();
- int _sleepTime;
-
- public SleepStateModelFactory(int sleepTime)
- {
- _sleepTime = sleepTime;
- }
-
- public void setPartitions(Collection<String> partitions)
- {
- partitionsToSleep.addAll(partitions);
- }
-
- public void addPartition(String partition)
- {
- partitionsToSleep.add(partition);
- }
-
- @Override
- public TimeOutStateModel createNewStateModel(String stateUnitKey)
- {
- return new TimeOutStateModel(new MockParticipant.SleepTransition(_sleepTime), partitionsToSleep.contains(stateUnitKey));
- }
- }
-
- @Test
- public void testStateTransitionTimeOut() throws Exception
- {
- Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
- MockParticipant[] participants = new MockParticipant[NODE_NR];
- IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
- for (int i = 0; i < NODE_NR; i++)
- {
- String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- SleepStateModelFactory factory = new SleepStateModelFactory(1000);
- factories.put(instanceName, factory);
- for(String p : idealState.getPartitionSet())
- {
- if(idealState.getPreferenceList(p).get(0).equals(instanceName))
- {
- factory.addPartition(p);
- }
- }
-
- participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
- participants[i].syncStart();
- }
- String controllerName = CONTROLLER_PREFIX + "_0";
- StartCMResult startResult =
- TestHelper.startController(CLUSTER_NAME,
- controllerName,
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
- boolean result =
- ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
- CLUSTER_NAME));
- Assert.assertTrue(result);
- HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
-
- Builder kb = accessor.keyBuilder();
- ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
- for(String p : idealState.getPartitionSet())
- {
- String idealMaster = idealState.getPreferenceList(p).get(0);
- Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
-
- TimeOutStateModel model = factories.get(idealMaster).getStateModel(p);
- Assert.assertEquals(model._errorCallcount , 1);
- Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java
deleted file mode 100644
index e1f0dd8..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestStatusUpdate.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.testng.Assert;
-
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.ExternalView;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class TestStatusUpdate extends ZkStandAloneCMTestBase
-{
- // For now write participant StatusUpdates to log4j.
- // TODO: Need to investigate another data channel to report to controller and re-enable
- // this test
- // @Test
- public void testParticipantStatusUpdates() throws Exception
- {
- ZkClient zkClient = new ZkClient(ZkIntegrationTestBase.ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(zkClient));
- Builder keyBuilder = accessor.keyBuilder();
-
- List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
- Assert.assertNotNull(extViews);
-
- for (ExternalView extView : extViews)
- {
- String resourceName = extView.getResourceName();
- Set<String> partitionSet = extView.getPartitionSet();
- for (String partition : partitionSet)
- {
- Map<String, String> stateMap = extView.getStateMap(partition);
- for (String instance : stateMap.keySet())
- {
- String state = stateMap.get(instance);
- StatusUpdateUtil.StatusUpdateContents statusUpdates =
- StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor,
- instance,
- resourceName,
- partition);
-
- Map<String, StatusUpdateUtil.TaskStatus> taskMessages =
- statusUpdates.getTaskMessages();
- List<StatusUpdateUtil.Transition> transitions = statusUpdates.getTransitions();
- if (state.equals("MASTER"))
- {
- Assert.assertEquals(transitions.size() >= 2,
- true,
- "Invalid number of transitions");
- StatusUpdateUtil.Transition lastTransition =
- transitions.get(transitions.size() - 1);
- StatusUpdateUtil.Transition prevTransition =
- transitions.get(transitions.size() - 2);
- Assert.assertEquals(taskMessages.get(lastTransition.getMsgID()),
- StatusUpdateUtil.TaskStatus.COMPLETED,
- "Incomplete transition");
- Assert.assertEquals(taskMessages.get(prevTransition.getMsgID()),
- StatusUpdateUtil.TaskStatus.COMPLETED,
- "Incomplete transition");
- Assert.assertEquals(lastTransition.getFromState(), "SLAVE", "Invalid State");
- Assert.assertEquals(lastTransition.getToState(), "MASTER", "Invalid State");
- }
- else if (state.equals("SLAVE"))
- {
- Assert.assertEquals(transitions.size() >= 1,
- true,
- "Invalid number of transitions");
- StatusUpdateUtil.Transition lastTransition =
- transitions.get(transitions.size() - 1);
- Assert.assertEquals(lastTransition.getFromState().equals("MASTER")
- || lastTransition.getFromState().equals("OFFLINE"),
- true,
- "Invalid transition");
- Assert.assertEquals(lastTransition.getToState(), "SLAVE", "Invalid State");
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java
deleted file mode 100644
index accd057..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestSwapInstance.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.linkedin.helix.integration;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.manager.zk.ZKHelixAdmin;
-import com.linkedin.helix.model.IdealState;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-
-public class TestSwapInstance extends ZkStandAloneCMTestBase
-{
- /**
-  * Exercises {@code ClusterSetup.swapInstance}: the swap is expected to throw
-  * while the old instance has only been disabled, and to succeed once that
-  * instance's manager has been disconnected. Afterwards the TestDB ideal state
-  * must equal the pre-swap ideal state with the old instance name replaced by the
-  * new one, in both map fields and list fields.
-  */
- @Test
- public void TestSwap() throws Exception
- {
-   String controllerName = CONTROLLER_PREFIX + "_0";
-   HelixManager manager = _startCMResultMap.get(controllerName)._manager;
-   HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
-   _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
-   _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
-
-   // Snapshot the ideal states before the swap.
-   ZNRecord idealStateOld1 = new ZNRecord("TestDB");
-   ZNRecord idealStateOld2 = new ZNRecord("MyDB");
-
-   IdealState is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
-   idealStateOld1.merge(is1.getRecord());
-
-   // NOTE(review): the MyDB snapshot is captured and re-read below but never
-   // compared; presumably a MyDB verification was intended -- confirm before adding.
-   IdealState is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
-   idealStateOld2.merge(is2.getRecord());
-
-   String instanceName = PARTICIPANT_PREFIX + "_" + START_PORT;
-   _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
-
-   boolean result = ClusterStateVerifier.verifyByPolling(
-       new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-   Assert.assertTrue(result);
-
-   String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444);
-   _setupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2);
-
-   // First attempt: the old instance is disabled but its manager is still connected.
-   boolean swapRejected = false;
-   try
-   {
-     _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
-   }
-   catch (Exception expected)
-   {
-     swapRejected = true;
-   }
-   Assert.assertTrue(swapRejected, "swapInstance should fail before the old instance is stopped");
-
-   _startCMResultMap.get(instanceName)._manager.disconnect();
-   _startCMResultMap.get(instanceName)._thread.interrupt();
-   Thread.sleep(1000);
-
-   // Second attempt must succeed; any exception propagates and fails the test
-   // with its real cause.
-   _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
-
-   StartCMResult result2 =
-       TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
-   _startCMResultMap.put(instanceName2, result2);
-
-   result = ClusterStateVerifier.verifyByPolling(
-       new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-   Assert.assertTrue(result);
-
-   is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
-
-   is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
-
-   assertInstanceSwapped(idealStateOld1, is1.getRecord(), instanceName, instanceName2);
- }
-
- /**
-  * Asserts that {@code after} equals {@code before} with every occurrence of
-  * {@code oldInstance} replaced by {@code newInstance}, for all map fields
-  * (instance name used as map key) and all list fields (instance name used as
-  * list element, position preserved).
-  */
- private static void assertInstanceSwapped(ZNRecord before,
-                                           ZNRecord after,
-                                           String oldInstance,
-                                           String newInstance)
- {
-   for (String key : before.getMapFields().keySet())
-   {
-     for (String host : before.getMapField(key).keySet())
-     {
-       String expectedHost = host.equals(oldInstance) ? newInstance : host;
-       Assert.assertEquals(after.getMapField(key).get(expectedHost),
-                           before.getMapField(key).get(host));
-     }
-   }
-
-   for (String key : before.getListFields().keySet())
-   {
-     Assert.assertEquals(after.getListField(key).size(), before.getListField(key).size());
-     for (int i = 0; i < before.getListField(key).size(); i++)
-     {
-       String host = before.getListField(key).get(i);
-       String expectedHost = host.equals(oldInstance) ? newInstance : host;
-       Assert.assertEquals(after.getListField(key).get(i), expectedHost);
-     }
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java b/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java
deleted file mode 100644
index 1bd3387..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/TestZNRecordQueryProcessorWithZK.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package com.linkedin.helix.integration;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.josql.DataAccessorBasedTupleReader;
-import com.linkedin.helix.josql.JsqlQueryListProcessor;
-import com.linkedin.helix.josql.ZNRecordQueryProcessor;
-import com.linkedin.helix.model.HealthStat;
-import com.linkedin.helix.model.IdealState;
-
-public class TestZNRecordQueryProcessorWithZK extends ZkStandAloneCMTestBase
-{
- /**
-  * Joins the exploded preference list of partition TEST_DB_4 with LIVEINSTANCES
-  * and expects three result rows. Currently disabled.
-  */
- //@Test
- public void testClusterQuery() throws Exception
- {
-   // Use whichever started process comes first in the result map.
-   HelixManager manager = _startCMResultMap.values().iterator().next()._manager;
-
-   DataAccessorBasedTupleReader reader =
-       new DataAccessorBasedTupleReader(manager.getHelixDataAccessor(), manager.getClusterName());
-   ZNRecordQueryProcessor processor = new ZNRecordQueryProcessor();
-
-   String partition = TEST_DB + "_4";
-   String sql = "select T2.instance, T1.listIndex, T2.id from "
-       + "explodeList(IDEALSTATES," + partition + ") as T1 join "
-       + "LIVEINSTANCES as T2 using (T1.listVal, T2.id)";
-
-   List<ZNRecord> rows = processor.execute(sql, reader);
-   System.out.println(rows);
-   Assert.assertEquals(rows.size(), 3);
- }
-
-
- /**
-  * Writes an "scnTable" health report for every instance in the cluster and
-  * expects the wildcard path INSTANCES/&#42;/HEALTHREPORT/scnTable to yield one
-  * tuple per instance. Currently disabled.
-  */
- //@Test
- public void testWildcardExpansion() throws Exception
- {
-   HelixManager manager = _startCMResultMap.values().iterator().next()._manager;
-   HelixDataAccessor accessor = manager.getHelixDataAccessor();
-   String clusterName = manager.getClusterName();
-
-   List<String> instancesInCluster = manager.getClusterManagmentTool().getInstancesInCluster(clusterName);
-   for (String instance : instancesInCluster)
-   {
-     ZNRecord report = new ZNRecord("scnTable");
-     report.setSimpleField("k1", "v1");
-     Builder keyBuilder = accessor.keyBuilder();
-     accessor.setProperty(keyBuilder.healthReport(instance, "scnTable"), new HealthStat(report));
-   }
-
-   DataAccessorBasedTupleReader reader = new DataAccessorBasedTupleReader(accessor, manager.getClusterName());
-   List<ZNRecord> tuples = reader.get("INSTANCES/*/HEALTHREPORT/scnTable");
-   System.out.println(tuples);
-   Assert.assertEquals(tuples.size(), instancesInCluster.size());
- }
-
- /**
-  * Writes an "scnTable" health report (gen/seq per partition) for every instance
-  * in the cluster plus one extra name, "deadInstance", then runs the three-step
-  * master-selection query twice: once step by step through ZNRecordQueryProcessor
-  * with temp tables, and once as a combined list through JsqlQueryListProcessor.
-  * Both runs must return the same records in the same order.
-  */
- @Test
- public void testNewMasterSelection() throws Exception
- {
-   HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
-   HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
-   IdealState resourceIdealState = manager.getClusterManagmentTool().getResourceIdealState(manager.getClusterName(), TEST_DB);
-
-   List<String> instancesInCluster = manager.getClusterManagmentTool().getInstancesInCluster(manager.getClusterName());
-   List<String> instances = new ArrayList<String>(instancesInCluster);
-   // NOTE(review): presumably "deadInstance" is meant to be dropped by the
-   // LIVEINSTANCES join in the first query -- confirm.
-   instances.add("deadInstance");
-   System.out.println(instances.size());
-
-   int seq = 50;
-   for (String instance : instances)
-   {
-     ZNRecord scnRecord = new ZNRecord("scnTable");
-     scnRecord.setSimpleField("instance", instance);
-     for (int i = 0; i < _PARTITIONS; i++)
-     {
-       Map<String, String> scnDetails = new HashMap<String, String>();
-
-       String partition = TEST_DB + "_" + i;
-       List<String> idealStatePrefList =
-           resourceIdealState.getPreferenceList(partition);
-       String idealStateMaster = idealStatePrefList.get(0);
-
-       scnDetails.put("gen", "4");
-
-       // The ideal-state master gets a seq 25 below the running counter; every
-       // other replica takes the counter value and advances it.
-       if (instance.equals(idealStateMaster))
-       {
-         scnDetails.put("seq", "" + (seq - 25));
-       }
-       else
-       {
-         scnDetails.put("seq", "" + seq++);
-       }
-       scnRecord.setMapField(partition, scnDetails);
-     }
-
-     Builder kb = accessor.keyBuilder();
-     accessor.setProperty(kb.healthReport(instance, "scnTable"), new HealthStat(scnRecord));
-   }
-
-   ZNRecordQueryProcessor processor = new ZNRecordQueryProcessor();
-   DataAccessorBasedTupleReader tupleReader = new DataAccessorBasedTupleReader(accessor, manager.getClusterName());
-
-   String scnTableQuery = "SELECT T1.instance as instance, T1.mapField as partition, T1.gen as gen, T1.seq as seq " +
-       "FROM explodeMap(`INSTANCES/*/HEALTHREPORT/scnTable`) AS T1" +
-       " JOIN LIVEINSTANCES as T2 using (T1.instance, T2.id)";
-   List<ZNRecord> scnTable = processor.execute(scnTableQuery, tupleReader);
-   tupleReader.setTempTable("scnTable", scnTable);
-
-   String rankQuery = "SELECT instance, partition, gen, seq, T1.listIndex AS instanceRank " +
-       " FROM scnTable JOIN explodeList(`IDEALSTATES/" + TEST_DB + "`) AS T1 " +
-       "USING (scnTable.instance, T1.listVal) WHERE scnTable.partition=T1.listField";
-   List<ZNRecord> rankTable = processor.execute(rankQuery, tupleReader);
-   System.out.println(rankTable.size());
-   tupleReader.setTempTable("rankTable", rankTable);
-
-   String masterSelectionQuery = "SELECT instance, partition, instanceRank, gen, (T.maxSeq-seq) AS seqDiff, seq FROM rankTable JOIN " +
-       " (SELECT partition, max(to_number(seq)) AS maxSeq FROM rankTable GROUP BY partition) AS T USING(rankTable.partition, T.partition) " +
-       " WHERE to_number(seqDiff) < 10 " +
-       " ORDER BY partition, to_number(gen) desc, to_number(instanceRank), to_number(seqDiff)";
-
-   List<ZNRecord> masterSelectionTable = processor.execute(masterSelectionQuery, tupleReader);
-   System.out.println(masterSelectionTable.size());
-   for (ZNRecord record : masterSelectionTable)
-   {
-     System.out.println(record);
-   }
-
-   // Same three queries, executed as one list; intermediate results are named by
-   // appending SEPARATOR + table name to the query string.
-   List<String> combinedQueryStringList = new ArrayList<String>();
-   combinedQueryStringList.add(scnTableQuery + JsqlQueryListProcessor.SEPARATOR+"scnTable");
-   combinedQueryStringList.add(rankQuery + JsqlQueryListProcessor.SEPARATOR+"rankTable");
-   combinedQueryStringList.add(masterSelectionQuery);
-   System.out.println();
-   List<ZNRecord> masterSelectionTable2 = JsqlQueryListProcessor.executeQueryList(manager.getHelixDataAccessor(), manager.getClusterName(), combinedQueryStringList);
-   for (ZNRecord record : masterSelectionTable2)
-   {
-     System.out.println(record);
-   }
-   Assert.assertEquals(masterSelectionTable2.size(), masterSelectionTable.size());
-   for (int i = 0; i < masterSelectionTable2.size(); i++)
-   {
-     Assert.assertTrue(masterSelectionTable2.get(i).equals(masterSelectionTable.get(i)),
-                       "master selection row " + i + " differs between the two executions");
-   }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java
deleted file mode 100644
index b13ad83..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/ZkIntegrationTestBase.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Map;
-import java.util.logging.Level;
-
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.manager.zk.ZKHelixDataAccessor;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkBaseDataAccessor;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.util.ZKClientPool;
-
-public class ZkIntegrationTestBase
-{
- private static Logger LOG =
- Logger.getLogger(ZkIntegrationTestBase.class);
-
- protected static ZkServer _zkServer;
- protected static ZkClient _gZkClient;
- protected static ClusterSetup _gSetupTool;
-
- public static final String ZK_ADDR = "localhost:2183";
- protected static final String CLUSTER_PREFIX = "CLUSTER";
- protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
-
- protected final String CONTROLLER_PREFIX = "controller";
- protected final String PARTICIPANT_PREFIX = "localhost";
-
- /**
-  * Suite-wide setup: lowers the root java.util.logging level to WARNING, starts a
-  * ZooKeeper server on ZK_ADDR, resets the ZKClientPool, and creates the shared
-  * ZkClient (with a ZNRecordSerializer) and ClusterSetup instances.
-  */
- @BeforeSuite
- public void beforeSuite() throws Exception
- {
-   // TODO: use logging.properties file to config java.util.logging.Logger levels
-   java.util.logging.Logger.getLogger("").setLevel(Level.WARNING);
-
-   _zkServer = TestHelper.startZkSever(ZK_ADDR);
-   AssertJUnit.assertNotNull(_zkServer);
-   ZKClientPool.reset();
-
-   ZkClient sharedClient = new ZkClient(ZK_ADDR);
-   _gZkClient = sharedClient;
-   sharedClient.setZkSerializer(new ZNRecordSerializer());
-   _gSetupTool = new ClusterSetup(ZK_ADDR);
- }
-
- /**
-  * Suite-wide teardown: resets the ZKClientPool, closes the shared ZkClient and
-  * stops the ZooKeeper server.
-  */
- @AfterSuite
- public void afterSuite()
- {
-   ZKClientPool.reset();
-   try
-   {
-     // Close the shared client while the server is still running, and tolerate a
-     // beforeSuite() that failed before the client was created.
-     if (_gZkClient != null)
-     {
-       _gZkClient.close();
-     }
-   }
-   finally
-   {
-     // Always stop the server, even if closing the client threw.
-     TestHelper.stopZkServer(_zkServer);
-   }
- }
-
- protected String getShortClassName()
- {
- String className = this.getClass().getName();
- return className.substring(className.lastIndexOf('.') + 1);
- }
-
- /**
-  * Reads the controller-leader property of the given cluster through a fresh
-  * ZKHelixDataAccessor.
-  *
-  * @param zkClient client used to read from ZooKeeper
-  * @param clusterName cluster whose leader is looked up
-  * @return the leader's instance name, or null when no leader property exists
-  */
- protected String getCurrentLeader(ZkClient zkClient, String clusterName)
- {
-   ZKHelixDataAccessor dataAccessor =
-       new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
-   Builder keys = dataAccessor.keyBuilder();
-   LiveInstance leaderNode = dataAccessor.getProperty(keys.controllerLeader());
-
-   return (leaderNode == null) ? null : leaderNode.getInstanceName();
- }
-
- /**
-  * Stops the current leader and returns the new leader.
-  *
-  * The leader's entry is removed from {@code startCMResultMap}, its manager is
-  * disconnected and its thread interrupted. The method then checks for a leader
-  * once per second, up to five times, and fails the test if no leader different
-  * from the stopped one shows up in that time.
-  *
-  * @param zkClient client used to read the leader from ZooKeeper
-  * @param clusterName cluster whose leader is stopped
-  * @param startCMResultMap started controllers keyed by controller name; the
-  *          stopped leader's entry is removed
-  * @return the name of the newly elected leader
-  */
- protected String stopCurrentLeader(ZkClient zkClient,
-                                    String clusterName,
-                                    Map<String, StartCMResult> startCMResultMap)
- {
-   String leader = getCurrentLeader(zkClient, clusterName);
-   Assert.assertTrue(leader != null);
-   System.out.println("stop leader: " + leader + " in " + clusterName);
-
-   StartCMResult result = startCMResultMap.remove(leader);
-   // Fail with a clear message instead of an NPE when the leader was not started
-   // through the given map.
-   Assert.assertTrue(result != null, "no StartCMResult registered for leader " + leader);
-   Assert.assertTrue(result._manager != null);
-   result._manager.disconnect();
-
-   Assert.assertTrue(result._thread != null);
-   result._thread.interrupt();
-
-   boolean isNewLeaderElected = false;
-   String newLeader = null;
-   try
-   {
-     for (int i = 0; i < 5; i++)
-     {
-       Thread.sleep(1000);
-       newLeader = getCurrentLeader(zkClient, clusterName);
-       // getCurrentLeader() returns null while no leader is registered; keep
-       // waiting in that case rather than dereferencing null.
-       if (newLeader != null && !newLeader.equals(leader))
-       {
-         isNewLeaderElected = true;
-         System.out.println("new leader elected: " + newLeader + " in " + clusterName);
-         break;
-       }
-     }
-   }
-   catch (InterruptedException e)
-   {
-     // Preserve the interrupt for the caller; the assertion below fails the test.
-     Thread.currentThread().interrupt();
-     e.printStackTrace();
-   }
-   if (isNewLeaderElected == false)
-   {
-     System.out.println("fail to elect a new leader in " + clusterName);
-   }
-   AssertJUnit.assertTrue(isNewLeaderElected);
-   return newLeader;
- }
-
- protected void enableHealthCheck(String clusterName)
- {
- ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
- new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java
deleted file mode 100644
index 488f205..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBase.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-import com.linkedin.helix.TestHelper;
-import com.linkedin.helix.TestHelper.StartCMResult;
-import com.linkedin.helix.controller.HelixControllerMain;
-import com.linkedin.helix.manager.zk.ZNRecordSerializer;
-import com.linkedin.helix.manager.zk.ZkClient;
-import com.linkedin.helix.tools.ClusterSetup;
-import com.linkedin.helix.tools.ClusterStateVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import com.linkedin.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-
-/**
- * Base class for integration tests that need a running stand-alone cluster: it sets up a
- * storage cluster, starts NODE_NR dummy participants and a zk-based cluster controller in
- * stand-alone mode, and verifies the cluster state before any test method runs.
- */
-
-public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
-{
-  private static final Logger LOG =
-      Logger.getLogger(ZkStandAloneCMTestBase.class);
-
-  protected static final int NODE_NR = 5;
-  protected static final int START_PORT = 12918;
-  protected static final String STATE_MODEL = "MasterSlave";
-  protected static final String TEST_DB = "TestDB";
-  protected static final int _PARTITIONS = 20;
-
-  protected ClusterSetup _setupTool = null;
-  protected final String CLASS_NAME = getShortClassName();
-  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-
-  // Handles of every started participant and controller, keyed by instance name.
-  protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
-  protected ZkClient _zkClient;
-
-  int _replica = 3;
-
-  /** Sets up the cluster, starts the participants and the controller, then verifies the cluster state. */
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    System.out.println("START " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
-
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-    String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace))
-    {
-      _zkClient.deleteRecursive(namespace);
-    }
-    _setupTool = new ClusterSetup(ZK_ADDR);
-
-    // setup storage cluster
-    _setupTool.addCluster(CLUSTER_NAME, true);
-    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
-
-    // start dummy participants
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null)
-      {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
-      }
-      else
-      {
-        StartCMResult result =
-            TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
-    }
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME,
-                                   controllerName,
-                                   ZK_ADDR,
-                                   HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
-
-    // Assert each verifier separately so a failure of the first is not masked by the second.
-    boolean result = ClusterStateVerifier.verifyByZkCallback(
-        new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    result = ClusterStateVerifier.verifyByZkCallback(
-        new BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
-    Assert.assertTrue(result);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception
-  {
-    /**
-     * shutdown order: 1) disconnect the controller 2) disconnect participants
-     */
-
-    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      Entry<String, StartCMResult> entry = it.next();
-      if (entry.getKey().startsWith(CONTROLLER_PREFIX))
-      {
-        // Read the handle from the entry itself instead of a second map lookup.
-        StartCMResult result = entry.getValue();
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
-      }
-    }
-
-    Thread.sleep(100);
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext())
-    {
-      StartCMResult result = it.next().getValue();
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
-    }
-
-    // The ZooKeeper client is closed last, after every manager has been disconnected.
-    _zkClient.close();
-    // logger.info("END at " + new Date(System.currentTimeMillis()));
-    System.out.println("END " + CLASS_NAME + " at "
-        + new Date(System.currentTimeMillis()));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
deleted file mode 100644
index 0b4b0fe..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.integration;
-
-import java.util.List;
-
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.controller.restlet.ZKPropertyTransferServer;
-import com.linkedin.helix.controller.restlet.ZkPropertyTransferClient;
-import com.linkedin.helix.model.StatusUpdate;
-/**
- * Runs the ZkStandAloneCMTestBase setup with a ZKPropertyTransferServer started first, then
- * asserts that each participant's state-transition status updates carry the transfer fields.
- */
-public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase
-{
-  @BeforeClass
-  public void beforeClass() throws Exception
-  {
-    ZKPropertyTransferServer.PERIOD = 500;
-    ZkPropertyTransferClient.SEND_PERIOD = 500;
-    ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
-    super.beforeClass();
-    Thread.sleep(1000);
-    for (int i = 0; i < NODE_NR; i++)
-    {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      Assert.assertTrue(_startCMResultMap.get(instanceName) != null);
-      HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
-      List<StatusUpdate> statusUpdates = accessor.getChildValues(accessor.keyBuilder().stateTransitionStatus(
-          instanceName, _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
-      Assert.assertTrue(statusUpdates.size() > 0);
-      for (StatusUpdate update : statusUpdates)
-      {
-        // Constant-first equals: a missing field fails the assertion instead of throwing an NPE.
-        Assert.assertTrue("true".equals(update.getRecord().getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER)));
-        Assert.assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
-      }
-    }
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception
-  {
-    try
-    {
-      super.afterClass();
-    }
-    finally // always stop the transfer server, even when the base teardown throws
-    {
-      ZKPropertyTransferServer.getInstance().shutdown();
-      ZKPropertyTransferServer.getInstance().reset();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java b/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java
deleted file mode 100644
index 1dacc68..0000000
--- a/helix-core/src/test/java/com/linkedin/helix/josql/TestClusterJosqlQueryProcessor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.josql;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-import org.testng.annotations.Test;
-
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.josql.ZNRecordJosqlFunctionHandler;
-import com.linkedin.helix.josql.ZNRecordRow;
-import com.linkedin.helix.model.LiveInstance.LiveInstanceProperty;
-import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
-
-/**
- * Unit test that runs a JoSQL query over ZNRecordRow rows derived from a computed ideal state.
- */
-public class TestClusterJosqlQueryProcessor
-{
-  /**
-   * Builds five live-instance records (each with a random session id and an "SCN" field),
-   * computes an ideal-state record for "TestDB", and queries its map fields for SLAVE replicas
-   * of partitions matching 'TestDB_2%', ordered by SCN. Query parse and execution errors
-   * propagate so that the test fails instead of only printing a stack trace.
-   */
-  @Test (groups = {"unitTest"})
-  public void queryClusterDataSample() throws QueryParseException, QueryExecutionException
-  {
-    List<ZNRecord> liveInstances = new ArrayList<ZNRecord>();
-    Map<String, ZNRecord> liveInstanceMap = new HashMap<String, ZNRecord>();
-    List<String> instances = new ArrayList<String>();
-    for (int i = 0; i < 5; i++)
-    {
-      String instance = "localhost_" + (12918 + i);
-      instances.add(instance);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
-          UUID.randomUUID().toString());
-      metaData.setSimpleField("SCN", "" + (10 - i));
-      liveInstances.add(metaData);
-      liveInstanceMap.put(instance, metaData);
-    }
-
-    //liveInstances.remove(0);
-    ZNRecord externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
-        instances, 21, 3, "TestDB", "MASTER", "SLAVE");
-
-    // NOTE(review): criteria is populated but never passed to the query below - confirm it is still needed.
-    Criteria criteria = new Criteria();
-    criteria.setInstanceName("%");
-    criteria.setResource("TestDB");
-    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    criteria.setPartition("TestDB_2%");
-    criteria.setPartitionState("SLAVE");
-
-    String josql =
-        " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN') AS 'SCN'" +
-        " FROM com.linkedin.helix.josql.ZNRecordRow " +
-        " WHERE mapKey LIKE 'TestDB_2%' " +
-        " AND mapSubKey LIKE '%' " +
-        " AND mapValue LIKE 'SLAVE' " +
-        " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) " +
-        " ORDER BY parseInt(getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN'))";
-
-    Query josqlQuery = new Query();
-    josqlQuery.setVariable("LIVEINSTANCES", liveInstances);
-    josqlQuery.setVariable("LIVEINSTANCESMAP", liveInstanceMap);
-    josqlQuery.addFunctionHandler(new ZNRecordRow());
-    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
-    // Presumably registered so that parseInt(...) in the ORDER BY clause resolves - TODO confirm.
-    josqlQuery.addFunctionHandler(Integer.valueOf(0));
-
-    // No try/catch here: a query that fails to parse or execute must fail the test.
-    josqlQuery.parse(josql);
-    QueryResults qr = josqlQuery.execute(ZNRecordRow.convertMapFields(externalView));
-    @SuppressWarnings({ "unchecked", "unused" })
-    List<Object> result = qr.getResults();
-  }
-}