You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC
[35/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
new file mode 100644
index 0000000..3bfa1c0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -0,0 +1,425 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.ZKPathDataDumpTask;
+import org.apache.helix.util.HelixUtil;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+ TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
+ public static class TestMessagingHandlerFactory implements
+ MessageHandlerFactory
+ {
+ public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ return new TestMessagingHandler(message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return "TestParticipant";
+ }
+
+ @Override
+ public void reset()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public class TestMessagingHandler extends MessageHandler
+ {
+ public TestMessagingHandler(Message message, NotificationContext context)
+ {
+ super(message, context);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ String destName = _message.getTgtName();
+ synchronized (_results)
+ {
+ if (!_results.containsKey(_message.getPartitionName()))
+ {
+ _results.put(_message.getPartitionName(),
+ new ConcurrentSkipListSet<String>());
+ }
+ }
+ _results.get(_message.getPartitionName()).add(destName);
+
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ // TODO Auto-generated method stub
+
+ }
+ }
+ }
+
+ @Test()
+ public void TestSchedulerMsg() throws Exception
+ {
+ _factory._results.clear();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ helixDataAccessor.createProperty(
+ keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+ schedulerMessage);
+
+ Thread.sleep(15000);
+
+ Assert.assertEquals(_PARTITIONS, _factory._results.size());
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+ int messageResultCount = 0;
+ for(String key : statusUpdate.getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult "))
+ {
+ messageResultCount ++;
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+ int count = 0;
+ for (Set<String> val : _factory._results.values())
+ {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+ // test the ZkPathDataDumpTask
+ String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(),
+ PropertyType.STATUSUPDATES_CONTROLLER);
+ List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
+ Assert.assertTrue(subPaths.size() > 0);
+ for(String subPath : subPaths)
+ {
+ String nextPath = controllerStatusPath + "/" + subPath;
+ List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ Assert.assertTrue(subsubPaths.size() > 0);
+ }
+
+ String instanceStatusPath = HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
+ PropertyType.STATUSUPDATES);
+
+ subPaths = _zkClient.getChildren(instanceStatusPath);
+ Assert.assertTrue(subPaths.size() > 0);
+ for(String subPath : subPaths)
+ {
+ String nextPath = instanceStatusPath + "/" + subPath;
+ List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ Assert.assertTrue(subsubPaths.size() > 0);
+ for(String subsubPath : subsubPaths)
+ {
+ String nextnextPath = nextPath + "/" + subsubPath;
+ Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
+ }
+ }
+
+ ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
+ dumpTask.run();
+
+ subPaths = _zkClient.getChildren(controllerStatusPath);
+ Assert.assertTrue(subPaths.size() > 0);
+ for(String subPath : subPaths)
+ {
+ String nextPath = controllerStatusPath + "/" + subPath;
+ List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ Assert.assertTrue(subsubPaths.size() == 0);
+ }
+
+ subPaths = _zkClient.getChildren(instanceStatusPath);
+ Assert.assertTrue(subPaths.size() > 0);
+ for(String subPath : subPaths)
+ {
+ String nextPath = instanceStatusPath + "/" + subPath;
+ List<String> subsubPaths = _zkClient.getChildren(nextPath);
+ Assert.assertTrue(subsubPaths.size() > 0);
+ for(String subsubPath : subsubPaths)
+ {
+ String nextnextPath = nextPath + "/" + subsubPath;
+ Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
+ }
+ }
+ }
+
+
+ @Test()
+ public void TestSchedulerMsg2() throws Exception
+ {
+ _factory._results.clear();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+ schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+
+ Criteria cr2 = new Criteria();
+ cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ cr2.setInstanceName("*");
+ cr2.setSessionSpecific(false);
+
+ class MockAsyncCallback extends AsyncCallback
+ {
+ Message _message;
+ public MockAsyncCallback()
+ {
+ }
+
+ @Override
+ public void onTimeOut()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void onReplyMessage(Message message)
+ {
+ _message = message;
+ }
+
+ }
+ MockAsyncCallback callback = new MockAsyncCallback();
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+ Assert.assertEquals(_PARTITIONS, _factory._results.size());
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+ int messageResultCount = 0;
+ for(String key : statusUpdate.getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult "))
+ {
+ messageResultCount ++;
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+ int count = 0;
+ for (Set<String> val : _factory._results.values())
+ {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+ }
+
+ @Test()
+ public void TestSchedulerZeroMsg() throws Exception
+ {
+ TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(factory.getMessageType(), factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_DOESNOTEXIST");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ PropertyKey controllerMessageKey = keyBuilder
+ .controllerMessage(schedulerMessage.getMsgId());
+ helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
+
+ Thread.sleep(3000);
+
+ Assert.assertEquals(0, factory._results.size());
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("0"));
+ int count = 0;
+ for (Set<String> val : factory._results.values())
+ {
+ count += val.size();
+ }
+ Assert.assertEquals(count, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
new file mode 100644
index 0000000..8cc2380
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
@@ -0,0 +1,66 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSchemataSM extends ZkIntegrationTestBase
+{
+ @Test
+ public void testSchemataSM() throws Exception
+ {
+ String testName = "TestSchemataSM";
+ String clusterName = testName;
+
+ MockParticipant[] participants = new MockParticipant[5];
+// Logger.getRootLogger().setLevel(Level.INFO);
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
+ // port
+ "localhost", // participant name prefix
+ "TestSchemata", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 1, // replicas
+ "STORAGE_DEFAULT_SM_SCHEMATA",
+ true); // do rebalance
+
+ TestHelper.startController(clusterName,
+ "controller_0",
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ // start participants
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] =
+ new MockParticipant(clusterName,
+ instanceName,
+ ZK_ADDR,
+ null);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
new file mode 100644
index 0000000..12e500a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -0,0 +1,115 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkTestHelper.TestZkHelixManager;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+
+public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
+{
+
+ public class SessionExpiryTransition extends MockTransition
+ {
+ private final AtomicBoolean _done = new AtomicBoolean();
+
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ {
+ TestZkHelixManager manager = (TestZkHelixManager)context.getManager();
+
+ String instance = message.getTgtName();
+ String partition = message.getPartitionName();
+ if (instance.equals("localhost_12918")
+ && partition.equals("TestDB0_1") // TestDB0_1 is SLAVE on localhost_12918
+ && _done.getAndSet(true) == false)
+ {
+ try
+ {
+ ZkTestHelper.expireSession(manager.getZkClient());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
+ // when there is pending messages
+ // @Test
+ public void testSessionExpiryInTransition() throws Exception
+ {
+ Logger.getRootLogger().setLevel(Level.WARN);
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ final String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipant[] participants = new MockParticipant[5];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ClusterController controller =
+ new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+ TestZkHelixManager manager =
+ new TestZkHelixManager(clusterName,
+ instanceName,
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // clean up
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ Thread.sleep(2000);
+ controller.syncStop();
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
new file mode 100644
index 0000000..21bde31
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestStandAloneCMMain extends ZkStandAloneCMTestBase
+{
+ private static Logger logger = Logger.getLogger(TestStandAloneCMMain.class);
+
+ @Test()
+ public void testStandAloneCMMain() throws Exception
+ {
+ logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
+
+ for (int i = 1; i <= 2; i++)
+ {
+ String controllerName = "controller_" + i;
+ StartCMResult startResult =
+ TestHelper.startController(CLUSTER_NAME,
+ controllerName,
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ _startCMResultMap.put(controllerName, startResult);
+ }
+
+ stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ logger.info("STOP testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
new file mode 100644
index 0000000..b26a346
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkTestHelper.TestZkHelixManager;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase
+{
+ private static Logger LOG = Logger.getLogger(TestStandAloneCMSessionExpiry.class);
+
+ @Test()
+ public void testStandAloneCMSessionExpiry() throws Exception
+ {
+ // Logger.getRootLogger().setLevel(Level.DEBUG);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName,
+ ZK_ADDR,
+ 12918,
+ PARTICIPANT_PREFIX,
+ "TestDB",
+ 1,
+ 20,
+ 5,
+ 3,
+ "MasterSlave",
+ true);
+
+ MockParticipant[] participants = new MockParticipant[5];
+ for (int i = 0; i < 5; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+ TestZkHelixManager manager =
+ new TestZkHelixManager(clusterName,
+ instanceName,
+ InstanceType.PARTICIPANT,
+ ZK_ADDR);
+ participants[i] = new MockParticipant(manager, null);
+ participants[i].syncStart();
+ }
+
+ TestZkHelixManager controller =
+ new TestZkHelixManager(clusterName,
+ "controller_0",
+ InstanceType.CONTROLLER,
+ ZK_ADDR);
+ controller.connect();
+
+ boolean result;
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // participant session expiry
+ TestZkHelixManager participantToExpire = (TestZkHelixManager)participants[1].getManager();
+
+ System.out.println("Expire participant session");
+ String oldSessionId = participantToExpire.getSessionId();
+
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ String newSessionId = participantToExpire.getSessionId();
+ System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+ Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
+
+ ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+ setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
+ setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // controller session expiry
+ System.out.println("Expire controller session");
+ oldSessionId = controller.getSessionId();
+ ZkTestHelper.expireSession(controller.getZkClient());
+ newSessionId = controller.getSessionId();
+ System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+ Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
+
+ setupTool.addResourceToCluster(clusterName, "TestDB2", 8, "MasterSlave");
+ setupTool.rebalanceStorageCluster(clusterName, "TestDB2", 3);
+
+ result =
+ ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // clean up
+ System.out.println("Clean up ...");
+ // Logger.getRootLogger().setLevel(Level.DEBUG);
+ controller.disconnect();
+ Thread.sleep(100);
+ for (int i = 0; i < 5; i++)
+ {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
new file mode 100644
index 0000000..83df0e4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -0,0 +1,210 @@
+package org.apache.helix.integration;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.mock.storage.MockJobIntf;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.mock.storage.MockParticipant.MockMSStateModel;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase
+{
+ private static Logger LOG =
+ Logger.getLogger(TestStateTransitionTimeout.class);
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ System.out.println("START " + CLASS_NAME + " at "
+ + new Date(System.currentTimeMillis()));
+
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ String namespace = "/" + CLUSTER_NAME;
+ if (_zkClient.exists(namespace))
+ {
+ _zkClient.deleteRecursive(namespace);
+ }
+ _setupTool = new ClusterSetup(ZK_ADDR);
+
+ // setup storage cluster
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+
+ // Set the timeout values
+ IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ String stateTransition = "SLAVE" + "-" + "MASTER" + "_" + Message.Attributes.TIMEOUT;
+ idealState.getRecord().setSimpleField(stateTransition, "300");
+
+ String command = "-zkSvr " + ZK_ADDR + " -addResourceProperty "+ CLUSTER_NAME + " " + TEST_DB + " " + stateTransition + " 200";
+ ClusterSetup.processCommandLineArgs(command.split(" "));
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+ public static class TimeOutStateModel extends MockParticipant.MockMSStateModel
+ {
+ boolean _sleep = false;
+ StateTransitionError _error;
+ int _errorCallcount = 0;
+ public TimeOutStateModel(MockTransition transition, boolean sleep)
+ {
+ super(transition);
+ _sleep = sleep;
+ }
+
+ @Transition(to="SLAVE",from="OFFLINE")
+ public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
+ {
+ LOG.info("Become SLAVE from OFFLINE");
+
+ }
+
+ @Transition(to="MASTER",from="SLAVE")
+ public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
+ {
+ LOG.info("Become MASTER from SLAVE");
+ if (_transition != null && _sleep)
+ {
+ _transition.doTransition(message, context);
+ }
+ }
+
+ @Transition(to="SLAVE",from="MASTER")
+ public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
+ {
+ LOG.info("Become SLAVE from MASTER");
+ }
+
+ @Transition(to="OFFLINE",from="SLAVE")
+ public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
+ {
+ LOG.info("Become OFFLINE from SLAVE");
+
+ }
+
+ @Transition(to="DROPPED",from="OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+ {
+ LOG.info("Become DROPPED from OFFLINE");
+
+ }
+
+ public void rollbackOnError(Message message, NotificationContext context,
+ StateTransitionError error)
+ {
+ _error = error;
+ _errorCallcount ++;
+ }
+ }
+
+ public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel>
+ {
+ Set<String> partitionsToSleep = new HashSet<String>();
+ int _sleepTime;
+
+ public SleepStateModelFactory(int sleepTime)
+ {
+ _sleepTime = sleepTime;
+ }
+
+ public void setPartitions(Collection<String> partitions)
+ {
+ partitionsToSleep.addAll(partitions);
+ }
+
+ public void addPartition(String partition)
+ {
+ partitionsToSleep.add(partition);
+ }
+
+ @Override
+ public TimeOutStateModel createNewStateModel(String stateUnitKey)
+ {
+ return new TimeOutStateModel(new MockParticipant.SleepTransition(_sleepTime), partitionsToSleep.contains(stateUnitKey));
+ }
+ }
+
+ @Test
+ public void testStateTransitionTimeOut() throws Exception
+ {
+ Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
+ MockParticipant[] participants = new MockParticipant[NODE_NR];
+ IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ SleepStateModelFactory factory = new SleepStateModelFactory(1000);
+ factories.put(instanceName, factory);
+ for(String p : idealState.getPartitionSet())
+ {
+ if(idealState.getPreferenceList(p).get(0).equals(instanceName))
+ {
+ factory.addPartition(p);
+ }
+ }
+
+ participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
+ participants[i].syncStart();
+ }
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ StartCMResult startResult =
+ TestHelper.startController(CLUSTER_NAME,
+ controllerName,
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ _startCMResultMap.put(controllerName, startResult);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
+
+ Builder kb = accessor.keyBuilder();
+ ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+ for(String p : idealState.getPartitionSet())
+ {
+ String idealMaster = idealState.getPreferenceList(p).get(0);
+ Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+
+ TimeOutStateModel model = factories.get(idealMaster).getStateModel(p);
+ Assert.assertEquals(model._errorCallcount , 1);
+ Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
new file mode 100644
index 0000000..8c0953f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.testng.Assert;
+
+
+public class TestStatusUpdate extends ZkStandAloneCMTestBase
+{
+ // For now write participant StatusUpdates to log4j.
+ // TODO: Need to investigate another data channel to report to controller and re-enable
+ // this test
+ // @Test
+ public void testParticipantStatusUpdates() throws Exception
+ {
+ ZkClient zkClient = new ZkClient(ZkIntegrationTestBase.ZK_ADDR);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
+ Assert.assertNotNull(extViews);
+
+ for (ExternalView extView : extViews)
+ {
+ String resourceName = extView.getResourceName();
+ Set<String> partitionSet = extView.getPartitionSet();
+ for (String partition : partitionSet)
+ {
+ Map<String, String> stateMap = extView.getStateMap(partition);
+ for (String instance : stateMap.keySet())
+ {
+ String state = stateMap.get(instance);
+ StatusUpdateUtil.StatusUpdateContents statusUpdates =
+ StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor,
+ instance,
+ resourceName,
+ partition);
+
+ Map<String, StatusUpdateUtil.TaskStatus> taskMessages =
+ statusUpdates.getTaskMessages();
+ List<StatusUpdateUtil.Transition> transitions = statusUpdates.getTransitions();
+ if (state.equals("MASTER"))
+ {
+ Assert.assertEquals(transitions.size() >= 2,
+ true,
+ "Invalid number of transitions");
+ StatusUpdateUtil.Transition lastTransition =
+ transitions.get(transitions.size() - 1);
+ StatusUpdateUtil.Transition prevTransition =
+ transitions.get(transitions.size() - 2);
+ Assert.assertEquals(taskMessages.get(lastTransition.getMsgID()),
+ StatusUpdateUtil.TaskStatus.COMPLETED,
+ "Incomplete transition");
+ Assert.assertEquals(taskMessages.get(prevTransition.getMsgID()),
+ StatusUpdateUtil.TaskStatus.COMPLETED,
+ "Incomplete transition");
+ Assert.assertEquals(lastTransition.getFromState(), "SLAVE", "Invalid State");
+ Assert.assertEquals(lastTransition.getToState(), "MASTER", "Invalid State");
+ }
+ else if (state.equals("SLAVE"))
+ {
+ Assert.assertEquals(transitions.size() >= 1,
+ true,
+ "Invalid number of transitions");
+ StatusUpdateUtil.Transition lastTransition =
+ transitions.get(transitions.size() - 1);
+ Assert.assertEquals(lastTransition.getFromState().equals("MASTER")
+ || lastTransition.getFromState().equals("OFFLINE"),
+ true,
+ "Invalid transition");
+ Assert.assertEquals(lastTransition.getToState(), "SLAVE", "Invalid State");
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
new file mode 100644
index 0000000..45b0fc2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -0,0 +1,128 @@
+package org.apache.helix.integration;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSwapInstance extends ZkStandAloneCMTestBase
+{
+ @Test
+ public void TestSwap() throws Exception
+ {
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+ HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
+ _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
+
+
+ ZNRecord idealStateOld1 = new ZNRecord("TestDB");
+ ZNRecord idealStateOld2 = new ZNRecord("MyDB");
+
+ IdealState is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
+ idealStateOld1.merge(is1.getRecord());
+
+
+ IdealState is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
+ idealStateOld2.merge(is2.getRecord());
+
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+ ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2);
+
+ boolean exception = false;
+ try
+ {
+ _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
+ }
+ catch(Exception e)
+ {
+ exception = true;
+ }
+ Assert.assertTrue(exception);
+
+ _startCMResultMap.get(instanceName)._manager.disconnect();
+ _startCMResultMap.get(instanceName)._thread.interrupt();
+ Thread.sleep(1000);
+
+ exception = false;
+ try
+ {
+ _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ exception = true;
+ }
+ Assert.assertFalse(exception);
+ StartCMResult result2 =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
+ _startCMResultMap.put(instanceName2, result2);
+
+ result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
+
+ is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
+
+ for(String key : idealStateOld1.getMapFields().keySet())
+ {
+ for(String host : idealStateOld1.getMapField(key).keySet())
+ {
+ if(host.equals(instanceName))
+ {
+ Assert.assertTrue(
+ idealStateOld1.getMapField(key).get(instanceName).equals(
+ is1.getRecord().getMapField(key).get(instanceName2)));
+ }
+ else
+ {
+ Assert.assertTrue(
+ idealStateOld1.getMapField(key).get(host).equals(
+ is1.getRecord().getMapField(key).get(host)));
+ }
+ }
+ }
+
+ for(String key : idealStateOld1.getListFields().keySet())
+ {
+ Assert.assertEquals(idealStateOld1.getListField(key).size() , is1.getRecord().getListField(key).size());
+ for(int i = 0; i < idealStateOld1.getListField(key).size(); i++)
+ {
+ String host = idealStateOld1.getListField(key).get(i);
+ String newHost = is1.getRecord().getListField(key).get(i);
+ if(host.equals(instanceName))
+ {
+ Assert.assertTrue(
+ newHost.equals(instanceName2));
+ }
+ else
+ {
+ //System.out.println(key + " " + i+ " " + host + " "+newHost);
+ //System.out.println(idealStateOld1.getListField(key));
+ //System.out.println(is1.getRecord().getListField(key));
+
+ Assert.assertTrue(host.equals(newHost));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
new file mode 100644
index 0000000..c985f2a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Map;
+import java.util.logging.Level;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+public class ZkIntegrationTestBase
+{
+ private static Logger LOG =
+ Logger.getLogger(ZkIntegrationTestBase.class);
+
+ protected static ZkServer _zkServer;
+ protected static ZkClient _gZkClient;
+ protected static ClusterSetup _gSetupTool;
+
+ public static final String ZK_ADDR = "localhost:2183";
+ protected static final String CLUSTER_PREFIX = "CLUSTER";
+ protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+
+ protected final String CONTROLLER_PREFIX = "controller";
+ protected final String PARTICIPANT_PREFIX = "localhost";
+
+ @BeforeSuite
+ public void beforeSuite() throws Exception
+ {
+ // TODO: use logging.properties file to config java.util.logging.Logger levels
+ java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+ topJavaLogger.setLevel(Level.WARNING);
+
+ _zkServer = TestHelper.startZkSever(ZK_ADDR);
+ AssertJUnit.assertTrue(_zkServer != null);
+ ZKClientPool.reset();
+
+ _gZkClient = new ZkClient(ZK_ADDR);
+ _gZkClient.setZkSerializer(new ZNRecordSerializer());
+ _gSetupTool = new ClusterSetup(ZK_ADDR);
+ }
+
+ @AfterSuite
+ public void afterSuite()
+ {
+ ZKClientPool.reset();
+ TestHelper.stopZkServer(_zkServer);
+ _gZkClient.close();
+ }
+
+ protected String getShortClassName()
+ {
+ String className = this.getClass().getName();
+ return className.substring(className.lastIndexOf('.') + 1);
+ }
+
+ protected String getCurrentLeader(ZkClient zkClient, String clusterName)
+ {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null)
+ {
+ return null;
+ }
+ return leader.getInstanceName();
+ }
+
+ /**
+ * Stop current leader and returns the new leader
+ *
+ * @param zkClient
+ * @param clusterName
+ * @param startCMResultMap
+ * @return
+ */
+ protected String stopCurrentLeader(ZkClient zkClient,
+ String clusterName,
+ Map<String, StartCMResult> startCMResultMap)
+ {
+ String leader = getCurrentLeader(zkClient, clusterName);
+ Assert.assertTrue(leader != null);
+ System.out.println("stop leader: " + leader + " in " + clusterName);
+ Assert.assertTrue(leader != null);
+
+ StartCMResult result = startCMResultMap.remove(leader);
+ Assert.assertTrue(result._manager != null);
+ result._manager.disconnect();
+
+ Assert.assertTrue(result._thread != null);
+ result._thread.interrupt();
+
+ boolean isNewLeaderElected = false;
+ String newLeader = null;
+ try
+ {
+ for (int i = 0; i < 5; i++)
+ {
+ Thread.sleep(1000);
+ newLeader = getCurrentLeader(zkClient, clusterName);
+ if (!newLeader.equals(leader))
+ {
+ isNewLeaderElected = true;
+ System.out.println("new leader elected: " + newLeader + " in " + clusterName);
+ break;
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ if (isNewLeaderElected == false)
+ {
+ System.out.println("fail to elect a new leader in " + clusterName);
+ }
+ AssertJUnit.assertTrue(isNewLeaderElected);
+ return newLeader;
+ }
+
+ protected void enableHealthCheck(String clusterName)
+ {
+ ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
+ new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
new file mode 100644
index 0000000..a203539
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ *
+ * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
+ * start 5 dummy participants verify the current states at end
+ */
+
+public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
+{
+ private static Logger LOG =
+ Logger.getLogger(ZkStandAloneCMTestBase.class);
+
+ protected static final int NODE_NR = 5;
+ protected static final int START_PORT = 12918;
+ protected static final String STATE_MODEL = "MasterSlave";
+ protected static final String TEST_DB = "TestDB";
+ protected static final int _PARTITIONS = 20;
+
+ protected ClusterSetup _setupTool = null;
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_"
+ + CLASS_NAME;
+
+ protected Map<String, StartCMResult> _startCMResultMap =
+ new HashMap<String, StartCMResult>();
+ protected ZkClient _zkClient;
+
+ int _replica = 3;
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+// Logger.getRootLogger().setLevel(Level.INFO);
+ System.out.println("START " + CLASS_NAME + " at "
+ + new Date(System.currentTimeMillis()));
+
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ String namespace = "/" + CLUSTER_NAME;
+ if (_zkClient.exists(namespace))
+ {
+ _zkClient.deleteRecursive(namespace);
+ }
+ _setupTool = new ClusterSetup(ZK_ADDR);
+
+ // setup storage cluster
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+ // start dummy participants
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ if (_startCMResultMap.get(instanceName) != null)
+ {
+ LOG.error("fail to start particpant:" + instanceName
+ + "(participant with same name already exists)");
+ }
+ else
+ {
+ StartCMResult result =
+ TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _startCMResultMap.put(instanceName, result);
+ }
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ StartCMResult startResult =
+ TestHelper.startController(CLUSTER_NAME,
+ controllerName,
+ ZK_ADDR,
+ HelixControllerMain.STANDALONE);
+ _startCMResultMap.put(controllerName, startResult);
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception
+ {
+ /**
+ * shutdown order: 1) disconnect the controller 2) disconnect participants
+ */
+
+ StartCMResult result;
+ Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
+ while (it.hasNext())
+ {
+ String instanceName = it.next().getKey();
+ if (instanceName.startsWith(CONTROLLER_PREFIX))
+ {
+ result = _startCMResultMap.get(instanceName);
+ result._manager.disconnect();
+ result._thread.interrupt();
+ it.remove();
+ }
+ }
+
+ Thread.sleep(100);
+ it = _startCMResultMap.entrySet().iterator();
+ while (it.hasNext())
+ {
+ String instanceName = it.next().getKey();
+ result = _startCMResultMap.get(instanceName);
+ result._manager.disconnect();
+ result._thread.interrupt();
+ it.remove();
+ }
+
+ _zkClient.close();
+ // logger.info("END at " + new Date(System.currentTimeMillis()));
+ System.out.println("END " + CLASS_NAME + " at "
+ + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
new file mode 100644
index 0000000..3e27bc4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.model.StatusUpdate;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+/**
+ *
+ * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
+ * start 5 dummy participants verify the current states at end
+ */
+
+public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase
+{
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ ZKPropertyTransferServer.PERIOD = 500;
+ ZkPropertyTransferClient.SEND_PERIOD = 500;
+ ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
+ super.beforeClass();
+
+ Thread.sleep(1000);
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ if (_startCMResultMap.get(instanceName) != null)
+ {
+ HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+ Builder kb = accessor.keyBuilder();
+ List<StatusUpdate> statusUpdates = accessor.getChildValues(
+ kb.stateTransitionStatus(instanceName, _startCMResultMap.get(instanceName)._manager.getSessionId(),
+ TEST_DB));
+ Assert.assertTrue(statusUpdates.size() > 0);
+ for(StatusUpdate update : statusUpdates)
+ {
+ Assert.assertTrue(update.getRecord().getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
+ Assert.assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception
+ {
+ super.afterClass();
+ ZKPropertyTransferServer.getInstance().shutdown();
+ ZKPropertyTransferServer.getInstance().reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
new file mode 100644
index 0000000..e4c7659
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.josql.ZNRecordJosqlFunctionHandler;
+import org.apache.helix.josql.ZNRecordRow;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.josql.Query;
+import org.josql.QueryExecutionException;
+import org.josql.QueryParseException;
+import org.josql.QueryResults;
+import org.testng.annotations.Test;
+
+
+public class TestClusterJosqlQueryProcessor
+{
+ @Test (groups = {"unitTest"})
+ public void queryClusterDataSample()
+ {
+ List<ZNRecord> liveInstances = new ArrayList<ZNRecord>();
+ Map<String, ZNRecord> liveInstanceMap = new HashMap<String, ZNRecord>();
+ List<String> instances = new ArrayList<String>();
+ for(int i = 0;i<5; i++)
+ {
+ String instance = "localhost_"+(12918+i);
+ instances.add(instance);
+ ZNRecord metaData = new ZNRecord(instance);
+ metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
+ UUID.randomUUID().toString());
+ metaData.setSimpleField("SCN", "" + (10-i));
+ liveInstances.add(metaData);
+ liveInstanceMap.put(instance, metaData);
+ }
+
+ //liveInstances.remove(0);
+ ZNRecord externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
+ instances, 21, 3, "TestDB", "MASTER", "SLAVE");
+
+
+ Criteria criteria = new Criteria();
+ criteria.setInstanceName("%");
+ criteria.setResource("TestDB");
+ criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ criteria.setPartition("TestDB_2%");
+ criteria.setPartitionState("SLAVE");
+
+ String josql =
+ " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN') AS 'SCN'" +
+ " FROM org.apache.helix.josql.ZNRecordRow " +
+ " WHERE mapKey LIKE 'TestDB_2%' " +
+ " AND mapSubKey LIKE '%' " +
+ " AND mapValue LIKE 'SLAVE' " +
+ " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) " +
+ " ORDER BY parseInt(getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN'))";
+
+ Query josqlQuery = new Query();
+ josqlQuery.setVariable("LIVEINSTANCES", liveInstances);
+ josqlQuery.setVariable("LIVEINSTANCESMAP", liveInstanceMap);
+ josqlQuery.addFunctionHandler(new ZNRecordRow());
+ josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
+ josqlQuery.addFunctionHandler(new Integer(0));
+ try
+ {
+ josqlQuery.parse(josql);
+ QueryResults qr = josqlQuery.execute(ZNRecordRow.convertMapFields(externalView));
+ @SuppressWarnings({ "unchecked", "unused" })
+ List<Object> result = qr.getResults();
+
+ }
+ catch (QueryParseException e)
+ {
+ e.printStackTrace();
+ } catch (QueryExecutionException e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
new file mode 100644
index 0000000..082d52e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
@@ -0,0 +1,228 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.josql.ClusterJosqlQueryProcessor;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestJosqlProcessor extends ZkStandAloneCMTestBase
+{
+ @Test (groups = {"integrationTest"})
+ public void testJosqlQuery() throws Exception
+ {
+ HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+
+ // Find the instance name that contains partition TestDB_2 and state is 'MASTER'
+ String SQL = "SELECT id " +
+ "FROM LIVEINSTANCES " +
+ "WHERE getMapFieldValue( getZNRecordFromMap(:IDEALSTATES , 'TestDB'), :partitionName, :_currObj.id)='MASTER'";
+ Map<String, Object> bindVariables = new HashMap<String, Object>();
+ bindVariables.put("partitionName", "TestDB_2");
+
+ ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
+ List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
+
+ Assert.assertEquals(result.size(), 1);
+ List<Object> firstList = (List<Object>) result.get(0);
+ Assert.assertTrue(((String)(firstList.get(0))).equalsIgnoreCase("localhost_12921"));
+
+ // Find the live instances names that hosts Partition TestDB_10 according to idealstate
+
+ SQL = "SELECT id " +
+ "FROM LIVEINSTANCES " +
+ "WHERE hasMapFieldKey( getZNRecordFromMap(:IDEALSTATES, 'TestDB'), :partitionName, :_currObj.id)='true'";
+ p = new ClusterJosqlQueryProcessor(manager);
+ bindVariables.put("partitionName", "TestDB_10");
+ result = p.runJoSqlQuery(SQL, bindVariables, null);
+
+ Assert.assertEquals(result.size(), 3);
+ Set<String> hosts = new HashSet<String>();
+ for(Object o : result)
+ {
+ String val = (String) ((List<Object>)o).get(0);
+ hosts.add(val);
+ }
+ Assert.assertTrue(hosts.contains("localhost_12918"));
+ Assert.assertTrue(hosts.contains("localhost_12920"));
+ Assert.assertTrue(hosts.contains("localhost_12921"));
+
+ // Find the partitions on host localhost_12919 and is on MASTER state
+ SQL = "SELECT id " +
+ "FROM PARTITIONS " +
+ "WHERE getMapFieldValue( getZNRecordFromMap(:EXTERNALVIEW, 'TestDB'), id, :instanceName)='MASTER'";
+ p = new ClusterJosqlQueryProcessor(manager);
+ bindVariables.clear();
+ bindVariables.put("instanceName", "localhost_12919");
+ result = p.runJoSqlQuery(SQL, bindVariables, null);
+
+ Assert.assertEquals(result.size(), 4);
+ Set<String> partitions = new HashSet<String>();
+ for(Object o : result)
+ {
+ String val = (String) ((List<Object>)o).get(0);
+ partitions.add(val);
+ }
+ Assert.assertTrue(partitions.contains("TestDB_6"));
+ Assert.assertTrue(partitions.contains("TestDB_7"));
+ Assert.assertTrue(partitions.contains("TestDB_9"));
+ Assert.assertTrue(partitions.contains("TestDB_14"));
+
+ // Find the partitions on host localhost_12919 and is on MASTER state
+ // Same as above but according to currentstates
+ SQL = "SELECT id " +
+ "FROM PARTITIONS " +
+ "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, :instanceName, 'TestDB'), :_currObj.id, :mapFieldKey)=:partitionState";
+
+ p = new ClusterJosqlQueryProcessor(manager);
+ bindVariables.clear();
+ bindVariables.put("instanceName", "localhost_12919");
+ bindVariables.put("mapFieldKey", "CURRENT_STATE");
+ bindVariables.put("partitionState", "MASTER");
+
+ result = p.runJoSqlQuery(SQL, bindVariables, null);
+
+ Assert.assertEquals(result.size(), 4);
+ partitions.clear();
+ partitions = new HashSet<String>();
+ for(Object o : result)
+ {
+ String val = (String) ((List<Object>)o).get(0);
+ partitions.add(val);
+ }
+ Assert.assertTrue(partitions.contains("TestDB_6"));
+ Assert.assertTrue(partitions.contains("TestDB_7"));
+ Assert.assertTrue(partitions.contains("TestDB_9"));
+ Assert.assertTrue(partitions.contains("TestDB_14"));
+
+ // get node name that hosts a certain partition with certain state
+
+ SQL = "SELECT id " +
+ "FROM LIVEINSTANCES " +
+ "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, id, 'TestDB'), :partitionName, :mapFieldKey)=:partitionState";
+
+ p = new ClusterJosqlQueryProcessor(manager);
+ bindVariables.clear();
+ bindVariables.put("partitionName", "TestDB_8");
+ bindVariables.put("mapFieldKey", "CURRENT_STATE");
+ bindVariables.put("partitionState", "SLAVE");
+
+ result = p.runJoSqlQuery(SQL, bindVariables, null);
+
+ Assert.assertEquals(result.size(), 2);
+ partitions.clear();
+ partitions = new HashSet<String>();
+ for(Object o : result)
+ {
+ String val = (String) ((List<Object>)o).get(0);
+ partitions.add(val);
+ }
+ Assert.assertTrue(partitions.contains("localhost_12918"));
+ Assert.assertTrue(partitions.contains("localhost_12922"));
+ }
+
+ @Test (groups = {"unitTest"})
+ public void parseFromTarget()
+ {
+ ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(null);
+ String sql = "SELECT id " +
+ "FROM LIVEINSTANCES ";
+ String from = p.parseFromTarget(sql);
+ Assert.assertTrue(from.equals("LIVEINSTANCES"));
+
+ sql = "SELECT id " +
+ "FROM LIVEINSTANCES WHERE 1=2";
+
+ from = p.parseFromTarget(sql);
+ Assert.assertTrue(from.equals("LIVEINSTANCES"));
+
+ sql = "SELECT id " +
+ "FROM LIVEINSTANCES";
+
+ from = p.parseFromTarget(sql);
+ Assert.assertTrue(from.equals("LIVEINSTANCES"));
+
+ sql = "SELECT id " +
+ " LIVEINSTANCES where tt=00";
+ boolean exceptionThrown = false;
+ try
+ {
+ from = p.parseFromTarget(sql);
+ }
+ catch(HelixException e)
+ {
+ exceptionThrown = true;
+ }
+ Assert.assertTrue(exceptionThrown);
+ }
+
+ @Test (groups=("unitTest"))
+ public void testOrderby() throws Exception
+ {
+ HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+
+ Map<String, ZNRecord> scnMap = new HashMap<String, ZNRecord>();
+ for(int i = 0;i < NODE_NR; i++)
+ {
+ String instance = "localhost_"+(12918+i);
+ ZNRecord metaData = new ZNRecord(instance);
+ metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
+ UUID.randomUUID().toString());
+ metaData.setMapField("SCN", new HashMap<String, String>());
+ for(int j = 0;j < _PARTITIONS; j++)
+ {
+ metaData.getMapField("SCN").put(TEST_DB+"_"+j, ""+i);
+ }
+ scnMap.put(instance, metaData);
+ }
+ Map<String, Object> bindVariables = new HashMap<String, Object>();
+ bindVariables.put("scnMap", scnMap);
+ String SQL =
+ " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey) AS 'SCN'" +
+ " FROM EXTERNALVIEW.Table " +
+ " WHERE mapKey LIKE 'TestDB_1' " +
+ " AND mapSubKey LIKE '%' " +
+ " AND mapValue LIKE 'SLAVE' " +
+ " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) " +
+ " ORDER BY parseInt(getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey))";
+
+ ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
+ List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
+ int prevSCN = -1;
+ for(Object row : result)
+ {
+ List<String> stringRow = (List<String>)row;
+ Assert.assertTrue(stringRow.get(1).equals("SLAVE"));
+ int scn = Integer.parseInt(stringRow.get(2));
+ Assert.assertTrue(scn > prevSCN);
+ prevSCN = scn;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/MockListener.java b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
new file mode 100644
index 0000000..3e68507
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager;
+
+import java.util.List;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+
+
+public class MockListener implements IdealStateChangeListener, LiveInstanceChangeListener,
+ ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+ ControllerChangeListener, MessageListener
+
+{
+ public boolean isIdealStateChangeListenerInvoked = false;
+ public boolean isLiveInstanceChangeListenerInvoked = false;
+ public boolean isCurrentStateChangeListenerInvoked = false;
+ public boolean isMessageListenerInvoked = false;
+ public boolean isConfigChangeListenerInvoked = false;
+ public boolean isExternalViewChangeListenerInvoked = false;
+ public boolean isControllerChangeListenerInvoked = false;
+
+ public void reset()
+ {
+ isIdealStateChangeListenerInvoked = false;
+ isLiveInstanceChangeListenerInvoked = false;
+ isCurrentStateChangeListenerInvoked = false;
+ isMessageListenerInvoked = false;
+ isConfigChangeListenerInvoked = false;
+ isExternalViewChangeListenerInvoked = false;
+ isControllerChangeListenerInvoked = false;
+ }
+
+ @Override
+ public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext)
+ {
+ isIdealStateChangeListenerInvoked = true;
+ }
+
+ @Override
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext)
+ {
+ isLiveInstanceChangeListenerInvoked = true;
+ }
+
+ @Override
+ public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
+ {
+ isConfigChangeListenerInvoked = true;
+ }
+
+ @Override
+ public void onStateChange(String instanceName,
+ List<CurrentState> statesInfo,
+ NotificationContext changeContext)
+ {
+ isCurrentStateChangeListenerInvoked = true;
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext)
+ {
+ isExternalViewChangeListenerInvoked = true;
+ }
+
+ @Override
+ public void onControllerChange(NotificationContext changeContext)
+ {
+ isControllerChangeListenerInvoked = true;
+ }
+
+ @Override
+ public void onMessage(String instanceName,
+ List<Message> messages,
+ NotificationContext changeContext)
+ {
+ isMessageListenerInvoked = true;
+ }
+}