You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:59 UTC

[35/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
new file mode 100644
index 0000000..3bfa1c0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -0,0 +1,425 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.ZKPathDataDumpTask;
+import org.apache.helix.util.HelixUtil;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
+{
+  TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
+  public static class TestMessagingHandlerFactory implements
+      MessageHandlerFactory
+  {
+    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+    @Override
+    public MessageHandler createHandler(Message message,
+        NotificationContext context)
+    {
+      return new TestMessagingHandler(message, context);
+    }
+
+    @Override
+    public String getMessageType()
+    {
+      return "TestParticipant";
+    }
+
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+
+    }
+
+    public class TestMessagingHandler extends MessageHandler
+    {
+      public TestMessagingHandler(Message message, NotificationContext context)
+      {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException
+      {
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        String destName = _message.getTgtName();
+        synchronized (_results)
+        {
+          if (!_results.containsKey(_message.getPartitionName()))
+          {
+            _results.put(_message.getPartitionName(),
+                new ConcurrentSkipListSet<String>());
+          }
+        }
+        _results.get(_message.getPartitionName()).add(destName);
+
+        return result;
+      }
+
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type)
+      {
+        // TODO Auto-generated method stub
+
+      }
+    }
+  }
+
+  @Test()
+  public void TestSchedulerMsg() throws Exception
+  {
+    _factory._results.clear();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+    helixDataAccessor.createProperty(
+        keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+        schedulerMessage);
+
+    Thread.sleep(15000);
+
+    Assert.assertEquals(_PARTITIONS, _factory._results.size());
+    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+        MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+        .getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+        .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+    int messageResultCount = 0;
+    for(String key : statusUpdate.getMapFields().keySet())
+    {
+      if(key.startsWith("MessageResult "))
+      {
+        messageResultCount ++;
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    
+    int count = 0;
+    for (Set<String> val : _factory._results.values())
+    {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+    
+    // test the ZkPathDataDumpTask
+    String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(),
+        PropertyType.STATUSUPDATES_CONTROLLER);
+    List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
+    Assert.assertTrue(subPaths.size() > 0);
+    for(String subPath : subPaths)
+    {
+      String nextPath = controllerStatusPath + "/" + subPath;
+      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      Assert.assertTrue(subsubPaths.size() > 0);
+    }
+    
+    String instanceStatusPath = HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
+        PropertyType.STATUSUPDATES);
+    
+    subPaths = _zkClient.getChildren(instanceStatusPath);
+    Assert.assertTrue(subPaths.size() > 0);
+    for(String subPath : subPaths)
+    {
+      String nextPath = instanceStatusPath + "/" + subPath;
+      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      Assert.assertTrue(subsubPaths.size() > 0);
+      for(String subsubPath : subsubPaths)
+      {
+        String nextnextPath = nextPath + "/" + subsubPath;
+        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
+      }
+    }
+    
+    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
+    dumpTask.run();
+    
+    subPaths = _zkClient.getChildren(controllerStatusPath);
+    Assert.assertTrue(subPaths.size() > 0);
+    for(String subPath : subPaths)
+    {
+      String nextPath = controllerStatusPath + "/" + subPath;
+      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      Assert.assertTrue(subsubPaths.size() == 0);
+    }
+    
+    subPaths = _zkClient.getChildren(instanceStatusPath);
+    Assert.assertTrue(subPaths.size() > 0);
+    for(String subPath : subPaths)
+    {
+      String nextPath = instanceStatusPath + "/" + subPath;
+      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      Assert.assertTrue(subsubPaths.size() > 0);
+      for(String subsubPath : subsubPaths)
+      {
+        String nextnextPath = nextPath + "/" + subsubPath;
+        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
+      }
+    }
+  }
+  
+
+  @Test()
+  public void TestSchedulerMsg2() throws Exception
+  {
+    _factory._results.clear();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+    
+    class MockAsyncCallback extends AsyncCallback
+    {
+      Message _message;
+      public MockAsyncCallback()
+      {
+      }
+
+      @Override
+      public void onTimeOut()
+      {
+        // TODO Auto-generated method stub
+
+      }
+
+      @Override
+      public void onReplyMessage(Message message)
+      {
+        _message = message;
+      }
+
+    }
+    MockAsyncCallback callback = new MockAsyncCallback();
+    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+    String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+    
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    Assert.assertEquals(_PARTITIONS, _factory._results.size());
+    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+        MessageType.SCHEDULER_MSG.toString(), msgId);
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+        .getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+        .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+    int messageResultCount = 0;
+    for(String key : statusUpdate.getMapFields().keySet())
+    {
+      if(key.startsWith("MessageResult "))
+      {
+        messageResultCount ++;
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    
+    int count = 0;
+    for (Set<String> val : _factory._results.values())
+    {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+  }
+
+  @Test()
+  public void TestSchedulerZeroMsg() throws Exception
+  {
+    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageType(), factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_DOESNOTEXIST");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+    PropertyKey controllerMessageKey = keyBuilder
+        .controllerMessage(schedulerMessage.getMsgId());
+    helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
+
+    Thread.sleep(3000);
+
+    Assert.assertEquals(0, factory._results.size());
+    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+        MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+        .getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+        .get("MessageCount").equals("0"));
+    int count = 0;
+    for (Set<String> val : factory._results.values())
+    {
+      count += val.size();
+    }
+    Assert.assertEquals(count, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
new file mode 100644
index 0000000..8cc2380
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
@@ -0,0 +1,66 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSchemataSM extends ZkIntegrationTestBase
+{
+  @Test
+  public void testSchemataSM() throws Exception
+  {
+    String testName = "TestSchemataSM";
+    String clusterName = testName;
+
+    MockParticipant[] participants = new MockParticipant[5];
+//    Logger.getRootLogger().setLevel(Level.INFO);
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
+                                                         // port
+                            "localhost", // participant name prefix
+                            "TestSchemata", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            1, // replicas
+                            "STORAGE_DEFAULT_SM_SCHEMATA",
+                            true); // do rebalance
+
+    TestHelper.startController(clusterName,
+                               "controller_0",
+                               ZK_ADDR,
+                               HelixControllerMain.STANDALONE);
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] =
+          new MockParticipant(clusterName,
+                              instanceName,
+                              ZK_ADDR,
+                              null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
new file mode 100644
index 0000000..12e500a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -0,0 +1,115 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkTestHelper.TestZkHelixManager;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+
+public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
+{
+
+  public class SessionExpiryTransition extends MockTransition
+  {
+    private final AtomicBoolean _done = new AtomicBoolean();
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+    {
+      TestZkHelixManager manager = (TestZkHelixManager)context.getManager();
+     
+      String instance = message.getTgtName();
+      String partition = message.getPartitionName();
+      if (instance.equals("localhost_12918")
+          && partition.equals("TestDB0_1")  // TestDB0_1 is SLAVE on localhost_12918
+          && _done.getAndSet(true) == false)
+      {
+        try
+        {
+          ZkTestHelper.expireSession(manager.getZkClient());
+        }
+        catch (Exception e)
+        {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+ 
+  // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
+  // when there is pending messages
+  // @Test
+  public void testSessionExpiryInTransition() throws Exception
+  {
+    Logger.getRootLogger().setLevel(Level.WARN);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipant[] participants = new MockParticipant[5];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            10, // partitions per resource
+                            5, // number of nodes
+                            3, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      TestZkHelixManager manager =
+          new TestZkHelixManager(clusterName,
+                                 instanceName,
+                                 InstanceType.PARTICIPANT,
+                                 ZK_ADDR);
+      participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // clean up
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    Thread.sleep(2000);
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
new file mode 100644
index 0000000..21bde31
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestStandAloneCMMain extends ZkStandAloneCMTestBase
+{
+  private static Logger logger = Logger.getLogger(TestStandAloneCMMain.class);
+
+  @Test()
+  public void testStandAloneCMMain() throws Exception
+  {
+    logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
+
+    for (int i = 1; i <= 2; i++)
+    {
+      String controllerName = "controller_" + i;
+      StartCMResult startResult =
+          TestHelper.startController(CLUSTER_NAME,
+                                            controllerName,
+                                            ZK_ADDR,
+                                            HelixControllerMain.STANDALONE);
+      _startCMResultMap.put(controllerName, startResult);
+    }
+
+    stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    logger.info("STOP testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
new file mode 100644
index 0000000..b26a346
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkTestHelper.TestZkHelixManager;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestStandAloneCMSessionExpiry.class);
+
+  @Test()
+  public void testStandAloneCMSessionExpiry() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.DEBUG);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName,
+                            ZK_ADDR,
+                            12918,
+                            PARTICIPANT_PREFIX,
+                            "TestDB",
+                            1,
+                            20,
+                            5,
+                            3,
+                            "MasterSlave",
+                            true);
+
+    MockParticipant[] participants = new MockParticipant[5];
+    for (int i = 0; i < 5; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      TestZkHelixManager manager =
+          new TestZkHelixManager(clusterName,
+                                 instanceName,
+                                 InstanceType.PARTICIPANT,
+                                 ZK_ADDR);
+      participants[i] = new MockParticipant(manager, null);
+      participants[i].syncStart();
+    }
+
+    TestZkHelixManager controller =
+        new TestZkHelixManager(clusterName,
+                               "controller_0",
+                               InstanceType.CONTROLLER,
+                               ZK_ADDR);
+    controller.connect();
+
+    boolean result;
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                   clusterName));
+    Assert.assertTrue(result);
+
+    // participant session expiry
+    TestZkHelixManager participantToExpire = (TestZkHelixManager)participants[1].getManager();
+
+    System.out.println("Expire participant session");
+    String oldSessionId = participantToExpire.getSessionId();
+    
+    ZkTestHelper.expireSession(participantToExpire.getZkClient());
+    String newSessionId = participantToExpire.getSessionId();
+    System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+    Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
+    setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                   clusterName));
+    Assert.assertTrue(result);
+
+    // controller session expiry
+    System.out.println("Expire controller session");
+    oldSessionId = controller.getSessionId();
+    ZkTestHelper.expireSession(controller.getZkClient());
+    newSessionId = controller.getSessionId();
+    System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+    Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
+
+    setupTool.addResourceToCluster(clusterName, "TestDB2", 8, "MasterSlave");
+    setupTool.rebalanceStorageCluster(clusterName, "TestDB2", 3);
+
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                                   clusterName));
+    Assert.assertTrue(result);
+
+    // clean up
+    System.out.println("Clean up ...");
+    // Logger.getRootLogger().setLevel(Level.DEBUG);
+    controller.disconnect();
+    Thread.sleep(100);
+    for (int i = 0; i < 5; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
new file mode 100644
index 0000000..83df0e4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -0,0 +1,210 @@
+package org.apache.helix.integration;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.mock.storage.MockJobIntf;
+import org.apache.helix.mock.storage.MockParticipant;
+import org.apache.helix.mock.storage.MockTransition;
+import org.apache.helix.mock.storage.MockParticipant.MockMSStateModel;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase
+{
+  private static Logger                LOG               =
+      Logger.getLogger(TestStateTransitionTimeout.class);
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    System.out.println("START " + CLASS_NAME + " at "
+        + new Date(System.currentTimeMillis()));
+
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    String namespace = "/" + CLUSTER_NAME;
+    if (_zkClient.exists(namespace))
+    {
+      _zkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+    
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+    
+    // Set the timeout values
+    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    String stateTransition = "SLAVE" + "-" + "MASTER" + "_" + Message.Attributes.TIMEOUT;
+    idealState.getRecord().setSimpleField(stateTransition, "300");
+    
+    String command = "-zkSvr " + ZK_ADDR + " -addResourceProperty "+ CLUSTER_NAME + " " + TEST_DB + " " + stateTransition + " 200";
+    ClusterSetup.processCommandLineArgs(command.split(" "));
+  }
+  
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+  public static class TimeOutStateModel extends MockParticipant.MockMSStateModel
+  {
+    boolean _sleep = false;
+    StateTransitionError _error;
+    int _errorCallcount = 0;
+    public TimeOutStateModel(MockTransition transition, boolean sleep)
+    {
+      super(transition);
+      _sleep = sleep;
+    }
+
+    @Transition(to="SLAVE",from="OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
+    {
+      LOG.info("Become SLAVE from OFFLINE");
+      
+    }
+
+    @Transition(to="MASTER",from="SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become MASTER from SLAVE");
+      if (_transition != null && _sleep)
+      {
+        _transition.doTransition(message, context);
+      }
+    }
+
+    @Transition(to="SLAVE",from="MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
+    {
+      LOG.info("Become SLAVE from MASTER");
+    }
+
+    @Transition(to="OFFLINE",from="SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
+    {
+      LOG.info("Become OFFLINE from SLAVE");
+      
+    }
+
+    @Transition(to="DROPPED",from="OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
+    {
+      LOG.info("Become DROPPED from OFFLINE");
+      
+    }
+    
+    public void rollbackOnError(Message message, NotificationContext context,
+        StateTransitionError error)
+    {
+      _error = error;
+      _errorCallcount ++;
+    }
+  }
+  
+  public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel>
+  {
+    Set<String> partitionsToSleep = new HashSet<String>();
+    int _sleepTime;
+    
+    public SleepStateModelFactory(int sleepTime)
+    {
+      _sleepTime = sleepTime;
+    }
+    
+    public void setPartitions(Collection<String> partitions)
+    {
+      partitionsToSleep.addAll(partitions);
+    }
+    
+    public void addPartition(String partition)
+    {
+      partitionsToSleep.add(partition);
+    }
+    
+    @Override
+    public TimeOutStateModel createNewStateModel(String stateUnitKey)
+    {
+      return new TimeOutStateModel(new MockParticipant.SleepTransition(_sleepTime), partitionsToSleep.contains(stateUnitKey));
+    }
+  }
+  
+  @Test
+  public void testStateTransitionTimeOut() throws Exception
+  {
+    Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
+    MockParticipant[] participants = new MockParticipant[NODE_NR];
+    IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      SleepStateModelFactory factory = new SleepStateModelFactory(1000);
+      factories.put(instanceName, factory);
+      for(String p : idealState.getPartitionSet())
+      {
+        if(idealState.getPreferenceList(p).get(0).equals(instanceName))
+        {
+          factory.addPartition(p);
+        }
+      }
+      
+      participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
+      participants[i].syncStart();
+    }
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    StartCMResult startResult =
+        TestHelper.startController(CLUSTER_NAME,
+                                   controllerName,
+                                   ZK_ADDR,
+                                   HelixControllerMain.STANDALONE);
+    _startCMResultMap.put(controllerName, startResult);
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              CLUSTER_NAME));
+    Assert.assertTrue(result);
+    HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
+    
+    Builder kb = accessor.keyBuilder();
+    ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+    for(String p : idealState.getPartitionSet())
+    {
+      String idealMaster = idealState.getPreferenceList(p).get(0);
+      Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+      
+      TimeOutStateModel model = factories.get(idealMaster).getStateModel(p);
+      Assert.assertEquals(model._errorCallcount , 1);
+      Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
new file mode 100644
index 0000000..8c0953f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.testng.Assert;
+
+
+public class TestStatusUpdate extends ZkStandAloneCMTestBase
+{
+  // For now write participant StatusUpdates to log4j.
+  // TODO: Need to investigate another data channel to report to controller and re-enable
+  // this test
+  // @Test
+  public void testParticipantStatusUpdates() throws Exception
+  {
+    ZkClient zkClient = new ZkClient(ZkIntegrationTestBase.ZK_ADDR);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
+    Assert.assertNotNull(extViews);
+
+    for (ExternalView extView : extViews)
+    {
+      String resourceName = extView.getResourceName();
+      Set<String> partitionSet = extView.getPartitionSet();
+      for (String partition : partitionSet)
+      {
+        Map<String, String> stateMap = extView.getStateMap(partition);
+        for (String instance : stateMap.keySet())
+        {
+          String state = stateMap.get(instance);
+          StatusUpdateUtil.StatusUpdateContents statusUpdates =
+              StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor,
+                                                                            instance,
+                                                                            resourceName,
+                                                                            partition);
+
+          Map<String, StatusUpdateUtil.TaskStatus> taskMessages =
+              statusUpdates.getTaskMessages();
+          List<StatusUpdateUtil.Transition> transitions = statusUpdates.getTransitions();
+          if (state.equals("MASTER"))
+          {
+            Assert.assertEquals(transitions.size() >= 2,
+                                true,
+                                "Invalid number of transitions");
+            StatusUpdateUtil.Transition lastTransition =
+                transitions.get(transitions.size() - 1);
+            StatusUpdateUtil.Transition prevTransition =
+                transitions.get(transitions.size() - 2);
+            Assert.assertEquals(taskMessages.get(lastTransition.getMsgID()),
+                                StatusUpdateUtil.TaskStatus.COMPLETED,
+                                "Incomplete transition");
+            Assert.assertEquals(taskMessages.get(prevTransition.getMsgID()),
+                                StatusUpdateUtil.TaskStatus.COMPLETED,
+                                "Incomplete transition");
+            Assert.assertEquals(lastTransition.getFromState(), "SLAVE", "Invalid State");
+            Assert.assertEquals(lastTransition.getToState(), "MASTER", "Invalid State");
+          }
+          else if (state.equals("SLAVE"))
+          {
+            Assert.assertEquals(transitions.size() >= 1,
+                                true,
+                                "Invalid number of transitions");
+            StatusUpdateUtil.Transition lastTransition =
+                transitions.get(transitions.size() - 1);
+            Assert.assertEquals(lastTransition.getFromState().equals("MASTER")
+                                    || lastTransition.getFromState().equals("OFFLINE"),
+                                true,
+                                "Invalid transition");
+            Assert.assertEquals(lastTransition.getToState(), "SLAVE", "Invalid State");
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
new file mode 100644
index 0000000..45b0fc2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -0,0 +1,128 @@
+package org.apache.helix.integration;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSwapInstance extends ZkStandAloneCMTestBase
+{
+  @Test
+  public void TestSwap() throws Exception
+  {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
+    
+    
+    ZNRecord idealStateOld1 = new ZNRecord("TestDB");
+    ZNRecord idealStateOld2 = new ZNRecord("MyDB");
+    
+    IdealState is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
+    idealStateOld1.merge(is1.getRecord());
+    
+
+    IdealState is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
+    idealStateOld2.merge(is2.getRecord());
+    
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+    
+    String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444);
+    _setupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2);
+    
+    boolean exception = false;
+    try
+    {
+      _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
+    }
+    catch(Exception e)
+    {
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+    
+    _startCMResultMap.get(instanceName)._manager.disconnect();
+    _startCMResultMap.get(instanceName)._thread.interrupt();
+    Thread.sleep(1000);
+    
+    exception = false;
+    try
+    {
+      _setupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
+    }
+    catch(Exception e)
+    {
+      e.printStackTrace();
+      exception = true;
+    }
+    Assert.assertFalse(exception);
+    StartCMResult result2 =
+        TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
+    _startCMResultMap.put(instanceName2, result2);
+    
+    result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+    
+    is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
+    
+    is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
+    
+    for(String key : idealStateOld1.getMapFields().keySet())
+    {
+      for(String host : idealStateOld1.getMapField(key).keySet())
+      {
+        if(host.equals(instanceName))
+        {
+          Assert.assertTrue(
+          idealStateOld1.getMapField(key).get(instanceName).equals(
+          is1.getRecord().getMapField(key).get(instanceName2)));
+        }
+        else
+        {
+          Assert.assertTrue(
+              idealStateOld1.getMapField(key).get(host).equals(
+              is1.getRecord().getMapField(key).get(host)));
+        }
+      }
+    }
+    
+    for(String key : idealStateOld1.getListFields().keySet())
+    {
+      Assert.assertEquals(idealStateOld1.getListField(key).size() , is1.getRecord().getListField(key).size());
+      for(int i = 0; i < idealStateOld1.getListField(key).size(); i++)
+      {
+        String host = idealStateOld1.getListField(key).get(i);
+        String newHost = is1.getRecord().getListField(key).get(i);
+        if(host.equals(instanceName))
+        {
+          Assert.assertTrue(
+              newHost.equals(instanceName2));
+        }
+        else
+        {
+          //System.out.println(key + " " + i+ " " + host + " "+newHost);
+          //System.out.println(idealStateOld1.getListField(key));
+          //System.out.println(is1.getRecord().getListField(key));
+          
+          Assert.assertTrue(host.equals(newHost));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
new file mode 100644
index 0000000..c985f2a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Map;
+import java.util.logging.Level;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.TestHelper;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+public class ZkIntegrationTestBase
+{
+  private static Logger         LOG                       =
+                                                              Logger.getLogger(ZkIntegrationTestBase.class);
+
+  protected static ZkServer     _zkServer;
+  protected static ZkClient     _gZkClient;
+  protected static ClusterSetup _gSetupTool;
+
+  public static final String    ZK_ADDR                   = "localhost:2183";
+  protected static final String CLUSTER_PREFIX            = "CLUSTER";
+  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+
+  protected final String        CONTROLLER_PREFIX         = "controller";
+  protected final String        PARTICIPANT_PREFIX        = "localhost";
+
+  @BeforeSuite
+  public void beforeSuite() throws Exception
+  {
+    // TODO: use logging.properties file to config java.util.logging.Logger levels
+    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+    topJavaLogger.setLevel(Level.WARNING);
+    
+    _zkServer = TestHelper.startZkSever(ZK_ADDR);
+    AssertJUnit.assertTrue(_zkServer != null);
+    ZKClientPool.reset();
+
+    _gZkClient = new ZkClient(ZK_ADDR);
+    _gZkClient.setZkSerializer(new ZNRecordSerializer());
+    _gSetupTool = new ClusterSetup(ZK_ADDR);
+  }
+
+  @AfterSuite
+  public void afterSuite()
+  {
+    ZKClientPool.reset();
+    TestHelper.stopZkServer(_zkServer);
+    _gZkClient.close();
+  }
+
+  protected String getShortClassName()
+  {
+    String className = this.getClass().getName();
+    return className.substring(className.lastIndexOf('.') + 1);
+  }
+
+  protected String getCurrentLeader(ZkClient zkClient, String clusterName)
+  {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader == null)
+    {
+      return null;
+    }
+    return leader.getInstanceName();
+  }
+
+  /**
+   * Stop current leader and returns the new leader
+   * 
+   * @param zkClient
+   * @param clusterName
+   * @param startCMResultMap
+   * @return
+   */
+  protected String stopCurrentLeader(ZkClient zkClient,
+                                     String clusterName,
+                                     Map<String, StartCMResult> startCMResultMap)
+  {
+    String leader = getCurrentLeader(zkClient, clusterName);
+    Assert.assertTrue(leader != null);
+    System.out.println("stop leader: " + leader + " in " + clusterName);
+    Assert.assertTrue(leader != null);
+
+    StartCMResult result = startCMResultMap.remove(leader);
+    Assert.assertTrue(result._manager != null);
+    result._manager.disconnect();
+
+    Assert.assertTrue(result._thread != null);
+    result._thread.interrupt();
+
+    boolean isNewLeaderElected = false;
+    String newLeader = null;
+    try
+    {
+      for (int i = 0; i < 5; i++)
+      {
+        Thread.sleep(1000);
+        newLeader = getCurrentLeader(zkClient, clusterName);
+        if (!newLeader.equals(leader))
+        {
+          isNewLeaderElected = true;
+          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
+          break;
+        }
+      }
+    }
+    catch (InterruptedException e)
+    {
+      e.printStackTrace();
+    }
+    if (isNewLeaderElected == false)
+    {
+      System.out.println("fail to elect a new leader in " + clusterName);
+    }
+    AssertJUnit.assertTrue(isNewLeaderElected);
+    return newLeader;
+  }
+
+  protected void enableHealthCheck(String clusterName)
+  {
+    ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
+    new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
new file mode 100644
index 0000000..a203539
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ * 
+ * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
+ * start 5 dummy participants verify the current states at end
+ */
+
+public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
+{
+  private static Logger                LOG               =
+                                                             Logger.getLogger(ZkStandAloneCMTestBase.class);
+
+  protected static final int           NODE_NR           = 5;
+  protected static final int           START_PORT        = 12918;
+  protected static final String        STATE_MODEL       = "MasterSlave";
+  protected static final String        TEST_DB           = "TestDB";
+  protected static final int           _PARTITIONS       = 20;
+
+  protected ClusterSetup               _setupTool        = null;
+  protected final String               CLASS_NAME        = getShortClassName();
+  protected final String               CLUSTER_NAME      = CLUSTER_PREFIX + "_"
+                                                             + CLASS_NAME;
+
+  protected Map<String, StartCMResult> _startCMResultMap =
+                                                             new HashMap<String, StartCMResult>();
+  protected ZkClient                   _zkClient;
+  
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+//    Logger.getRootLogger().setLevel(Level.INFO);
+    System.out.println("START " + CLASS_NAME + " at "
+        + new Date(System.currentTimeMillis()));
+
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    String namespace = "/" + CLUSTER_NAME;
+    if (_zkClient.exists(namespace))
+    {
+      _zkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(ZK_ADDR);
+
+    // setup storage cluster
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String storageNodeName = PARTICIPANT_PREFIX + ":" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
+
+    // start dummy participants
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      if (_startCMResultMap.get(instanceName) != null)
+      {
+        LOG.error("fail to start particpant:" + instanceName
+            + "(participant with same name already exists)");
+      }
+      else
+      {
+        StartCMResult result =
+            TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+        _startCMResultMap.put(instanceName, result);
+      }
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    StartCMResult startResult =
+        TestHelper.startController(CLUSTER_NAME,
+                                   controllerName,
+                                   ZK_ADDR,
+                                   HelixControllerMain.STANDALONE);
+    _startCMResultMap.put(controllerName, startResult);
+    
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
+                                                                              CLUSTER_NAME));
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception
+  {
+    /**
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+   
+    StartCMResult result;
+    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
+    while (it.hasNext())
+    {
+      String instanceName = it.next().getKey();
+      if (instanceName.startsWith(CONTROLLER_PREFIX))
+      {
+        result = _startCMResultMap.get(instanceName);
+        result._manager.disconnect();
+        result._thread.interrupt();
+        it.remove();
+      }
+    }
+
+    Thread.sleep(100);
+    it = _startCMResultMap.entrySet().iterator();
+    while (it.hasNext())
+    {
+      String instanceName = it.next().getKey();
+      result = _startCMResultMap.get(instanceName);
+      result._manager.disconnect();
+      result._thread.interrupt();
+      it.remove();
+    }
+
+    _zkClient.close();
+    // logger.info("END at " + new Date(System.currentTimeMillis()));
+    System.out.println("END " + CLASS_NAME + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
new file mode 100644
index 0000000..3e27bc4
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.integration;
+
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.model.StatusUpdate;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * 
+ * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
+ * start 5 dummy participants verify the current states at end
+ */
+
+public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase
+{
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    ZKPropertyTransferServer.PERIOD = 500;
+    ZkPropertyTransferClient.SEND_PERIOD = 500;
+    ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
+    super.beforeClass();
+    
+    Thread.sleep(1000);
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      if (_startCMResultMap.get(instanceName) != null)
+      {
+        HelixDataAccessor accessor = _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+        Builder kb = accessor.keyBuilder();
+        List<StatusUpdate> statusUpdates = accessor.getChildValues(
+            kb.stateTransitionStatus(instanceName, _startCMResultMap.get(instanceName)._manager.getSessionId(),
+                TEST_DB));
+        Assert.assertTrue(statusUpdates.size() > 0);
+        for(StatusUpdate update : statusUpdates)
+        {
+          Assert.assertTrue(update.getRecord().getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
+          Assert.assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception
+  {
+    super.afterClass();
+    ZKPropertyTransferServer.getInstance().shutdown();
+    ZKPropertyTransferServer.getInstance().reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
new file mode 100644
index 0000000..e4c7659
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/josql/TestClusterJosqlQueryProcessor.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.josql.ZNRecordJosqlFunctionHandler;
+import org.apache.helix.josql.ZNRecordRow;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.tools.IdealStateCalculatorForStorageNode;
+import org.josql.Query;
+import org.josql.QueryExecutionException;
+import org.josql.QueryParseException;
+import org.josql.QueryResults;
+import org.testng.annotations.Test;
+
+
+public class TestClusterJosqlQueryProcessor
+{
+  @Test (groups = {"unitTest"})
+  public void queryClusterDataSample() 
+  {
+    List<ZNRecord> liveInstances = new ArrayList<ZNRecord>();
+    Map<String, ZNRecord> liveInstanceMap = new HashMap<String, ZNRecord>();
+    List<String> instances = new ArrayList<String>();
+    for(int i = 0;i<5; i++)
+    {
+      String instance = "localhost_"+(12918+i);
+      instances.add(instance);
+      ZNRecord metaData = new ZNRecord(instance);
+      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
+          UUID.randomUUID().toString());
+      metaData.setSimpleField("SCN", "" + (10-i));
+      liveInstances.add(metaData);
+      liveInstanceMap.put(instance, metaData);
+    }
+    
+    //liveInstances.remove(0);
+    ZNRecord externalView = IdealStateCalculatorForStorageNode.calculateIdealState(
+        instances, 21, 3, "TestDB", "MASTER", "SLAVE");
+    
+    
+    Criteria criteria = new Criteria();
+    criteria.setInstanceName("%");
+    criteria.setResource("TestDB");
+    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    criteria.setPartition("TestDB_2%");
+    criteria.setPartitionState("SLAVE");
+    
+    String josql = 
+      " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN') AS 'SCN'" +
+      " FROM org.apache.helix.josql.ZNRecordRow " + 
+      " WHERE mapKey LIKE 'TestDB_2%' " +
+        " AND mapSubKey LIKE '%' " +
+        " AND mapValue LIKE 'SLAVE' " +
+        " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) " +
+        " ORDER BY parseInt(getSimpleFieldValue(getZNRecordFromMap(:LIVEINSTANCESMAP, mapSubKey), 'SCN'))";
+    
+    Query josqlQuery = new Query();
+    josqlQuery.setVariable("LIVEINSTANCES", liveInstances);
+    josqlQuery.setVariable("LIVEINSTANCESMAP", liveInstanceMap);
+    josqlQuery.addFunctionHandler(new ZNRecordRow());
+    josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
+    josqlQuery.addFunctionHandler(new Integer(0));
+    try
+    {
+      josqlQuery.parse(josql);
+      QueryResults qr = josqlQuery.execute(ZNRecordRow.convertMapFields(externalView));
+      @SuppressWarnings({ "unchecked", "unused" })
+      List<Object> result = qr.getResults();
+      
+    } 
+    catch (QueryParseException e)
+    {
+      e.printStackTrace();
+    } catch (QueryExecutionException e)
+    {
+      e.printStackTrace();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
new file mode 100644
index 0000000..082d52e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
@@ -0,0 +1,228 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.josql;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.josql.ClusterJosqlQueryProcessor;
+import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestJosqlProcessor extends ZkStandAloneCMTestBase
+{
+  @Test (groups = {"integrationTest"})
+  public void testJosqlQuery() throws Exception
+  {
+    HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+    
+    // Find the instance name that contains partition TestDB_2 and state is 'MASTER'
+    String SQL = "SELECT id  " + 
+        "FROM LIVEINSTANCES " + 
+        "WHERE getMapFieldValue( getZNRecordFromMap(:IDEALSTATES , 'TestDB'), :partitionName, :_currObj.id)='MASTER'";
+    Map<String, Object> bindVariables = new HashMap<String, Object>();
+    bindVariables.put("partitionName", "TestDB_2");
+    
+    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
+    List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
+    
+    Assert.assertEquals(result.size(), 1);
+    List<Object> firstList = (List<Object>) result.get(0);
+    Assert.assertTrue(((String)(firstList.get(0))).equalsIgnoreCase("localhost_12921"));
+    
+    // Find the live instances names that hosts Partition TestDB_10 according to idealstate
+    
+    SQL = "SELECT id  " + 
+        "FROM LIVEINSTANCES " + 
+        "WHERE hasMapFieldKey( getZNRecordFromMap(:IDEALSTATES, 'TestDB'), :partitionName, :_currObj.id)='true'";
+    p = new ClusterJosqlQueryProcessor(manager);
+    bindVariables.put("partitionName", "TestDB_10");
+    result = p.runJoSqlQuery(SQL, bindVariables, null);
+    
+    Assert.assertEquals(result.size(), 3);
+    Set<String> hosts = new HashSet<String>();
+    for(Object o : result)
+    {
+      String val = (String) ((List<Object>)o).get(0);
+      hosts.add(val);
+    }
+    Assert.assertTrue(hosts.contains("localhost_12918"));
+    Assert.assertTrue(hosts.contains("localhost_12920"));
+    Assert.assertTrue(hosts.contains("localhost_12921"));
+    
+    // Find the partitions on host localhost_12919 and is on MASTER state
+    SQL = "SELECT id  " + 
+        "FROM PARTITIONS " + 
+        "WHERE getMapFieldValue( getZNRecordFromMap(:EXTERNALVIEW, 'TestDB'), id, :instanceName)='MASTER'";
+    p = new ClusterJosqlQueryProcessor(manager);
+    bindVariables.clear();
+    bindVariables.put("instanceName", "localhost_12919");
+    result = p.runJoSqlQuery(SQL, bindVariables, null);
+    
+    Assert.assertEquals(result.size(), 4);
+    Set<String> partitions = new HashSet<String>();
+    for(Object o : result)
+    {
+      String val = (String) ((List<Object>)o).get(0);
+      partitions.add(val);
+    }
+    Assert.assertTrue(partitions.contains("TestDB_6"));
+    Assert.assertTrue(partitions.contains("TestDB_7"));
+    Assert.assertTrue(partitions.contains("TestDB_9"));
+    Assert.assertTrue(partitions.contains("TestDB_14"));
+
+    // Find the partitions on host localhost_12919 and is on MASTER state
+    // Same as above but according to currentstates
+    SQL = "SELECT id  " + 
+        "FROM PARTITIONS " + 
+        "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, :instanceName, 'TestDB'), :_currObj.id, :mapFieldKey)=:partitionState";
+    
+    p = new ClusterJosqlQueryProcessor(manager);
+    bindVariables.clear();
+    bindVariables.put("instanceName", "localhost_12919");
+    bindVariables.put("mapFieldKey", "CURRENT_STATE");
+    bindVariables.put("partitionState", "MASTER");
+    
+    result = p.runJoSqlQuery(SQL,  bindVariables, null);
+    
+    Assert.assertEquals(result.size(), 4);
+    partitions.clear();
+    partitions = new HashSet<String>();
+    for(Object o : result)
+    {
+      String val = (String) ((List<Object>)o).get(0);
+      partitions.add(val);
+    }
+    Assert.assertTrue(partitions.contains("TestDB_6"));
+    Assert.assertTrue(partitions.contains("TestDB_7"));
+    Assert.assertTrue(partitions.contains("TestDB_9"));
+    Assert.assertTrue(partitions.contains("TestDB_14"));
+    
+    // get node name that hosts a certain partition with certain state
+    
+    SQL = "SELECT id  " + 
+        "FROM LIVEINSTANCES " + 
+        "WHERE getMapFieldValue( getZNRecordFromMap(:CURRENTSTATES, id, 'TestDB'), :partitionName, :mapFieldKey)=:partitionState";
+    
+    p = new ClusterJosqlQueryProcessor(manager);
+    bindVariables.clear();
+    bindVariables.put("partitionName", "TestDB_8");
+    bindVariables.put("mapFieldKey", "CURRENT_STATE");
+    bindVariables.put("partitionState", "SLAVE");
+    
+    result = p.runJoSqlQuery(SQL,  bindVariables, null);
+    
+    Assert.assertEquals(result.size(), 2);
+    partitions.clear();
+    partitions = new HashSet<String>();
+    for(Object o : result)
+    {
+      String val = (String) ((List<Object>)o).get(0);
+      partitions.add(val);
+    }
+    Assert.assertTrue(partitions.contains("localhost_12918"));
+    Assert.assertTrue(partitions.contains("localhost_12922"));
+  }
+  
+  @Test (groups = {"unitTest"})
+  public void parseFromTarget() 
+  {
+    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(null);
+    String sql = "SELECT id  " + 
+        "FROM LIVEINSTANCES ";
+    String from = p.parseFromTarget(sql);
+    Assert.assertTrue(from.equals("LIVEINSTANCES"));
+    
+    sql = "SELECT id      " + 
+        "FROM    LIVEINSTANCES  WHERE 1=2";
+    
+    from = p.parseFromTarget(sql);
+    Assert.assertTrue(from.equals("LIVEINSTANCES"));
+    
+    sql = "SELECT id      " + 
+        "FROM LIVEINSTANCES";
+    
+    from = p.parseFromTarget(sql);
+    Assert.assertTrue(from.equals("LIVEINSTANCES"));
+    
+    sql = "SELECT id      " + 
+        " LIVEINSTANCES where tt=00";
+    boolean exceptionThrown = false;
+    try
+    {
+      from = p.parseFromTarget(sql);
+    }
+    catch(HelixException e)
+    {
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+  }
+  
+  @Test (groups=("unitTest"))
+  public void testOrderby() throws Exception
+  {
+    HelixManager manager = ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+    
+    Map<String, ZNRecord> scnMap = new HashMap<String, ZNRecord>();
+    for(int i = 0;i < NODE_NR; i++)
+    {
+      String instance = "localhost_"+(12918+i);
+      ZNRecord metaData = new ZNRecord(instance);
+      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
+          UUID.randomUUID().toString());
+      metaData.setMapField("SCN", new HashMap<String, String>());
+      for(int j = 0;j < _PARTITIONS; j++)
+      {
+        metaData.getMapField("SCN").put(TEST_DB+"_"+j, ""+i);
+      }
+      scnMap.put(instance, metaData);
+    }
+    Map<String, Object> bindVariables = new HashMap<String, Object>();
+    bindVariables.put("scnMap", scnMap);
+    String SQL = 
+        " SELECT DISTINCT mapSubKey AS 'subkey', mapValue AS 'mapValue' , getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey) AS 'SCN'" +
+        " FROM EXTERNALVIEW.Table " + 
+        " WHERE mapKey LIKE 'TestDB_1' " +
+          " AND mapSubKey LIKE '%' " +
+          " AND mapValue LIKE 'SLAVE' " +
+          " AND mapSubKey IN ((SELECT [*]id FROM :LIVEINSTANCES)) " +
+          " ORDER BY parseInt(getMapFieldValue(getZNRecordFromMap(:scnMap, mapSubKey), 'SCN', mapKey))";
+
+    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
+    List<Object> result = p.runJoSqlQuery(SQL, bindVariables, null);
+    int prevSCN = -1;
+    for(Object row : result)
+    {
+      List<String> stringRow = (List<String>)row;
+      Assert.assertTrue(stringRow.get(1).equals("SLAVE"));
+      int scn = Integer.parseInt(stringRow.get(2));
+      Assert.assertTrue(scn > prevSCN);
+      prevSCN = scn;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/MockListener.java b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
new file mode 100644
index 0000000..3e68507
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.manager;
+
+import java.util.List;
+
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+
+
+public class MockListener implements IdealStateChangeListener, LiveInstanceChangeListener,
+    ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+    ControllerChangeListener, MessageListener
+
+{
+  public boolean isIdealStateChangeListenerInvoked = false;
+  public boolean isLiveInstanceChangeListenerInvoked = false;
+  public boolean isCurrentStateChangeListenerInvoked = false;
+  public boolean isMessageListenerInvoked = false;
+  public boolean isConfigChangeListenerInvoked = false;
+  public boolean isExternalViewChangeListenerInvoked = false;
+  public boolean isControllerChangeListenerInvoked = false;
+
+  public void reset()
+  {
+    isIdealStateChangeListenerInvoked = false;
+    isLiveInstanceChangeListenerInvoked = false;
+    isCurrentStateChangeListenerInvoked = false;
+    isMessageListenerInvoked = false;
+    isConfigChangeListenerInvoked = false;
+    isExternalViewChangeListenerInvoked = false;
+    isControllerChangeListenerInvoked = false;
+  }
+
+  @Override
+  public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext)
+  {
+    isIdealStateChangeListenerInvoked = true;
+  }
+
+  @Override
+  public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext)
+  {
+    isLiveInstanceChangeListenerInvoked = true;
+  }
+
+  @Override
+  public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
+  {
+    isConfigChangeListenerInvoked = true;
+  }
+
+  @Override
+  public void onStateChange(String instanceName,
+                            List<CurrentState> statesInfo,
+                            NotificationContext changeContext)
+  {
+    isCurrentStateChangeListenerInvoked = true;
+  }
+
+  @Override
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+                                   NotificationContext changeContext)
+  {
+    isExternalViewChangeListenerInvoked = true;
+  }
+
+  @Override
+  public void onControllerChange(NotificationContext changeContext)
+  {
+    isControllerChangeListenerInvoked = true;
+  }
+
+  @Override
+  public void onMessage(String instanceName,
+                        List<Message> messages,
+                        NotificationContext changeContext)
+  {
+    isMessageListenerInvoked = true;
+  }
+}