You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC

[4/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java
new file mode 100644
index 0000000..20605a0
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java
@@ -0,0 +1,649 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.webapp;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.tools.AdminTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.restlet.Client;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Protocol;
+import org.restlet.data.Reference;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.resource.Representation;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestClusterManagementWebapp extends AdminTestBase
+{
+  @Test
+  public void testInvocation() throws Exception
+  {
+    verifyAddCluster();
+    verifyAddStateModel();
+    verifyAddHostedEntity();
+    verifyAddInstance();
+    verifyRebalance();
+    verifyEnableInstance();
+    verifyAlterIdealState();
+    verifyConfigAccessor();
+
+    verifyEnableCluster();
+
+    System.out.println("Test passed!!");
+  }
+
+  /*
+   * Test case as steps
+   */
+  String clusterName       = "cluster-12345";
+  String resourceGroupName = "new-entity-12345";
+  String instance1         = "test-1";
+  String statemodel        = "state_model";
+  int    instancePort      = 9999;
+  int    partitions        = 10;
+  int    replicas          = 3;
+
+  void verifyAddStateModel() throws JsonGenerationException,
+      JsonMappingException,
+      IOException
+  {
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+            + "/StateModelDefs/MasterSlave";
+    Reference resourceRef = new Reference(httpUrlBase);
+    Request request = new Request(Method.GET, resourceRef);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+
+    Map<String, String> paraMap = new HashMap<String, String>();
+
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND,
+                ClusterSetup.addStateModelDef);
+
+    ZNRecord r = new ZNRecord("Test");
+    r.merge(zn);
+    StateModelDefinition newStateModel = new StateModelDefinition(r);
+
+    httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/StateModelDefs";
+    resourceRef = new Reference(httpUrlBase);
+    request = new Request(Method.POST, resourceRef);
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+                          + ClusterRepresentationUtil.ObjectToJson(paraMap) + "&"
+                          + JsonParameters.NEW_STATE_MODEL_DEF + "="
+                          + ClusterRepresentationUtil.ZNRecordToJson(r),
+                      MediaType.APPLICATION_ALL);
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    AssertJUnit.assertTrue(sw.toString().contains("Test"));
+  }
+
+  void verifyAddCluster() throws IOException,
+      InterruptedException
+  {
+    String httpUrlBase = "http://localhost:" + ADMIN_PORT + "/clusters";
+    Map<String, String> paraMap = new HashMap<String, String>();
+
+    paraMap.put(JsonParameters.CLUSTER_NAME, clusterName);
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
+
+    Reference resourceRef = new Reference(httpUrlBase);
+
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    AssertJUnit.assertTrue(zn.getListField("clusters").contains(clusterName));
+
+  }
+
+  void verifyAddHostedEntity() throws JsonGenerationException,
+      JsonMappingException,
+      IOException
+  {
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+    Map<String, String> paraMap = new HashMap<String, String>();
+
+    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, resourceGroupName);
+    paraMap.put(JsonParameters.PARTITIONS, "" + partitions);
+    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
+
+    Reference resourceRef = new Reference(httpUrlBase);
+
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    AssertJUnit.assertTrue(zn.getListField("ResourceGroups").contains(resourceGroupName));
+
+    httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+            + resourceGroupName;
+    resourceRef = new Reference(httpUrlBase);
+
+    request = new Request(Method.GET, resourceRef);
+
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+  }
+
+  void verifyAddInstance() throws JsonGenerationException,
+      JsonMappingException,
+      IOException
+  {
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/instances";
+    Map<String, String> paraMap = new HashMap<String, String>();
+    // Add 1 instance
+    paraMap.put(JsonParameters.INSTANCE_NAME, instance1 + ":" + instancePort);
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+
+    Reference resourceRef = new Reference(httpUrlBase);
+
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    ObjectMapper mapper = new ObjectMapper();
+
+    TypeReference<ArrayList<ZNRecord>> typeRef = new TypeReference<ArrayList<ZNRecord>>()
+    {
+    };
+    List<ZNRecord> znList = mapper.readValue(new StringReader(sw.toString()), typeRef);
+    AssertJUnit.assertTrue(znList.get(0).getId().equals(instance1 + "_" + instancePort));
+
+    // the case to add more than 1 instances
+    paraMap.clear();
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+
+    String[] instances = { "test2", "test3", "test4", "test5" };
+
+    String instanceNames = "";
+    boolean first = true;
+    for (String instance : instances)
+    {
+      if (first == true)
+      {
+        first = false;
+      }
+      else
+      {
+        instanceNames += ";";
+      }
+      instanceNames += (instance + ":" + instancePort);
+    }
+    paraMap.put(JsonParameters.INSTANCE_NAMES, instanceNames);
+
+    request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    mapper = new ObjectMapper();
+
+    znList = mapper.readValue(new StringReader(sw.toString()), typeRef);
+
+    for (String instance : instances)
+    {
+      boolean found = false;
+      for (ZNRecord r : znList)
+      {
+        String instanceId = instance + "_" + instancePort;
+        if (r.getId().equals(instanceId))
+        {
+          found = true;
+          break;
+        }
+      }
+      AssertJUnit.assertTrue(found);
+    }
+  }
+
+  void verifyRebalance() throws JsonGenerationException,
+      JsonMappingException,
+      IOException
+  {
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+            + resourceGroupName + "/idealState";
+    Map<String, String> paraMap = new HashMap<String, String>();
+    // Add 1 instance
+    paraMap.put(JsonParameters.REPLICAS, "" + replicas);
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+
+    Reference resourceRef = new Reference(httpUrlBase);
+
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+
+    for (int i = 0; i < partitions; i++)
+    {
+      String partitionName = resourceGroupName + "_" + i;
+      assert (r.getMapField(partitionName).size() == replicas);
+    }
+
+    httpUrlBase = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName;
+    resourceRef = new Reference(httpUrlBase);
+    request = new Request(Method.GET, resourceRef);
+
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+  }
+
+  void verifyEnableInstance() throws JsonGenerationException,
+      JsonMappingException,
+      IOException
+  {
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/instances/"
+            + instance1 + "_" + instancePort;
+    Map<String, String> paraMap = new HashMap<String, String>();
+    // Add 1 instance
+    paraMap.put(JsonParameters.ENABLED, "" + false);
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
+
+    Reference resourceRef = new Reference(httpUrlBase);
+
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    AssertJUnit.assertTrue(r.getSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString())
+                            .equals("" + false));
+
+    // Then enable it
+    paraMap.put(JsonParameters.ENABLED, "" + true);
+    request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    mapper = new ObjectMapper();
+    r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    AssertJUnit.assertTrue(r.getSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString())
+                            .equals("" + true));
+  }
+
+  void verifyAlterIdealState() throws IOException
+  {
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+            + resourceGroupName + "/idealState";
+
+    Reference resourceRef = new Reference(httpUrlBase);
+    Request request = new Request(Method.GET, resourceRef);
+
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    ObjectMapper mapper = new ObjectMapper();
+    ZNRecord r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    String partitionName = "new-entity-12345_3";
+    r.getMapFields().remove(partitionName);
+
+    Map<String, String> paraMap = new HashMap<String, String>();
+    // Add 1 instance
+    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
+
+    resourceRef = new Reference(httpUrlBase);
+
+    request = new Request(Method.POST, resourceRef);
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+                          + ClusterRepresentationUtil.ObjectToJson(paraMap) + "&"
+                          + JsonParameters.NEW_IDEAL_STATE + "="
+                          + ClusterRepresentationUtil.ZNRecordToJson(r),
+                      MediaType.APPLICATION_ALL);
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    mapper = new ObjectMapper();
+    ZNRecord r2 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    AssertJUnit.assertTrue(!r2.getMapFields().containsKey(partitionName));
+
+    for (String key : r2.getMapFields().keySet())
+    {
+      AssertJUnit.assertTrue(r.getMapFields().containsKey(key));
+    }
+  }
+
+  // verify get/post configs in different scopes
+  void verifyConfigAccessor() throws Exception
+  {
+    ObjectMapper mapper = new ObjectMapper();
+    Client client = new Client(Protocol.HTTP);
+
+    // set/get cluster scope configs
+    String url =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/cluster/"
+            + clusterName;
+
+    postConfig(client, url, mapper, ClusterSetup.setConfig, "key1=value1,key2=value2");
+
+    ZNRecord record = get(client, url, mapper);
+    Assert.assertEquals(record.getSimpleFields().size(), 2);
+    Assert.assertEquals(record.getSimpleField("key1"), "value1");
+    Assert.assertEquals(record.getSimpleField("key2"), "value2");
+
+    // set/get participant scope configs
+    url =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+            + "/configs/participant/localhost_12918";
+
+    postConfig(client, url, mapper, ClusterSetup.setConfig, "key3=value3,key4=value4");
+
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getSimpleFields().size(), 2);
+    Assert.assertEquals(record.getSimpleField("key3"), "value3");
+    Assert.assertEquals(record.getSimpleField("key4"), "value4");
+
+    // set/get resource scope configs
+    url =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+            + "/configs/resource/testResource";
+
+    postConfig(client, url, mapper, ClusterSetup.setConfig, "key5=value5,key6=value6");
+
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getSimpleFields().size(), 2);
+    Assert.assertEquals(record.getSimpleField("key5"), "value5");
+    Assert.assertEquals(record.getSimpleField("key6"), "value6");
+
+    // set/get partition scope configs
+    url =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+            + "/configs/partition/testResource/testPartition";
+
+    postConfig(client, url, mapper, ClusterSetup.setConfig, "key7=value7,key8=value8");
+
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getSimpleFields().size(), 2);
+    Assert.assertEquals(record.getSimpleField("key7"), "value7");
+    Assert.assertEquals(record.getSimpleField("key8"), "value8");
+
+    // list keys
+    url = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs";
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getListFields().size(), 1);
+    Assert.assertTrue(record.getListFields().containsKey("scopes"));
+    Assert.assertTrue(contains(record.getListField("scopes"),
+                               "CLUSTER",
+                               "PARTICIPANT",
+                               "RESOURCE",
+                               "PARTITION"));
+
+    url = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/cluster";
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getListFields().size(), 1);
+    Assert.assertTrue(record.getListFields().containsKey("CLUSTER"));
+    Assert.assertTrue(contains(record.getListField("CLUSTER"), clusterName));
+
+    url =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/participant";
+    record = get(client, url, mapper);
+    Assert.assertTrue(record.getListFields().containsKey("PARTICIPANT"));
+    Assert.assertTrue(contains(record.getListField("PARTICIPANT"), "localhost_12918"));
+
+    url = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/resource";
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getListFields().size(), 1);
+    Assert.assertTrue(record.getListFields().containsKey("RESOURCE"));
+    Assert.assertTrue(contains(record.getListField("RESOURCE"), "testResource"));
+
+    url =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+            + "/configs/partition/testResource";
+    record = get(client, url, mapper);
+    Assert.assertEquals(record.getListFields().size(), 1);
+    Assert.assertTrue(record.getListFields().containsKey("PARTITION"));
+    Assert.assertTrue(contains(record.getListField("PARTITION"), "testPartition"));
+
+  }
+
+  private ZNRecord get(Client client, String url, ObjectMapper mapper) throws Exception
+  {
+    Request request = new Request(Method.GET, new Reference(url));
+    Response response = client.handle(request);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+    String responseStr = sw.toString();
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+
+    ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+    return record;
+  }
+
+  private void postConfig(Client client,
+                          String url,
+                          ObjectMapper mapper,
+                          String command,
+                          String configs) throws Exception
+  {
+    Map<String, String> params = new HashMap<String, String>();
+
+    params.put(JsonParameters.MANAGEMENT_COMMAND, command);
+    params.put(JsonParameters.CONFIGS, configs);
+
+    Request request = new Request(Method.POST, new Reference(url));
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(params), MediaType.APPLICATION_ALL);
+
+    Response response = client.handle(request);
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+    String responseStr = sw.toString();
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+    Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+  }
+
+  void verifyEnableCluster() throws Exception
+  {
+    System.out.println("START: verifyEnableCluster()");
+    String httpUrlBase =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/Controller";
+    Map<String, String> paramMap = new HashMap<String, String>();
+
+    paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableCluster);
+    paramMap.put(JsonParameters.ENABLED, "" + false);
+
+    Reference resourceRef = new Reference(httpUrlBase);
+
+    Request request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paramMap), MediaType.APPLICATION_ALL);
+    Client client = new Client(Protocol.HTTP);
+    Response response = client.handle(request);
+
+    Representation result = response.getEntity();
+    StringWriter sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    // verify pause znode exists
+    String pausePath = PropertyPathConfig.getPath(PropertyType.PAUSE, clusterName);
+    System.out.println("pausePath: " + pausePath);
+    boolean exists = _gZkClient.exists(pausePath);
+    Assert.assertTrue(exists, pausePath + " should exist");
+
+    // Then enable it
+    paramMap.put(JsonParameters.ENABLED, "" + true);
+    request = new Request(Method.POST, resourceRef);
+
+    request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+        + ClusterRepresentationUtil.ObjectToJson(paramMap), MediaType.APPLICATION_ALL);
+    client = new Client(Protocol.HTTP);
+    response = client.handle(request);
+
+    result = response.getEntity();
+    sw = new StringWriter();
+    result.write(sw);
+
+    System.out.println(sw.toString());
+
+    // verify pause znode doesn't exist
+    exists = _gZkClient.exists(pausePath);
+    Assert.assertFalse(exists, pausePath + " should be removed");
+
+    System.out.println("END: verifyEnableCluster()");
+  }
+
+  private boolean contains(List<String> list, String... items)
+  {
+    for (String item : items)
+    {
+      if (!list.contains(item))
+      {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/AccessOption.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/AccessOption.java b/helix-core/src/main/java/org/apache/helix/AccessOption.java
new file mode 100644
index 0000000..6cfc163
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/AccessOption.java
@@ -0,0 +1,52 @@
+package org.apache.helix;
+
+import org.apache.zookeeper.CreateMode;
+
+public class AccessOption
+{
+  public static int PERSISTENT = 0x1;
+  public static int EPHEMERAL = 0x2;
+  public static int PERSISTENT_SEQUENTIAL = 0x4;
+  public static int EPHEMERAL_SEQUENTIAL = 0x8;
+  public static int THROW_EXCEPTION_IFNOTEXIST = 0x10;
+
+  /**
+   * Helper method to get zookeeper create mode from options
+   * 
+   * @param options
+   * @return zookeeper create mode
+   */
+  public static CreateMode getMode(int options)
+  {
+    if ((options & PERSISTENT) > 0)
+    {
+      return CreateMode.PERSISTENT;
+    }
+    else if ((options & EPHEMERAL) > 0)
+    {
+      return CreateMode.EPHEMERAL;
+    }
+    else if ((options & PERSISTENT_SEQUENTIAL) > 0)
+    {
+      return CreateMode.PERSISTENT_SEQUENTIAL;
+    }
+    else if ((options & EPHEMERAL_SEQUENTIAL) > 0)
+    {
+      return CreateMode.EPHEMERAL_SEQUENTIAL;
+    }
+
+    return null;
+  }
+
+  /**
+   * Helper method to get is-throw-exception-on-node-not-exist from options
+   * 
+   * @param options
+   * @return
+   */
+  public static boolean isThrowExceptionIfNotExist(int options)
+  {
+    return (options & THROW_EXCEPTION_IFNOTEXIST) > 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
new file mode 100644
index 0000000..af46a52
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -0,0 +1,192 @@
+package org.apache.helix;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+public interface BaseDataAccessor<T>
+{
+  /**
+   * This will always attempt to create the znode, if it exists it will return false. Will
+   * create parents if they do not exist. For performance reasons, it may try to create
+   * child first and only if it fails it will try to create parent
+   * 
+   * @param path
+   * @param record
+   * @return
+   */
+  boolean create(String path, T record, int options);
+
+  /**
+   * This will always attempt to set the data on existing node. If the znode does not
+   * exist it will create it.
+   * 
+   * @param path
+   * @param record
+   * @return
+   */
+  boolean set(String path, T record, int options);
+
+  /**
+   * This will attempt to merge with existing data by calling znrecord.merge and if it
+   * does not exist it will create the znode
+   * 
+   * @param path
+   * @param record
+   * @return
+   */
+  boolean update(String path, DataUpdater<T> updater, int options);
+
+  /**
+   * This will remove the znode and all its child nodes if any
+   * 
+   * @param path
+   * @return
+   */
+  boolean remove(String path, int options);
+
+  /**
+   * Use it when creating children under a parent node. This will use async api for better
+   * performance. If the child already exists it will return false.
+   * 
+   * @param parentPath
+   * @param record
+   * @return
+   */
+  boolean[] createChildren(List<String> paths, List<T> records, int options);
+
+  /**
+   * can set multiple children under a parent node. This will use async api for better
+   * performance. If this child does not exist it will create it.
+   * 
+   * @param parentPath
+   * @param record
+   */
+  boolean[] setChildren(List<String> paths, List<T> records, int options);
+
+  /**
+   * Can update multiple nodes using async api for better performance. If a child does not
+   * exist it will create it.
+   * 
+   * @param parentPath
+   * @param record
+   * @return
+   */
+  boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options);
+
+  /**
+   * remove multiple paths using async api. will remove any child nodes if any
+   * 
+   * @param paths
+   * @return
+   */
+  boolean[] remove(List<String> paths, int options);
+
+  /**
+   * Get the {@link T} corresponding to the path
+   * 
+   * @param path
+   * @return
+   */
+  T get(String path, Stat stat, int options);
+
+  /**
+   * Get List of {@link T} corresponding to the paths using async api
+   * 
+   * @param paths
+   * @return
+   */
+  List<T> get(List<String> paths, List<Stat> stats, int options);
+
+  /**
+   * Get the children under a parent path using async api
+   * 
+   * @param path
+   * @return
+   */
+  List<T> getChildren(String parentPath, List<Stat> stats, int options);
+
+  /**
+   * Returns the child names given a parent path
+   * 
+   * @param type
+   * @param keys
+   * @return
+   */
+  List<String> getChildNames(String parentPath, int options);
+
+  /**
+   * checks if the path exists in zk
+   * 
+   * @param path
+   * @return
+   */
+  boolean exists(String path, int options);
+
+  /**
+   * checks if all the paths exist
+   * 
+   * @param paths
+   * @return
+   */
+  boolean[] exists(List<String> paths, int options);
+
+  /**
+   * Get the stats of all the paths
+   * 
+   * @param paths
+   * @return
+   */
+  Stat[] getStats(List<String> paths, int options);
+
+  /**
+   * Get the stats of all the paths
+   * 
+   * @param paths
+   * @return
+   */
+  Stat getStat(String path, int options);
+
+  /**
+   * Subscribe data listener to path
+   * 
+   * @param path
+   * @param listener
+   * @return
+   */
+  void subscribeDataChanges(String path, IZkDataListener listener);
+
+  /**
+   * Unsubscribe data listener to path
+   * 
+   * @param path
+   * @param listener
+   */
+  void unsubscribeDataChanges(String path, IZkDataListener listener);
+
+  /**
+   * Subscribe child listener to path
+   * 
+   * @param path
+   * @param listener
+   * @return
+   */
+  List<String> subscribeChildChanges(String path, IZkChildListener listener);
+
+  /**
+   * Unsubscribe child listener to path
+   * 
+   * @param path
+   * @param listener
+   */
+  void unsubscribeChildChanges(String path, IZkChildListener listener);
+
+  /**
+   * reset the cache if any, when session expiry happens
+   */
+  void reset();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
new file mode 100644
index 0000000..316ff54
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+
+/**
+ * Provides the ability to <br>
+ * <li>Send message to a specific component in the cluster[ participant,
+ * controller, Router(probably not needed) ]</li> <li>Broadcast message to all
+ * nodes</li> <li>Send message to instances that hold a specific resource</li>
+ * <li>Asynchronous request response api. Send message with a co-relation id and
+ * invoke a method when there is a response. Can support timeout.</li>
+ * 
+ * @author kgopalak
+ * 
+ */
+public interface ClusterMessagingService
+{
+  /**
+   * Send message matching the specifications mentioned in recipientCriteria.
+   * 
+   * @param receipientCriteria
+   * @See Criteria
+   * @param message
+   *          message to be sent. Some attributes of this message will be
+   *          changed as required
+   * @return returns how many messages were successfully sent.
+   */
+  int send(Criteria recipientCriteria, Message message);
+
+  /**
+   * This will send the message to all instances matching the criteria<br>
+   * When there is a reply to the message sent AsynCallback.onReply will be
+   * invoked. Application can specify a timeout on AsyncCallback. After every
+   * reply is processed AsyncCallback.isDone will be invoked.<br>
+   * This method will return after sending the messages. <br>
+   * This is useful when message need to be sent and current thread need not
+   * wait for response since processing will be done in another thread.
+   * 
+   * @param receipientCriteria
+   * @param message
+   * @param callbackOnReply
+   * @param timeOut
+   * @param retryCount
+   * @return
+   */
+  int send(Criteria receipientCriteria, Message message,
+      AsyncCallback callbackOnReply, int timeOut);
+
+  int send(Criteria receipientCriteria, Message message,
+      AsyncCallback callbackOnReply, int timeOut, int retryCount);
+
+  /**
+   * This will send the message to all instances matching the criteria<br>
+   * When there is a reply to the message sent AsynCallback.onReply will be
+   * invoked. Application can specify a timeout on AsyncCallback. After every
+   * reply is processed AsyncCallback.isDone will be invoked.<br>
+   * This method will return only after the AsyncCallback.isDone() returns true <br>
+   * This is useful when message need to be sent and current thread has to wait
+   * for response. <br>
+   * The current thread can use callbackOnReply instance to store application
+   * specific data.
+   * 
+   * @param receipientCriteria
+   * @param message
+   * @param callbackOnReply
+   * @param timeOut
+   * @param retryCount
+   * @return
+   */
+  int sendAndWait(Criteria receipientCriteria, Message message,
+      AsyncCallback callbackOnReply, int timeOut);
+
+  int sendAndWait(Criteria receipientCriteria, Message message,
+      AsyncCallback callbackOnReply, int timeOut, int retryCount);
+
+  /**
+   * This will register a message handler factory to create handlers for
+   * message. In case client code defines its own message type, it can define a
+   * message handler factory to create handlers to process those messages.
+   * Messages are processed in a threadpool which is hosted by cluster manager,
+   * and cluster manager will call the factory to create handler, and the
+   * handler is called in the threadpool.
+   * 
+   * Note that only one message handler factory can be registered with one
+   * message type.
+   * 
+   * @param type
+   *          The message type that the factory will create handler for
+   * @param factory
+   *          The per-type message factory
+   * @param threadpoolSize
+   *        size of the execution threadpool that handles the message 
+   * @return
+   */
+  public void registerMessageHandlerFactory(String type,
+      MessageHandlerFactory factory);
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ClusterView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterView.java b/helix-core/src/main/java/org/apache/helix/ClusterView.java
new file mode 100644
index 0000000..12c790c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ClusterView.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+@Deprecated
+public class ClusterView
+{
+  private Map<PropertyType, List<ZNRecord>> clusterPropertyLists;
+
+  // getter/setter's are needed for private fields for
+  // serialization/de-serialization
+  // ref:
+  // http://jackson.codehaus.org/DataBindingDeepDive
+  // @JsonProperty
+  public void setClusterPropertyLists(Map<PropertyType, List<ZNRecord>> clusterPropertyLists)
+  {
+    this.clusterPropertyLists = clusterPropertyLists;
+  }
+
+  // @JsonProperty
+  public Map<PropertyType, List<ZNRecord>> getPropertyLists()
+  {
+    return clusterPropertyLists;
+  }
+
+  public void setClusterPropertyList(PropertyType type, List<ZNRecord> propertyList)
+  {
+    clusterPropertyLists.put(type, propertyList);
+  }
+
+  public List<ZNRecord> getPropertyList(PropertyType type)
+  {
+    return clusterPropertyLists.get(type);
+  }
+
+  public void setMemberInstanceMap(Map<String, MemberInstance> memberInstanceMap)
+  {
+    this._memberInstanceMap = memberInstanceMap;
+  }
+
+  @JsonProperty
+  public Map<String, MemberInstance> getMemberInstanceMap()
+  {
+    return _memberInstanceMap;
+  }
+
+  public void set_memberInstanceMap(Map<String, MemberInstance> _memberInstanceMap)
+  {
+    this._memberInstanceMap = _memberInstanceMap;
+  }
+
+  private Map<String, MemberInstance> _memberInstanceMap;
+  private List<MemberInstance> _instances;
+
+  public void setInstances(List<MemberInstance> instances)
+  {
+    this._instances = instances;
+  }
+
+  public List<MemberInstance> getInstances()
+  {
+    return _instances;
+  }
+
+  public static class MemberInstance
+  {
+    private Map<PropertyType, List<ZNRecord>> _instanceProperties = new TreeMap<PropertyType, List<ZNRecord>>();
+
+    public void setClusterProperties(Map<PropertyType, List<ZNRecord>> instanceProperties)
+    {
+      this._instanceProperties = instanceProperties;
+    }
+
+    // @JsonProperty
+    public Map<PropertyType, List<ZNRecord>> getInstanceProperties()
+    {
+      return _instanceProperties;
+    }
+
+    public void setInstanceProperty(PropertyType type, List<ZNRecord> values)
+    {
+      _instanceProperties.put(type, values);
+    }
+
+    public List<ZNRecord> getInstanceProperty(PropertyType type)
+    {
+      return _instanceProperties.get(type);
+    }
+
+    private String _instanceName;
+
+    // for JSON de-serialization
+    public MemberInstance()
+    {
+
+    }
+
+    public MemberInstance(String instanceName)
+    {
+      this._instanceName = instanceName;
+    }
+
+    public String getInstanceName()
+    {
+      return _instanceName;
+    }
+
+    public void setInstanceName(String instanceName)
+    {
+      this._instanceName = instanceName;
+    }
+
+  }
+
+  public MemberInstance getMemberInstance(String instanceName, boolean createNewIfAbsent)
+  {
+    if (!_memberInstanceMap.containsKey(instanceName))
+    {
+      _memberInstanceMap.put(instanceName, new MemberInstance(instanceName));
+    }
+    return _memberInstanceMap.get(instanceName);
+  }
+
+  private List<ZNRecord> _externalView;
+
+  public ClusterView()
+  {
+    clusterPropertyLists = new TreeMap<PropertyType, List<ZNRecord>>();
+    setClusterPropertyList(PropertyType.IDEALSTATES, new ArrayList<ZNRecord>());
+    setClusterPropertyList(PropertyType.CONFIGS, new ArrayList<ZNRecord>());
+    setClusterPropertyList(PropertyType.LIVEINSTANCES, new ArrayList<ZNRecord>());
+    setClusterPropertyList(PropertyType.INSTANCES, new ArrayList<ZNRecord>());
+
+    _memberInstanceMap = new HashMap<String, ClusterView.MemberInstance>();
+  }
+
+  public List<ZNRecord> getExternalView()
+  {
+    return _externalView;
+  }
+
+  public void setExternalView(List<ZNRecord> externalView)
+  {
+    this._externalView = externalView;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
new file mode 100644
index 0000000..ec53000
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.StringTemplate;
+import org.apache.log4j.Logger;
+
+
+public class ConfigAccessor
+{
+  private static Logger               LOG      = Logger.getLogger(ConfigAccessor.class);
+
+  private static final StringTemplate template = new StringTemplate();
+  static
+  {
+    // @formatter:off
+    template.addEntry(ConfigScopeProperty.CLUSTER, 1, "/{clusterName}/CONFIGS/CLUSTER");
+    template.addEntry(ConfigScopeProperty.CLUSTER,
+                      2,
+                      "/{clusterName}/CONFIGS/CLUSTER/{clusterName}|SIMPLEKEYS");
+    template.addEntry(ConfigScopeProperty.PARTICIPANT,
+                      1,
+                      "/{clusterName}/CONFIGS/PARTICIPANT");
+    template.addEntry(ConfigScopeProperty.PARTICIPANT,
+                      2,
+                      "/{clusterName}/CONFIGS/PARTICIPANT/{participantName}|SIMPLEKEYS");
+    template.addEntry(ConfigScopeProperty.RESOURCE, 1, "/{clusterName}/CONFIGS/RESOURCE");
+    template.addEntry(ConfigScopeProperty.RESOURCE,
+                      2,
+                      "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|SIMPLEKEYS");
+    template.addEntry(ConfigScopeProperty.PARTITION,
+                      2,
+                      "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|MAPKEYS");
+    template.addEntry(ConfigScopeProperty.PARTITION,
+                      3,
+                      "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|MAPMAPKEYS|{partitionName}");
+    // @formatter:on
+  }
+
+  private final ZkClient              zkClient;
+
+  public ConfigAccessor(ZkClient zkClient)
+  {
+    this.zkClient = zkClient;
+  }
+
+  /**
+   * Get config value
+   * 
+   * @param scope
+   * @param key
+   * @return value or null if doesn't exist
+   */
+  public String get(ConfigScope scope, String key)
+  {
+    if (scope == null || scope.getScope() == null)
+    {
+      LOG.error("Scope can't be null");
+      return null;
+    }
+
+    String value = null;
+    String clusterName = scope.getClusterName();
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    String scopeStr = scope.getScopeStr();
+    String[] splits = scopeStr.split("\\|");
+
+    ZNRecord record = zkClient.readData(splits[0], true);
+
+    if (record != null)
+    {
+      if (splits.length == 1)
+      {
+        value = record.getSimpleField(key);
+      }
+      else if (splits.length == 2)
+      {
+        if (record.getMapField(splits[1]) != null)
+        {
+          value = record.getMapField(splits[1]).get(key);
+        }
+      }
+    }
+    return value;
+
+  }
+
+  /**
+   * Set a config value
+   * 
+   * @param scope
+   * @param key
+   * @param value
+   */
+  public void set(ConfigScope scope, String key, String value)
+  {
+    if (scope == null || scope.getScope() == null)
+    {
+      LOG.error("Scope can't be null");
+      return;
+    }
+
+    String clusterName = scope.getClusterName();
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    String scopeStr = scope.getScopeStr();
+    String[] splits = scopeStr.split("\\|");
+
+    String id = splits[0].substring(splits[0].lastIndexOf('/') + 1);
+    ZNRecord update = new ZNRecord(id);
+    if (splits.length == 1)
+    {
+      update.setSimpleField(key, value);
+    }
+    else if (splits.length == 2)
+    {
+      if (update.getMapField(splits[1]) == null)
+      {
+        update.setMapField(splits[1], new TreeMap<String, String>());
+      }
+      update.getMapField(splits[1]).put(key, value);
+    }
+    ZKUtil.createOrUpdate(zkClient, splits[0], update, true, true);
+    return;
+  }
+
+  /**
+   * Remove config value
+   * 
+   * @param scope
+   * @param key
+   */
+  public void remove(ConfigScope scope, String key)
+  {
+    if (scope == null || scope.getScope() == null)
+    {
+      LOG.error("Scope can't be null");
+      return;
+    }
+
+    String clusterName = scope.getClusterName();
+    if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+    {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    String scopeStr = scope.getScopeStr();
+    String[] splits = scopeStr.split("\\|");
+
+    String id = splits[0].substring(splits[0].lastIndexOf('/') + 1);
+    ZNRecord update = new ZNRecord(id);
+    if (splits.length == 1)
+    {
+      // subtract doesn't care about value, use empty string
+      update.setSimpleField(key, "");
+    }
+    else if (splits.length == 2)
+    {
+      if (update.getMapField(splits[1]) == null)
+      {
+        update.setMapField(splits[1], new TreeMap<String, String>());
+      }
+      // subtract doesn't care about value, use empty string
+      update.getMapField(splits[1]).put(key, "");
+    }
+
+    ZKUtil.subtract(zkClient, splits[0], update);
+    return;
+  }
+
+  /**
+   * Get a list of config keys
+   * 
+   * @param type
+   * @param clusterName
+   * @param keys
+   * @return
+   */
+  public List<String> getKeys(ConfigScopeProperty type,
+                              String clusterName,
+                              String... keys)
+  {
+    if (type == null || clusterName == null)
+    {
+      LOG.error("clusterName|scope can't be null");
+      return Collections.emptyList();
+    }
+
+    try
+    {
+      if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+      {
+        LOG.error("cluster " + clusterName + " is not setup yet");
+        return Collections.emptyList();
+      }
+
+      String[] args = new String[1 + keys.length];
+      args[0] = clusterName;
+      System.arraycopy(keys, 0, args, 1, keys.length);
+      String scopeStr = template.instantiate(type, args);
+      String[] splits = scopeStr.split("\\|");
+      List<String> retKeys = null;
+      if (splits.length == 1)
+      {
+        retKeys = zkClient.getChildren(splits[0]);
+      }
+      else
+      {
+        ZNRecord record = zkClient.readData(splits[0]);
+
+        if (splits[1].startsWith("SIMPLEKEYS"))
+        {
+          retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
+
+        }
+        else if (splits[1].startsWith("MAPKEYS"))
+        {
+          retKeys = new ArrayList<String>(record.getMapFields().keySet());
+        }
+        else if (splits[1].startsWith("MAPMAPKEYS"))
+        {
+          retKeys = new ArrayList<String>(record.getMapField(splits[2]).keySet());
+        }
+      }
+      if (retKeys == null)
+      {
+        LOG.error("Invalid scope: " + type + " or keys: " + Arrays.toString(args));
+        return Collections.emptyList();
+      }
+
+      Collections.sort(retKeys);
+      return retKeys;
+    }
+    catch (Exception e)
+    {
+      return Collections.emptyList();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java b/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java
new file mode 100644
index 0000000..32b8622
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.InstanceConfig;
+
+
+/**
+ * Callback interface invoked when participant (instance) configs in the
+ * cluster change.
+ */
+public interface ConfigChangeListener
+{
+
+  /**
+   * Invoked when participant config changes
+   * 
+   * @param configs current list of instance configs
+   * @param changeContext context of the change notification
+   */
+  public void onConfigChange(List<InstanceConfig> configs,
+                             NotificationContext changeContext);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigScope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigScope.java b/helix-core/src/main/java/org/apache/helix/ConfigScope.java
new file mode 100644
index 0000000..158e928
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigScope.java
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.util.StringTemplate;
+import org.apache.log4j.Logger;
+
+
+public class ConfigScope
+{
+  public enum ConfigScopeProperty
+  {
+    CLUSTER, PARTICIPANT, RESOURCE, PARTITION, CONSTRAINT;
+  }
+
+  private static Logger LOG = Logger.getLogger(ConfigScope.class);
+
+  private static final List<ConfigScopeProperty> scopePriority =
+      new ArrayList<ConfigScopeProperty>();
+  private static final Map<ConfigScopeProperty, Map<ConfigScopeProperty, ConfigScopeProperty>> scopeTransition =
+      new HashMap<ConfigScopeProperty, Map<ConfigScopeProperty, ConfigScopeProperty>>();
+  private static final StringTemplate template = new StringTemplate();
+  static
+  {
+    // scope priority: CLUSTER > PARTICIPANT > RESOURCE > PARTITION
+    scopePriority.add(ConfigScopeProperty.CLUSTER);
+    scopePriority.add(ConfigScopeProperty.PARTICIPANT);
+    scopePriority.add(ConfigScopeProperty.RESOURCE);
+    scopePriority.add(ConfigScopeProperty.PARTITION);
+
+    // scope transition table to check valid inputs
+    scopeTransition.put(ConfigScopeProperty.CLUSTER,
+                        new HashMap<ConfigScopeProperty, ConfigScopeProperty>());
+    scopeTransition.get(ConfigScopeProperty.CLUSTER).put(ConfigScopeProperty.PARTICIPANT,
+                                                         ConfigScopeProperty.PARTICIPANT);
+    scopeTransition.get(ConfigScopeProperty.CLUSTER).put(ConfigScopeProperty.RESOURCE,
+                                                         ConfigScopeProperty.RESOURCE);
+    scopeTransition.put(ConfigScopeProperty.RESOURCE,
+                        new HashMap<ConfigScopeProperty, ConfigScopeProperty>());
+    scopeTransition.get(ConfigScopeProperty.RESOURCE).put(ConfigScopeProperty.PARTITION,
+                                                          ConfigScopeProperty.PARTITION);
+
+    // string templates to generate znode path/index
+    // @formatter:off
+    template.addEntry(ConfigScopeProperty.CLUSTER,
+                      2,
+                      "/{clusterName}/CONFIGS/CLUSTER/{clusterName}");
+    template.addEntry(ConfigScopeProperty.PARTICIPANT,
+                      2,
+                      "/{clusterName}/CONFIGS/PARTICIPANT/{participantName}");
+    template.addEntry(ConfigScopeProperty.RESOURCE,
+                      2,
+                      "/{clusterName}/CONFIGS/RESOURCE/{resourceName}");
+    template.addEntry(ConfigScopeProperty.PARTITION,
+                      3,
+                      "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|{partitionName}");
+    // @formatter:on
+  }
+
+  private final String _clusterName;
+  private final ConfigScopeProperty _scope;
+  private final String _scopeStr;
+
+  ConfigScope(ConfigScopeBuilder configScopeBuilder)
+  {
+    Map<ConfigScopeProperty, String> scopeMap = configScopeBuilder
+        .getScopeMap();
+    List<String> keys = new ArrayList<String>();
+
+    ConfigScopeProperty curScope = null;
+    for (ConfigScopeProperty scope : scopePriority)
+    {
+      if (scopeMap.containsKey(scope))
+      {
+        if (curScope == null && scope == ConfigScopeProperty.CLUSTER)
+        {
+          keys.add(scopeMap.get(scope));
+          curScope = ConfigScopeProperty.CLUSTER;
+        } else if (curScope == null)
+        {
+          throw new IllegalArgumentException("Missing CLUSTER scope. Can't build scope using " + configScopeBuilder);
+        } else
+        {
+          if (!scopeTransition.containsKey(curScope) || !scopeTransition.get(curScope).containsKey(scope))
+          {
+            throw new IllegalArgumentException("Can't build scope using " + configScopeBuilder);
+          }
+          keys.add(scopeMap.get(scope));
+          curScope = scopeTransition.get(curScope).get(scope);
+        }
+      }
+    }
+
+    if (curScope == ConfigScopeProperty.CLUSTER)
+    {
+      // append one more {clusterName}
+      keys.add(scopeMap.get(ConfigScopeProperty.CLUSTER));
+    }
+
+    String scopeStr = template.instantiate(curScope, keys.toArray(new String[0]));
+
+    _clusterName = keys.get(0);
+    _scopeStr = scopeStr;
+    _scope = curScope;
+  }
+
+  public ConfigScopeProperty getScope()
+  {
+    return _scope;
+  }
+
+  public String getClusterName()
+  {
+    return _clusterName;
+  }
+
+  public String getScopeStr()
+  {
+    return _scopeStr;
+  }
+
+  @Override
+  public String toString()
+  {
+    return super.toString() + ": " + _scopeStr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java b/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java
new file mode 100644
index 0000000..ec42fef
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.util.StringTemplate;
+import org.apache.log4j.Logger;
+
+
+public class ConfigScopeBuilder
+{
+  private static Logger LOG = Logger.getLogger(ConfigScopeBuilder.class);
+
+  private static StringTemplate template = new StringTemplate();
+  static
+  {
+    // @formatter:off
+    template.addEntry(ConfigScopeProperty.CLUSTER, 1, "CLUSTER={clusterName}");
+    template.addEntry(ConfigScopeProperty.RESOURCE, 2, "CLUSTER={clusterName},RESOURCE={resourceName}");
+    template.addEntry(ConfigScopeProperty.PARTITION, 3, "CLUSTER={clusterName},RESOURCE={resourceName},PARTITION={partitionName}");
+    template.addEntry(ConfigScopeProperty.PARTICIPANT, 2, "CLUSTER={clusterName},PARTICIPANT={participantName}");
+    // @formatter:on
+  }
+
+  private final Map<ConfigScopeProperty, String> _scopeMap;
+
+  public Map<ConfigScopeProperty, String> getScopeMap()
+  {
+    return _scopeMap;
+  }
+
+  public ConfigScopeBuilder()
+  {
+    _scopeMap = new HashMap<ConfigScopeProperty, String>();
+  }
+
+  public ConfigScopeBuilder forCluster(String clusterName)
+  {
+    _scopeMap.put(ConfigScopeProperty.CLUSTER, clusterName);
+    return this;
+  }
+
+  public ConfigScopeBuilder forParticipant(String participantName)
+  {
+    _scopeMap.put(ConfigScopeProperty.PARTICIPANT, participantName);
+    return this;
+  }
+
+  public ConfigScopeBuilder forResource(String resourceName)
+  {
+    _scopeMap.put(ConfigScopeProperty.RESOURCE, resourceName);
+    return this;
+
+  }
+
+  public ConfigScopeBuilder forPartition(String partitionName)
+  {
+    _scopeMap.put(ConfigScopeProperty.PARTITION, partitionName);
+    return this;
+
+  }
+
+  public ConfigScope build()
+  {
+    // TODO: validate the scopes map
+    return new ConfigScope(this);
+  }
+
+  public ConfigScope build(ConfigScopeProperty scope, String clusterName, String... scopeKeys)
+  {
+    if (scopeKeys == null)
+    {
+      scopeKeys = new String[]{};
+    }
+
+    String[] args = new String[1 + scopeKeys.length];
+    args[0] = clusterName;
+    System.arraycopy(scopeKeys, 0, args, 1, scopeKeys.length);
+    String scopePairs = template.instantiate(scope, args);
+
+    return build(scopePairs);
+  }
+
+  public ConfigScope build(String scopePairs)
+  {
+    String[] scopes = scopePairs.split("[\\s,]+");
+    for (String scope : scopes)
+    {
+      try
+      {
+        int idx = scope.indexOf('=');
+        if (idx == -1)
+        {
+          LOG.error("Invalid scope string: " + scope);
+          continue;
+        }
+
+        String scopeStr = scope.substring(0, idx);
+        String value = scope.substring(idx + 1);
+        ConfigScopeProperty scopeProperty = ConfigScopeProperty.valueOf(scopeStr);
+        _scopeMap.put(scopeProperty, value);
+      } catch (Exception e)
+      {
+        LOG.error("Invalid scope string: " + scope);
+        continue;
+      }
+    }
+
+    return build();
+  }
+
+  @Override
+  public String toString()
+  {
+    return _scopeMap.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java b/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java
new file mode 100644
index 0000000..6bcafde
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+
+/**
+ * Callback interface invoked when the cluster controller changes.
+ */
+public interface ControllerChangeListener
+{
+  /**
+   * Invoked when controller changes
+   * 
+   * @param changeContext context of the change notification
+   */
+  public void onControllerChange(NotificationContext changeContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/Criteria.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/Criteria.java b/helix-core/src/main/java/org/apache/helix/Criteria.java
new file mode 100644
index 0000000..302b4ee
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/Criteria.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+public class Criteria
+{
+  public enum DataSource
+  {
+    IDEALSTATES, EXTERNALVIEW
+  }
+  /**
+   * This can be CONTROLLER, PARTICIPANT, ROUTER Cannot be null
+   */
+  InstanceType recipientInstanceType;
+  /**
+   * If true this will only be process by the instance that was running when the
+   * message was sent. If the instance process dies and comes back up it will be
+   * ignored.
+   */
+  boolean sessionSpecific;
+  /**
+   * applicable only in case PARTICIPANT use * to broadcast to all instances
+   */
+  String instanceName = "";
+  /**
+   * Name of the resource. Use * to send message to all resources
+   * owned by an instance.
+   */
+  String resourceName = "";
+  /**
+   * Resource partition. Use * to send message to all partitions of a given
+   * resource
+   */
+  String partitionName = "";
+  /**
+   * State of the resource
+   */
+  String partitionState = "";
+  /**
+   * Exclude sending message to your self. True by default
+   */
+  boolean selfExcluded = true;
+  /**
+   * Determine if use external view or ideal state as source of truth
+   */
+  DataSource _dataSource = DataSource.EXTERNALVIEW;
+  
+  public DataSource getDataSource()
+  {
+    return _dataSource;
+  }
+  
+  public void setDataSource(DataSource source)
+  {
+    _dataSource = source;
+  }
+
+  public boolean isSelfExcluded()
+  {
+    return selfExcluded;
+  }
+
+  public void setSelfExcluded(boolean selfExcluded)
+  {
+    this.selfExcluded = selfExcluded;
+  }
+
+  public InstanceType getRecipientInstanceType()
+  {
+    return recipientInstanceType;
+  }
+
+  public void setRecipientInstanceType(InstanceType recipientInstanceType)
+  {
+    this.recipientInstanceType = recipientInstanceType;
+  }
+
+  public boolean isSessionSpecific()
+  {
+    return sessionSpecific;
+  }
+
+  public void setSessionSpecific(boolean sessionSpecific)
+  {
+    this.sessionSpecific = sessionSpecific;
+  }
+
+  public String getInstanceName()
+  {
+    return instanceName;
+  }
+
+  public void setInstanceName(String instanceName)
+  {
+    this.instanceName = instanceName;
+  }
+
+  public String getResource()
+  {
+    return resourceName;
+  }
+
+  public void setResource(String resourceName)
+  {
+    this.resourceName = resourceName;
+  }
+
+  public String getPartition()
+  {
+    return partitionName;
+  }
+
+  public void setPartition(String partitionName)
+  {
+    this.partitionName = partitionName;
+  }
+
+  public String getPartitionState()
+  {
+    return partitionState;
+  }
+
+  public void setPartitionState(String partitionState)
+  {
+    this.partitionState = partitionState;
+  }
+
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("instanceName").append("=").append(instanceName);
+    sb.append("resourceName").append("=").append(resourceName);
+    sb.append("partitionName").append("=").append(partitionName);
+    sb.append("partitionState").append("=").append(partitionState);
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java
new file mode 100644
index 0000000..cad9e67
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.CurrentState;
+
+
+public interface CurrentStateChangeListener
+{
+
+  /**
+   * Invoked when current state changes
+   * 
+   * @param instanceName
+   * @param statesInfo
+   * @param changeContext
+   */
+  public void onStateChange(String instanceName,
+                            List<CurrentState> statesInfo,
+                            NotificationContext changeContext);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/DataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/DataAccessor.java b/helix-core/src/main/java/org/apache/helix/DataAccessor.java
new file mode 100644
index 0000000..1fa7171
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/DataAccessor.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+import java.util.Map;
/**
 * Generic accessor for cluster properties keyed by {@link PropertyType}.
 *
 * @deprecated use {@link HelixDataAccessor} instead.
 */
@Deprecated
public interface DataAccessor
{

  /**
   * Set a property, overwrite if it exists and creates if not exists
   * 
   * @param type
   * @param value
   * @param keys
   * @return true if the operation was successful
   */
  boolean setProperty(PropertyType type, ZNRecord value, String... keys);

  /**
   * Typed variant of {@link #setProperty(PropertyType, ZNRecord, String...)}.
   *
   * @param type
   * @param value
   * @param keys
   * @return true if the operation was successful
   */
  boolean setProperty(PropertyType type, HelixProperty value, String... keys);

  /**
   * Updates a property, either overwrite or merge based on the
   * propertyType.mergeOnUpdate, fails to update if
   * propertyType.updateOnlyOnExists and does not exist
   * 
   * @param type
   * @param value
   * @param keys
   * @return true if the update was successful
   */
  boolean updateProperty(PropertyType type, ZNRecord value, String... keys);

  /**
   * Typed variant of {@link #updateProperty(PropertyType, ZNRecord, String...)}.
   *
   * @param type
   * @param value
   * @param keys
   * @return true if the update was successful
   */
  boolean updateProperty(PropertyType type, HelixProperty value, String... keys);

  /**
   * Return the property value, it must be a leaf
   * 
   * @param type
   * @param keys
   *          one or more keys used to get the path of znode
   * @return value, Null if absent or on error
   */
  ZNRecord getProperty(PropertyType type, String... keys);

  /**
   * Typed variant of {@link #getProperty(PropertyType, String...)}.
   *
   * @param clazz concrete {@link HelixProperty} subtype to wrap the record in
   * @param type
   * @param keys
   * @return value, null if absent or on error
   */
  <T extends HelixProperty> T getProperty(Class<T> clazz, PropertyType type, String... keys);

  /**
   * Removes the property
   * 
   * @param type
   * @param keys
   * @return true if removal succeeded
   */
  boolean removeProperty(PropertyType type, String... keys);

  /**
   * Return the child names of the property
   * 
   * @param type
   * @param keys
   * @return SubPropertyNames
   */
  List<String> getChildNames(PropertyType type, String... keys);

  /**
   * Return the values of all children under the given property path.
   *
   * @param type
   * @param keys
   *          must point to parent of leaf znodes
   * @return subPropertyValues
   */
  List<ZNRecord> getChildValues(PropertyType type, String... keys);

  /**
   * Typed variant of {@link #getChildValues(PropertyType, String...)}.
   *
   * @param clazz concrete {@link HelixProperty} subtype to wrap each record in
   * @param type
   * @param keys
   * @return child values as typed properties
   */
  <T extends HelixProperty> List<T> getChildValues(Class<T> clazz, PropertyType type,
      String... keys);

  /**
   * Same as the typed {@code getChildValues} but keyed by child name.
   *
   * @param clazz concrete {@link HelixProperty} subtype to wrap each record in
   * @param type
   * @param keys
   * @return map from child name to typed property
   */
  <T extends HelixProperty> Map<String, T> getChildValuesMap(Class<T> clazz, PropertyType type,
      String... keys);
}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java b/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java
new file mode 100644
index 0000000..0d52e4c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.ExternalView;
+
+
+public interface ExternalViewChangeListener
+{
+
+  /**
+   * Invoked when external view changes
+   * 
+   * @param externalViewList
+   * @param changeContext
+   */
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+                                   NotificationContext changeContext);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
new file mode 100644
index 0000000..5a670dd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -0,0 +1,168 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.log4j.Logger;
+
+// TODO: move to manager.zk
+public class GroupCommit
+{ 
+  private static Logger  LOG = Logger.getLogger(GroupCommit.class);
+  private static class Queue
+  {
+    final AtomicReference<Thread>      _running = new AtomicReference<Thread>();
+    final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>();
+  }
+
+  private static class Entry
+  {
+    final String   _key;
+    final ZNRecord _record;
+    AtomicBoolean  _sent = new AtomicBoolean(false);
+
+    Entry(String key, ZNRecord record)
+    {
+      _key = key;
+      _record = record;
+    }
+  }
+
+  private final Queue[]               _queues = new Queue[100];
+  
+  // potential memory leak if we add resource and remove resource
+  // TODO: move the cache logic to data accessor
+//  private final Map<String, ZNRecord> _cache  = new ConcurrentHashMap<String, ZNRecord>();
+
+  
+  public GroupCommit()
+  {
+    // Don't use Arrays.fill();
+    for (int i = 0; i < _queues.length; ++i)
+    {
+      _queues[i] = new Queue();
+    }
+  }
+
+  private Queue getQueue(String key)
+  {
+    return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
+  }
+
+  public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key, ZNRecord record)
+  {
+    Queue queue = getQueue(key);
+    Entry entry = new Entry(key, record);
+
+    queue._pending.add(entry);
+
+    while (!entry._sent.get())
+    {
+      if (queue._running.compareAndSet(null, Thread.currentThread()))
+      {
+        ArrayList<Entry> processed = new ArrayList<Entry>();
+        try
+        {
+          if (queue._pending.peek() == null)
+            return true;
+
+          // remove from queue
+          Entry first = queue._pending.poll();
+          processed.add(first);
+
+          String mergedKey = first._key;
+//          ZNRecord merged = _cache.get(mergedKey);
+          ZNRecord merged = null;
+          
+          try
+          {
+            // accessor will fallback to zk if not found in cache
+            merged = accessor.get(mergedKey, null, options);
+          }
+          catch (ZkNoNodeException e)
+          {
+            // OK.
+          }
+          
+          /**
+           * If the local cache does not contain a value, need to check if there is a 
+           * value in ZK; use it as initial value if exists
+           */
+          if (merged == null)
+          {
+//            ZNRecord valueOnZk = null;
+//            try
+//            {
+//              valueOnZk = accessor.get(mergedKey, null, 0);
+//            }
+//            catch(Exception e)
+//            {
+//              LOG.info(e);
+//            }
+//            if(valueOnZk != null)
+//            {
+//              merged = valueOnZk;
+//              merged.merge(first._record);
+//            }
+//            else // Zk path has null data. use the first record as initial record.
+            {
+              merged = new ZNRecord(first._record);
+            }
+          }
+          else
+          {
+            merged.merge(first._record);
+          }
+          Iterator<Entry> it = queue._pending.iterator();
+          while (it.hasNext())
+          {
+            Entry ent = it.next();
+            if (!ent._key.equals(mergedKey))
+              continue;
+            processed.add(ent);
+            merged.merge(ent._record);
+            // System.out.println("After merging:" + merged);
+            it.remove();
+          }
+          // System.out.println("size:"+ processed.size());
+          accessor.set(mergedKey, merged, options);
+          // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
+          // _cache.put(mergedKey, merged);
+        }
+        finally
+        {
+          queue._running.set(null);
+          for (Entry e : processed)
+          {
+            synchronized (e)
+            {
+              e._sent.set(true);
+              e.notify();
+            }
+          }
+        }
+      }
+      else
+      {
+        synchronized (entry)
+        {
+          try
+          {
+            entry.wait(10);
+          }
+          catch (InterruptedException e)
+          {
+            e.printStackTrace();
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java
new file mode 100644
index 0000000..bf5d82d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.Message;
+
+
+public interface HealthStateChangeListener
+{
+
+  /**
+   * Invoked when health stats change
+   * 
+   * @param instanceName
+   * @param reports
+   * @param changeContext
+   */
+  public void onHealthChange(String instanceName,
+                             List<HealthStat> reports,
+                             NotificationContext changeContext);
+}