You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:57 UTC
[4/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java
new file mode 100644
index 0000000..20605a0
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestClusterManagementWebapp.java
@@ -0,0 +1,649 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.webapp;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.apache.helix.tools.AdminTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.restlet.Client;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Protocol;
+import org.restlet.data.Reference;
+import org.restlet.data.Request;
+import org.restlet.data.Response;
+import org.restlet.resource.Representation;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+
+public class TestClusterManagementWebapp extends AdminTestBase
+{
+ @Test
+ public void testInvocation() throws Exception
+ {
+ verifyAddCluster();
+ verifyAddStateModel();
+ verifyAddHostedEntity();
+ verifyAddInstance();
+ verifyRebalance();
+ verifyEnableInstance();
+ verifyAlterIdealState();
+ verifyConfigAccessor();
+
+ verifyEnableCluster();
+
+ System.out.println("Test passed!!");
+ }
+
+ /*
+ * Test case as steps
+ */
+ String clusterName = "cluster-12345";
+ String resourceGroupName = "new-entity-12345";
+ String instance1 = "test-1";
+ String statemodel = "state_model";
+ int instancePort = 9999;
+ int partitions = 10;
+ int replicas = 3;
+
+ void verifyAddStateModel() throws JsonGenerationException,
+ JsonMappingException,
+ IOException
+ {
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+ + "/StateModelDefs/MasterSlave";
+ Reference resourceRef = new Reference(httpUrlBase);
+ Request request = new Request(Method.GET, resourceRef);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+
+ Map<String, String> paraMap = new HashMap<String, String>();
+
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND,
+ ClusterSetup.addStateModelDef);
+
+ ZNRecord r = new ZNRecord("Test");
+ r.merge(zn);
+ StateModelDefinition newStateModel = new StateModelDefinition(r);
+
+ httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/StateModelDefs";
+ resourceRef = new Reference(httpUrlBase);
+ request = new Request(Method.POST, resourceRef);
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap) + "&"
+ + JsonParameters.NEW_STATE_MODEL_DEF + "="
+ + ClusterRepresentationUtil.ZNRecordToJson(r),
+ MediaType.APPLICATION_ALL);
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ AssertJUnit.assertTrue(sw.toString().contains("Test"));
+ }
+
+ void verifyAddCluster() throws IOException,
+ InterruptedException
+ {
+ String httpUrlBase = "http://localhost:" + ADMIN_PORT + "/clusters";
+ Map<String, String> paraMap = new HashMap<String, String>();
+
+ paraMap.put(JsonParameters.CLUSTER_NAME, clusterName);
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
+
+ Reference resourceRef = new Reference(httpUrlBase);
+
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ AssertJUnit.assertTrue(zn.getListField("clusters").contains(clusterName));
+
+ }
+
+ void verifyAddHostedEntity() throws JsonGenerationException,
+ JsonMappingException,
+ IOException
+ {
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+ Map<String, String> paraMap = new HashMap<String, String>();
+
+ paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, resourceGroupName);
+ paraMap.put(JsonParameters.PARTITIONS, "" + partitions);
+ paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
+
+ Reference resourceRef = new Reference(httpUrlBase);
+
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ AssertJUnit.assertTrue(zn.getListField("ResourceGroups").contains(resourceGroupName));
+
+ httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+ + resourceGroupName;
+ resourceRef = new Reference(httpUrlBase);
+
+ request = new Request(Method.GET, resourceRef);
+
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+ }
+
+ void verifyAddInstance() throws JsonGenerationException,
+ JsonMappingException,
+ IOException
+ {
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/instances";
+ Map<String, String> paraMap = new HashMap<String, String>();
+ // Add 1 instance
+ paraMap.put(JsonParameters.INSTANCE_NAME, instance1 + ":" + instancePort);
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+
+ Reference resourceRef = new Reference(httpUrlBase);
+
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ TypeReference<ArrayList<ZNRecord>> typeRef = new TypeReference<ArrayList<ZNRecord>>()
+ {
+ };
+ List<ZNRecord> znList = mapper.readValue(new StringReader(sw.toString()), typeRef);
+ AssertJUnit.assertTrue(znList.get(0).getId().equals(instance1 + "_" + instancePort));
+
+ // the case to add more than 1 instances
+ paraMap.clear();
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+
+ String[] instances = { "test2", "test3", "test4", "test5" };
+
+ String instanceNames = "";
+ boolean first = true;
+ for (String instance : instances)
+ {
+ if (first == true)
+ {
+ first = false;
+ }
+ else
+ {
+ instanceNames += ";";
+ }
+ instanceNames += (instance + ":" + instancePort);
+ }
+ paraMap.put(JsonParameters.INSTANCE_NAMES, instanceNames);
+
+ request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ mapper = new ObjectMapper();
+
+ znList = mapper.readValue(new StringReader(sw.toString()), typeRef);
+
+ for (String instance : instances)
+ {
+ boolean found = false;
+ for (ZNRecord r : znList)
+ {
+ String instanceId = instance + "_" + instancePort;
+ if (r.getId().equals(instanceId))
+ {
+ found = true;
+ break;
+ }
+ }
+ AssertJUnit.assertTrue(found);
+ }
+ }
+
+ void verifyRebalance() throws JsonGenerationException,
+ JsonMappingException,
+ IOException
+ {
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+ + resourceGroupName + "/idealState";
+ Map<String, String> paraMap = new HashMap<String, String>();
+ // Add 1 instance
+ paraMap.put(JsonParameters.REPLICAS, "" + replicas);
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+
+ Reference resourceRef = new Reference(httpUrlBase);
+
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+
+ for (int i = 0; i < partitions; i++)
+ {
+ String partitionName = resourceGroupName + "_" + i;
+ assert (r.getMapField(partitionName).size() == replicas);
+ }
+
+ httpUrlBase = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName;
+ resourceRef = new Reference(httpUrlBase);
+ request = new Request(Method.GET, resourceRef);
+
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ }
+
+ void verifyEnableInstance() throws JsonGenerationException,
+ JsonMappingException,
+ IOException
+ {
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/instances/"
+ + instance1 + "_" + instancePort;
+ Map<String, String> paraMap = new HashMap<String, String>();
+ // Add 1 instance
+ paraMap.put(JsonParameters.ENABLED, "" + false);
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
+
+ Reference resourceRef = new Reference(httpUrlBase);
+
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ AssertJUnit.assertTrue(r.getSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString())
+ .equals("" + false));
+
+ // Then enable it
+ paraMap.put(JsonParameters.ENABLED, "" + true);
+ request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap), MediaType.APPLICATION_ALL);
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ mapper = new ObjectMapper();
+ r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ AssertJUnit.assertTrue(r.getSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString())
+ .equals("" + true));
+ }
+
+ void verifyAlterIdealState() throws IOException
+ {
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+ + resourceGroupName + "/idealState";
+
+ Reference resourceRef = new Reference(httpUrlBase);
+ Request request = new Request(Method.GET, resourceRef);
+
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord r = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ String partitionName = "new-entity-12345_3";
+ r.getMapFields().remove(partitionName);
+
+ Map<String, String> paraMap = new HashMap<String, String>();
+ // Add 1 instance
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
+
+ resourceRef = new Reference(httpUrlBase);
+
+ request = new Request(Method.POST, resourceRef);
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paraMap) + "&"
+ + JsonParameters.NEW_IDEAL_STATE + "="
+ + ClusterRepresentationUtil.ZNRecordToJson(r),
+ MediaType.APPLICATION_ALL);
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ mapper = new ObjectMapper();
+ ZNRecord r2 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+ AssertJUnit.assertTrue(!r2.getMapFields().containsKey(partitionName));
+
+ for (String key : r2.getMapFields().keySet())
+ {
+ AssertJUnit.assertTrue(r.getMapFields().containsKey(key));
+ }
+ }
+
+ // verify get/post configs in different scopes
+ void verifyConfigAccessor() throws Exception
+ {
+ ObjectMapper mapper = new ObjectMapper();
+ Client client = new Client(Protocol.HTTP);
+
+ // set/get cluster scope configs
+ String url =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/cluster/"
+ + clusterName;
+
+ postConfig(client, url, mapper, ClusterSetup.setConfig, "key1=value1,key2=value2");
+
+ ZNRecord record = get(client, url, mapper);
+ Assert.assertEquals(record.getSimpleFields().size(), 2);
+ Assert.assertEquals(record.getSimpleField("key1"), "value1");
+ Assert.assertEquals(record.getSimpleField("key2"), "value2");
+
+ // set/get participant scope configs
+ url =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+ + "/configs/participant/localhost_12918";
+
+ postConfig(client, url, mapper, ClusterSetup.setConfig, "key3=value3,key4=value4");
+
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getSimpleFields().size(), 2);
+ Assert.assertEquals(record.getSimpleField("key3"), "value3");
+ Assert.assertEquals(record.getSimpleField("key4"), "value4");
+
+ // set/get resource scope configs
+ url =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+ + "/configs/resource/testResource";
+
+ postConfig(client, url, mapper, ClusterSetup.setConfig, "key5=value5,key6=value6");
+
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getSimpleFields().size(), 2);
+ Assert.assertEquals(record.getSimpleField("key5"), "value5");
+ Assert.assertEquals(record.getSimpleField("key6"), "value6");
+
+ // set/get partition scope configs
+ url =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+ + "/configs/partition/testResource/testPartition";
+
+ postConfig(client, url, mapper, ClusterSetup.setConfig, "key7=value7,key8=value8");
+
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getSimpleFields().size(), 2);
+ Assert.assertEquals(record.getSimpleField("key7"), "value7");
+ Assert.assertEquals(record.getSimpleField("key8"), "value8");
+
+ // list keys
+ url = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs";
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getListFields().size(), 1);
+ Assert.assertTrue(record.getListFields().containsKey("scopes"));
+ Assert.assertTrue(contains(record.getListField("scopes"),
+ "CLUSTER",
+ "PARTICIPANT",
+ "RESOURCE",
+ "PARTITION"));
+
+ url = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/cluster";
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getListFields().size(), 1);
+ Assert.assertTrue(record.getListFields().containsKey("CLUSTER"));
+ Assert.assertTrue(contains(record.getListField("CLUSTER"), clusterName));
+
+ url =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/participant";
+ record = get(client, url, mapper);
+ Assert.assertTrue(record.getListFields().containsKey("PARTICIPANT"));
+ Assert.assertTrue(contains(record.getListField("PARTICIPANT"), "localhost_12918"));
+
+ url = "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/configs/resource";
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getListFields().size(), 1);
+ Assert.assertTrue(record.getListFields().containsKey("RESOURCE"));
+ Assert.assertTrue(contains(record.getListField("RESOURCE"), "testResource"));
+
+ url =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName
+ + "/configs/partition/testResource";
+ record = get(client, url, mapper);
+ Assert.assertEquals(record.getListFields().size(), 1);
+ Assert.assertTrue(record.getListFields().containsKey("PARTITION"));
+ Assert.assertTrue(contains(record.getListField("PARTITION"), "testPartition"));
+
+ }
+
+ private ZNRecord get(Client client, String url, ObjectMapper mapper) throws Exception
+ {
+ Request request = new Request(Method.GET, new Reference(url));
+ Response response = client.handle(request);
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+ String responseStr = sw.toString();
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+
+ ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+ return record;
+ }
+
+ private void postConfig(Client client,
+ String url,
+ ObjectMapper mapper,
+ String command,
+ String configs) throws Exception
+ {
+ Map<String, String> params = new HashMap<String, String>();
+
+ params.put(JsonParameters.MANAGEMENT_COMMAND, command);
+ params.put(JsonParameters.CONFIGS, configs);
+
+ Request request = new Request(Method.POST, new Reference(url));
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(params), MediaType.APPLICATION_ALL);
+
+ Response response = client.handle(request);
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+ String responseStr = sw.toString();
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+ }
+
+ void verifyEnableCluster() throws Exception
+ {
+ System.out.println("START: verifyEnableCluster()");
+ String httpUrlBase =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/Controller";
+ Map<String, String> paramMap = new HashMap<String, String>();
+
+ paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableCluster);
+ paramMap.put(JsonParameters.ENABLED, "" + false);
+
+ Reference resourceRef = new Reference(httpUrlBase);
+
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paramMap), MediaType.APPLICATION_ALL);
+ Client client = new Client(Protocol.HTTP);
+ Response response = client.handle(request);
+
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ // verify pause znode exists
+ String pausePath = PropertyPathConfig.getPath(PropertyType.PAUSE, clusterName);
+ System.out.println("pausePath: " + pausePath);
+ boolean exists = _gZkClient.exists(pausePath);
+ Assert.assertTrue(exists, pausePath + " should exist");
+
+ // Then enable it
+ paramMap.put(JsonParameters.ENABLED, "" + true);
+ request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(JsonParameters.JSON_PARAMETERS + "="
+ + ClusterRepresentationUtil.ObjectToJson(paramMap), MediaType.APPLICATION_ALL);
+ client = new Client(Protocol.HTTP);
+ response = client.handle(request);
+
+ result = response.getEntity();
+ sw = new StringWriter();
+ result.write(sw);
+
+ System.out.println(sw.toString());
+
+ // verify pause znode doesn't exist
+ exists = _gZkClient.exists(pausePath);
+ Assert.assertFalse(exists, pausePath + " should be removed");
+
+ System.out.println("END: verifyEnableCluster()");
+ }
+
+ private boolean contains(List<String> list, String... items)
+ {
+ for (String item : items)
+ {
+ if (!list.contains(item))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/AccessOption.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/AccessOption.java b/helix-core/src/main/java/org/apache/helix/AccessOption.java
new file mode 100644
index 0000000..6cfc163
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/AccessOption.java
@@ -0,0 +1,52 @@
+package org.apache.helix;
+
+import org.apache.zookeeper.CreateMode;
+
+public class AccessOption
+{
+ public static int PERSISTENT = 0x1;
+ public static int EPHEMERAL = 0x2;
+ public static int PERSISTENT_SEQUENTIAL = 0x4;
+ public static int EPHEMERAL_SEQUENTIAL = 0x8;
+ public static int THROW_EXCEPTION_IFNOTEXIST = 0x10;
+
+ /**
+ * Helper method to get zookeeper create mode from options
+ *
+ * @param options
+ * @return zookeeper create mode
+ */
+ public static CreateMode getMode(int options)
+ {
+ if ((options & PERSISTENT) > 0)
+ {
+ return CreateMode.PERSISTENT;
+ }
+ else if ((options & EPHEMERAL) > 0)
+ {
+ return CreateMode.EPHEMERAL;
+ }
+ else if ((options & PERSISTENT_SEQUENTIAL) > 0)
+ {
+ return CreateMode.PERSISTENT_SEQUENTIAL;
+ }
+ else if ((options & EPHEMERAL_SEQUENTIAL) > 0)
+ {
+ return CreateMode.EPHEMERAL_SEQUENTIAL;
+ }
+
+ return null;
+ }
+
+ /**
+ * Helper method to get is-throw-exception-on-node-not-exist from options
+ *
+ * @param options
+ * @return
+ */
+ public static boolean isThrowExceptionIfNotExist(int options)
+ {
+ return (options & THROW_EXCEPTION_IFNOTEXIST) > 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
new file mode 100644
index 0000000..af46a52
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -0,0 +1,192 @@
+package org.apache.helix;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+public interface BaseDataAccessor<T>
+{
+ /**
+ * This will always attempt to create the znode, if it exists it will return false. Will
+ * create parents if they do not exist. For performance reasons, it may try to create
+ * child first and only if it fails it will try to create parent
+ *
+ * @param path
+ * @param record
+ * @return
+ */
+ boolean create(String path, T record, int options);
+
+ /**
+ * This will always attempt to set the data on existing node. If the znode does not
+ * exist it will create it.
+ *
+ * @param path
+ * @param record
+ * @return
+ */
+ boolean set(String path, T record, int options);
+
+ /**
+ * This will attempt to merge with existing data by calling znrecord.merge and if it
+ * does not exist it will create it znode
+ *
+ * @param path
+ * @param record
+ * @return
+ */
+ boolean update(String path, DataUpdater<T> updater, int options);
+
+ /**
+ * This will remove znode and all it's child nodes if any
+ *
+ * @param path
+ * @return
+ */
+ boolean remove(String path, int options);
+
+ /**
+ * Use it when creating children under a parent node. This will use async api for better
+ * performance. If the child already exists it will return false.
+ *
+ * @param parentPath
+ * @param record
+ * @return
+ */
+ boolean[] createChildren(List<String> paths, List<T> records, int options);
+
+ /**
+ * can set multiple children under a parent node. This will use async api for better
+ * performance. If this child does not exist it will create it.
+ *
+ * @param parentPath
+ * @param record
+ */
+ boolean[] setChildren(List<String> paths, List<T> records, int options);
+
+ /**
+ * Can update multiple nodes using async api for better performance. If a child does not
+ * exist it will create it.
+ *
+ * @param parentPath
+ * @param record
+ * @return
+ */
+ boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options);
+
+ /**
+ * remove multiple paths using async api. will remove any child nodes if any
+ *
+ * @param paths
+ * @return
+ */
+ boolean[] remove(List<String> paths, int options);
+
+ /**
+ * Get the {@link T} corresponding to the path
+ *
+ * @param path
+ * @return
+ */
+ T get(String path, Stat stat, int options);
+
+ /**
+ * Get List of {@link T} corresponding to the paths using async api
+ *
+ * @param paths
+ * @return
+ */
+ List<T> get(List<String> paths, List<Stat> stats, int options);
+
+ /**
+ * Get the children under a parent path using async api
+ *
+ * @param path
+ * @return
+ */
+ List<T> getChildren(String parentPath, List<Stat> stats, int options);
+
+ /**
+ * Returns the child names given a parent path
+ *
+ * @param type
+ * @param keys
+ * @return
+ */
+ List<String> getChildNames(String parentPath, int options);
+
+ /**
+ * checks if the path exists in zk
+ *
+ * @param path
+ * @return
+ */
+ boolean exists(String path, int options);
+
+ /**
+ * checks if the all the paths exists
+ *
+ * @param paths
+ * @return
+ */
+ boolean[] exists(List<String> paths, int options);
+
+ /**
+ * Get the stats of all the paths
+ *
+ * @param paths
+ * @return
+ */
+ Stat[] getStats(List<String> paths, int options);
+
+ /**
+ * Get the stats of all the paths
+ *
+ * @param paths
+ * @return
+ */
+ Stat getStat(String path, int options);
+
+ /**
+ * Subscribe data listener to path
+ *
+ * @param path
+ * @param listener
+ * @return
+ */
+ void subscribeDataChanges(String path, IZkDataListener listener);
+
+ /**
+ * Unsubscribe data listener to path
+ *
+ * @param path
+ * @param listener
+ */
+ void unsubscribeDataChanges(String path, IZkDataListener listener);
+
+ /**
+ * Subscribe child listener to path
+ *
+ * @param path
+ * @param listener
+ * @return
+ */
+ List<String> subscribeChildChanges(String path, IZkChildListener listener);
+
+ /**
+ * Unsubscribe child listener to path
+ *
+ * @param path
+ * @param listener
+ */
+ void unsubscribeChildChanges(String path, IZkChildListener listener);
+
+ /**
+ * reset the cache if any, when session expiry happens
+ */
+ void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
new file mode 100644
index 0000000..316ff54
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+
+/**
+ * Provides the ability to <br>
+ * <li>Send message to a specific component in the cluster[ participant,
+ * controller, Router(probably not needed) ]</li> <li>Broadcast message to all
+ * nodes</li> <li>Send message to instances that hold a specific resource</li>
+ * <li>Asynchronous request response api. Send message with a co-relation id and
+ * invoke a method when there is a response. Can support timeout.</li>
+ *
+ * @author kgopalak
+ *
+ */
+public interface ClusterMessagingService
+{
+ /**
+ * Send message matching the specifications mentioned in recipientCriteria.
+ *
+ * @param receipientCriteria
+ * @See Criteria
+ * @param message
+ * message to be sent. Some attributes of this message will be
+ * changed as required
+ * @return returns how many messages were successfully sent.
+ */
+ int send(Criteria recipientCriteria, Message message);
+
+ /**
+ * This will send the message to all instances matching the criteria<br>
+ * When there is a reply to the message sent AsynCallback.onReply will be
+ * invoked. Application can specify a timeout on AsyncCallback. After every
+ * reply is processed AsyncCallback.isDone will be invoked.<br>
+ * This method will return after sending the messages. <br>
+ * This is useful when message need to be sent and current thread need not
+ * wait for response since processing will be done in another thread.
+ *
+ * @param receipientCriteria
+ * @param message
+ * @param callbackOnReply
+ * @param timeOut
+ * @param retryCount
+ * @return
+ */
+ int send(Criteria receipientCriteria, Message message,
+ AsyncCallback callbackOnReply, int timeOut);
+
+ int send(Criteria receipientCriteria, Message message,
+ AsyncCallback callbackOnReply, int timeOut, int retryCount);
+
+ /**
+ * This will send the message to all instances matching the criteria<br>
+ * When there is a reply to the message sent AsynCallback.onReply will be
+ * invoked. Application can specify a timeout on AsyncCallback. After every
+ * reply is processed AsyncCallback.isDone will be invoked.<br>
+ * This method will return only after the AsyncCallback.isDone() returns true <br>
+ * This is useful when message need to be sent and current thread has to wait
+ * for response. <br>
+ * The current thread can use callbackOnReply instance to store application
+ * specific data.
+ *
+ * @param receipientCriteria
+ * @param message
+ * @param callbackOnReply
+ * @param timeOut
+ * @param retryCount
+ * @return
+ */
+ int sendAndWait(Criteria receipientCriteria, Message message,
+ AsyncCallback callbackOnReply, int timeOut);
+
+ int sendAndWait(Criteria receipientCriteria, Message message,
+ AsyncCallback callbackOnReply, int timeOut, int retryCount);
+
+ /**
+ * This will register a message handler factory to create handlers for
+ * message. In case client code defines its own message type, it can define a
+ * message handler factory to create handlers to process those messages.
+ * Messages are processed in a threadpool which is hosted by cluster manager,
+ * and cluster manager will call the factory to create handler, and the
+ * handler is called in the threadpool.
+ *
+ * Note that only one message handler factory can be registered with one
+ * message type.
+ *
+ * @param type
+ * The message type that the factory will create handler for
+ * @param factory
+ * The per-type message factory
+ * @param threadpoolSize
+ * size of the execution threadpool that handles the message
+ * @return
+ */
+ public void registerMessageHandlerFactory(String type,
+ MessageHandlerFactory factory);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ClusterView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterView.java b/helix-core/src/main/java/org/apache/helix/ClusterView.java
new file mode 100644
index 0000000..12c790c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ClusterView.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+@Deprecated
+public class ClusterView
+{
+ private Map<PropertyType, List<ZNRecord>> clusterPropertyLists;
+
+ // getter/setter's are needed for private fields for
+ // serialization/de-serialization
+ // ref:
+ // http://jackson.codehaus.org/DataBindingDeepDive
+ // @JsonProperty
+ public void setClusterPropertyLists(Map<PropertyType, List<ZNRecord>> clusterPropertyLists)
+ {
+ this.clusterPropertyLists = clusterPropertyLists;
+ }
+
+ // @JsonProperty
+ public Map<PropertyType, List<ZNRecord>> getPropertyLists()
+ {
+ return clusterPropertyLists;
+ }
+
+ public void setClusterPropertyList(PropertyType type, List<ZNRecord> propertyList)
+ {
+ clusterPropertyLists.put(type, propertyList);
+ }
+
+ public List<ZNRecord> getPropertyList(PropertyType type)
+ {
+ return clusterPropertyLists.get(type);
+ }
+
+ public void setMemberInstanceMap(Map<String, MemberInstance> memberInstanceMap)
+ {
+ this._memberInstanceMap = memberInstanceMap;
+ }
+
+ @JsonProperty
+ public Map<String, MemberInstance> getMemberInstanceMap()
+ {
+ return _memberInstanceMap;
+ }
+
+ public void set_memberInstanceMap(Map<String, MemberInstance> _memberInstanceMap)
+ {
+ this._memberInstanceMap = _memberInstanceMap;
+ }
+
+ private Map<String, MemberInstance> _memberInstanceMap;
+ private List<MemberInstance> _instances;
+
+ public void setInstances(List<MemberInstance> instances)
+ {
+ this._instances = instances;
+ }
+
+ public List<MemberInstance> getInstances()
+ {
+ return _instances;
+ }
+
+ public static class MemberInstance
+ {
+ private Map<PropertyType, List<ZNRecord>> _instanceProperties = new TreeMap<PropertyType, List<ZNRecord>>();
+
+ public void setClusterProperties(Map<PropertyType, List<ZNRecord>> instanceProperties)
+ {
+ this._instanceProperties = instanceProperties;
+ }
+
+ // @JsonProperty
+ public Map<PropertyType, List<ZNRecord>> getInstanceProperties()
+ {
+ return _instanceProperties;
+ }
+
+ public void setInstanceProperty(PropertyType type, List<ZNRecord> values)
+ {
+ _instanceProperties.put(type, values);
+ }
+
+ public List<ZNRecord> getInstanceProperty(PropertyType type)
+ {
+ return _instanceProperties.get(type);
+ }
+
+ private String _instanceName;
+
+ // for JSON de-serialization
+ public MemberInstance()
+ {
+
+ }
+
+ public MemberInstance(String instanceName)
+ {
+ this._instanceName = instanceName;
+ }
+
+ public String getInstanceName()
+ {
+ return _instanceName;
+ }
+
+ public void setInstanceName(String instanceName)
+ {
+ this._instanceName = instanceName;
+ }
+
+ }
+
+ public MemberInstance getMemberInstance(String instanceName, boolean createNewIfAbsent)
+ {
+ if (!_memberInstanceMap.containsKey(instanceName))
+ {
+ _memberInstanceMap.put(instanceName, new MemberInstance(instanceName));
+ }
+ return _memberInstanceMap.get(instanceName);
+ }
+
+ private List<ZNRecord> _externalView;
+
+ public ClusterView()
+ {
+ clusterPropertyLists = new TreeMap<PropertyType, List<ZNRecord>>();
+ setClusterPropertyList(PropertyType.IDEALSTATES, new ArrayList<ZNRecord>());
+ setClusterPropertyList(PropertyType.CONFIGS, new ArrayList<ZNRecord>());
+ setClusterPropertyList(PropertyType.LIVEINSTANCES, new ArrayList<ZNRecord>());
+ setClusterPropertyList(PropertyType.INSTANCES, new ArrayList<ZNRecord>());
+
+ _memberInstanceMap = new HashMap<String, ClusterView.MemberInstance>();
+ }
+
+ public List<ZNRecord> getExternalView()
+ {
+ return _externalView;
+ }
+
+ public void setExternalView(List<ZNRecord> externalView)
+ {
+ this._externalView = externalView;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
new file mode 100644
index 0000000..ec53000
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.util.StringTemplate;
+import org.apache.log4j.Logger;
+
+
+public class ConfigAccessor
+{
+ private static Logger LOG = Logger.getLogger(ConfigAccessor.class);
+
+ private static final StringTemplate template = new StringTemplate();
+ static
+ {
+ // @formatter:off
+ template.addEntry(ConfigScopeProperty.CLUSTER, 1, "/{clusterName}/CONFIGS/CLUSTER");
+ template.addEntry(ConfigScopeProperty.CLUSTER,
+ 2,
+ "/{clusterName}/CONFIGS/CLUSTER/{clusterName}|SIMPLEKEYS");
+ template.addEntry(ConfigScopeProperty.PARTICIPANT,
+ 1,
+ "/{clusterName}/CONFIGS/PARTICIPANT");
+ template.addEntry(ConfigScopeProperty.PARTICIPANT,
+ 2,
+ "/{clusterName}/CONFIGS/PARTICIPANT/{participantName}|SIMPLEKEYS");
+ template.addEntry(ConfigScopeProperty.RESOURCE, 1, "/{clusterName}/CONFIGS/RESOURCE");
+ template.addEntry(ConfigScopeProperty.RESOURCE,
+ 2,
+ "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|SIMPLEKEYS");
+ template.addEntry(ConfigScopeProperty.PARTITION,
+ 2,
+ "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|MAPKEYS");
+ template.addEntry(ConfigScopeProperty.PARTITION,
+ 3,
+ "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|MAPMAPKEYS|{partitionName}");
+ // @formatter:on
+ }
+
+ private final ZkClient zkClient;
+
+ public ConfigAccessor(ZkClient zkClient)
+ {
+ this.zkClient = zkClient;
+ }
+
+ /**
+ * Get config value
+ *
+ * @param scope
+ * @param key
+ * @return value or null if doesn't exist
+ */
+ public String get(ConfigScope scope, String key)
+ {
+ if (scope == null || scope.getScope() == null)
+ {
+ LOG.error("Scope can't be null");
+ return null;
+ }
+
+ String value = null;
+ String clusterName = scope.getClusterName();
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+ {
+ throw new HelixException("cluster " + clusterName + " is not setup yet");
+ }
+
+ String scopeStr = scope.getScopeStr();
+ String[] splits = scopeStr.split("\\|");
+
+ ZNRecord record = zkClient.readData(splits[0], true);
+
+ if (record != null)
+ {
+ if (splits.length == 1)
+ {
+ value = record.getSimpleField(key);
+ }
+ else if (splits.length == 2)
+ {
+ if (record.getMapField(splits[1]) != null)
+ {
+ value = record.getMapField(splits[1]).get(key);
+ }
+ }
+ }
+ return value;
+
+ }
+
+ /**
+ * Set a config value
+ *
+ * @param scope
+ * @param key
+ * @param value
+ */
+ public void set(ConfigScope scope, String key, String value)
+ {
+ if (scope == null || scope.getScope() == null)
+ {
+ LOG.error("Scope can't be null");
+ return;
+ }
+
+ String clusterName = scope.getClusterName();
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+ {
+ throw new HelixException("cluster " + clusterName + " is not setup yet");
+ }
+
+ String scopeStr = scope.getScopeStr();
+ String[] splits = scopeStr.split("\\|");
+
+ String id = splits[0].substring(splits[0].lastIndexOf('/') + 1);
+ ZNRecord update = new ZNRecord(id);
+ if (splits.length == 1)
+ {
+ update.setSimpleField(key, value);
+ }
+ else if (splits.length == 2)
+ {
+ if (update.getMapField(splits[1]) == null)
+ {
+ update.setMapField(splits[1], new TreeMap<String, String>());
+ }
+ update.getMapField(splits[1]).put(key, value);
+ }
+ ZKUtil.createOrUpdate(zkClient, splits[0], update, true, true);
+ return;
+ }
+
+ /**
+ * Remove config value
+ *
+ * @param scope
+ * @param key
+ */
+ public void remove(ConfigScope scope, String key)
+ {
+ if (scope == null || scope.getScope() == null)
+ {
+ LOG.error("Scope can't be null");
+ return;
+ }
+
+ String clusterName = scope.getClusterName();
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+ {
+ throw new HelixException("cluster " + clusterName + " is not setup yet");
+ }
+
+ String scopeStr = scope.getScopeStr();
+ String[] splits = scopeStr.split("\\|");
+
+ String id = splits[0].substring(splits[0].lastIndexOf('/') + 1);
+ ZNRecord update = new ZNRecord(id);
+ if (splits.length == 1)
+ {
+ // subtract doesn't care about value, use empty string
+ update.setSimpleField(key, "");
+ }
+ else if (splits.length == 2)
+ {
+ if (update.getMapField(splits[1]) == null)
+ {
+ update.setMapField(splits[1], new TreeMap<String, String>());
+ }
+ // subtract doesn't care about value, use empty string
+ update.getMapField(splits[1]).put(key, "");
+ }
+
+ ZKUtil.subtract(zkClient, splits[0], update);
+ return;
+ }
+
+ /**
+ * Get a list of config keys
+ *
+ * @param type
+ * @param clusterName
+ * @param keys
+ * @return
+ */
+ public List<String> getKeys(ConfigScopeProperty type,
+ String clusterName,
+ String... keys)
+ {
+ if (type == null || clusterName == null)
+ {
+ LOG.error("clusterName|scope can't be null");
+ return Collections.emptyList();
+ }
+
+ try
+ {
+ if (!ZKUtil.isClusterSetup(clusterName, zkClient))
+ {
+ LOG.error("cluster " + clusterName + " is not setup yet");
+ return Collections.emptyList();
+ }
+
+ String[] args = new String[1 + keys.length];
+ args[0] = clusterName;
+ System.arraycopy(keys, 0, args, 1, keys.length);
+ String scopeStr = template.instantiate(type, args);
+ String[] splits = scopeStr.split("\\|");
+ List<String> retKeys = null;
+ if (splits.length == 1)
+ {
+ retKeys = zkClient.getChildren(splits[0]);
+ }
+ else
+ {
+ ZNRecord record = zkClient.readData(splits[0]);
+
+ if (splits[1].startsWith("SIMPLEKEYS"))
+ {
+ retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
+
+ }
+ else if (splits[1].startsWith("MAPKEYS"))
+ {
+ retKeys = new ArrayList<String>(record.getMapFields().keySet());
+ }
+ else if (splits[1].startsWith("MAPMAPKEYS"))
+ {
+ retKeys = new ArrayList<String>(record.getMapField(splits[2]).keySet());
+ }
+ }
+ if (retKeys == null)
+ {
+ LOG.error("Invalid scope: " + type + " or keys: " + Arrays.toString(args));
+ return Collections.emptyList();
+ }
+
+ Collections.sort(retKeys);
+ return retKeys;
+ }
+ catch (Exception e)
+ {
+ return Collections.emptyList();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java b/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java
new file mode 100644
index 0000000..32b8622
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigChangeListener.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.InstanceConfig;
+
+
+/**
+ * @author kgopalak
+ *
+ */
+public interface ConfigChangeListener
+{
+
+ /**
+ * Invoked when participant config changes
+ *
+ * @param configs
+ * @param changeContext
+ */
+ public void onConfigChange(List<InstanceConfig> configs,
+ NotificationContext changeContext);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigScope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigScope.java b/helix-core/src/main/java/org/apache/helix/ConfigScope.java
new file mode 100644
index 0000000..158e928
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigScope.java
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.util.StringTemplate;
+import org.apache.log4j.Logger;
+
+
+public class ConfigScope
+{
+ public enum ConfigScopeProperty
+ {
+ CLUSTER, PARTICIPANT, RESOURCE, PARTITION, CONSTRAINT;
+ }
+
+ private static Logger LOG = Logger.getLogger(ConfigScope.class);
+
+ private static final List<ConfigScopeProperty> scopePriority =
+ new ArrayList<ConfigScopeProperty>();
+ private static final Map<ConfigScopeProperty, Map<ConfigScopeProperty, ConfigScopeProperty>> scopeTransition =
+ new HashMap<ConfigScopeProperty, Map<ConfigScopeProperty, ConfigScopeProperty>>();
+ private static final StringTemplate template = new StringTemplate();
+ static
+ {
+ // scope priority: CLUSTER > PARTICIPANT > RESOURCE > PARTITION
+ scopePriority.add(ConfigScopeProperty.CLUSTER);
+ scopePriority.add(ConfigScopeProperty.PARTICIPANT);
+ scopePriority.add(ConfigScopeProperty.RESOURCE);
+ scopePriority.add(ConfigScopeProperty.PARTITION);
+
+ // scope transition table to check valid inputs
+ scopeTransition.put(ConfigScopeProperty.CLUSTER,
+ new HashMap<ConfigScopeProperty, ConfigScopeProperty>());
+ scopeTransition.get(ConfigScopeProperty.CLUSTER).put(ConfigScopeProperty.PARTICIPANT,
+ ConfigScopeProperty.PARTICIPANT);
+ scopeTransition.get(ConfigScopeProperty.CLUSTER).put(ConfigScopeProperty.RESOURCE,
+ ConfigScopeProperty.RESOURCE);
+ scopeTransition.put(ConfigScopeProperty.RESOURCE,
+ new HashMap<ConfigScopeProperty, ConfigScopeProperty>());
+ scopeTransition.get(ConfigScopeProperty.RESOURCE).put(ConfigScopeProperty.PARTITION,
+ ConfigScopeProperty.PARTITION);
+
+ // string templates to generate znode path/index
+ // @formatter:off
+ template.addEntry(ConfigScopeProperty.CLUSTER,
+ 2,
+ "/{clusterName}/CONFIGS/CLUSTER/{clusterName}");
+ template.addEntry(ConfigScopeProperty.PARTICIPANT,
+ 2,
+ "/{clusterName}/CONFIGS/PARTICIPANT/{participantName}");
+ template.addEntry(ConfigScopeProperty.RESOURCE,
+ 2,
+ "/{clusterName}/CONFIGS/RESOURCE/{resourceName}");
+ template.addEntry(ConfigScopeProperty.PARTITION,
+ 3,
+ "/{clusterName}/CONFIGS/RESOURCE/{resourceName}|{partitionName}");
+ // @formatter:on
+ }
+
+ private final String _clusterName;
+ private final ConfigScopeProperty _scope;
+ private final String _scopeStr;
+
+ ConfigScope(ConfigScopeBuilder configScopeBuilder)
+ {
+ Map<ConfigScopeProperty, String> scopeMap = configScopeBuilder
+ .getScopeMap();
+ List<String> keys = new ArrayList<String>();
+
+ ConfigScopeProperty curScope = null;
+ for (ConfigScopeProperty scope : scopePriority)
+ {
+ if (scopeMap.containsKey(scope))
+ {
+ if (curScope == null && scope == ConfigScopeProperty.CLUSTER)
+ {
+ keys.add(scopeMap.get(scope));
+ curScope = ConfigScopeProperty.CLUSTER;
+ } else if (curScope == null)
+ {
+ throw new IllegalArgumentException("Missing CLUSTER scope. Can't build scope using " + configScopeBuilder);
+ } else
+ {
+ if (!scopeTransition.containsKey(curScope) || !scopeTransition.get(curScope).containsKey(scope))
+ {
+ throw new IllegalArgumentException("Can't build scope using " + configScopeBuilder);
+ }
+ keys.add(scopeMap.get(scope));
+ curScope = scopeTransition.get(curScope).get(scope);
+ }
+ }
+ }
+
+ if (curScope == ConfigScopeProperty.CLUSTER)
+ {
+ // append one more {clusterName}
+ keys.add(scopeMap.get(ConfigScopeProperty.CLUSTER));
+ }
+
+ String scopeStr = template.instantiate(curScope, keys.toArray(new String[0]));
+
+ _clusterName = keys.get(0);
+ _scopeStr = scopeStr;
+ _scope = curScope;
+ }
+
+ public ConfigScopeProperty getScope()
+ {
+ return _scope;
+ }
+
+ public String getClusterName()
+ {
+ return _clusterName;
+ }
+
+ public String getScopeStr()
+ {
+ return _scopeStr;
+ }
+
+ @Override
+ public String toString()
+ {
+ return super.toString() + ": " + _scopeStr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java b/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java
new file mode 100644
index 0000000..ec42fef
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ConfigScopeBuilder.java
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigScope.ConfigScopeProperty;
+import org.apache.helix.util.StringTemplate;
+import org.apache.log4j.Logger;
+
+
+public class ConfigScopeBuilder
+{
+ private static Logger LOG = Logger.getLogger(ConfigScopeBuilder.class);
+
+ private static StringTemplate template = new StringTemplate();
+ static
+ {
+ // @formatter:off
+ template.addEntry(ConfigScopeProperty.CLUSTER, 1, "CLUSTER={clusterName}");
+ template.addEntry(ConfigScopeProperty.RESOURCE, 2, "CLUSTER={clusterName},RESOURCE={resourceName}");
+ template.addEntry(ConfigScopeProperty.PARTITION, 3, "CLUSTER={clusterName},RESOURCE={resourceName},PARTITION={partitionName}");
+ template.addEntry(ConfigScopeProperty.PARTICIPANT, 2, "CLUSTER={clusterName},PARTICIPANT={participantName}");
+ // @formatter:on
+ }
+
+ private final Map<ConfigScopeProperty, String> _scopeMap;
+
+ public Map<ConfigScopeProperty, String> getScopeMap()
+ {
+ return _scopeMap;
+ }
+
+ public ConfigScopeBuilder()
+ {
+ _scopeMap = new HashMap<ConfigScopeProperty, String>();
+ }
+
+ public ConfigScopeBuilder forCluster(String clusterName)
+ {
+ _scopeMap.put(ConfigScopeProperty.CLUSTER, clusterName);
+ return this;
+ }
+
+ public ConfigScopeBuilder forParticipant(String participantName)
+ {
+ _scopeMap.put(ConfigScopeProperty.PARTICIPANT, participantName);
+ return this;
+ }
+
+ public ConfigScopeBuilder forResource(String resourceName)
+ {
+ _scopeMap.put(ConfigScopeProperty.RESOURCE, resourceName);
+ return this;
+
+ }
+
+ public ConfigScopeBuilder forPartition(String partitionName)
+ {
+ _scopeMap.put(ConfigScopeProperty.PARTITION, partitionName);
+ return this;
+
+ }
+
+ public ConfigScope build()
+ {
+ // TODO: validate the scopes map
+ return new ConfigScope(this);
+ }
+
+ public ConfigScope build(ConfigScopeProperty scope, String clusterName, String... scopeKeys)
+ {
+ if (scopeKeys == null)
+ {
+ scopeKeys = new String[]{};
+ }
+
+ String[] args = new String[1 + scopeKeys.length];
+ args[0] = clusterName;
+ System.arraycopy(scopeKeys, 0, args, 1, scopeKeys.length);
+ String scopePairs = template.instantiate(scope, args);
+
+ return build(scopePairs);
+ }
+
+ public ConfigScope build(String scopePairs)
+ {
+ String[] scopes = scopePairs.split("[\\s,]+");
+ for (String scope : scopes)
+ {
+ try
+ {
+ int idx = scope.indexOf('=');
+ if (idx == -1)
+ {
+ LOG.error("Invalid scope string: " + scope);
+ continue;
+ }
+
+ String scopeStr = scope.substring(0, idx);
+ String value = scope.substring(idx + 1);
+ ConfigScopeProperty scopeProperty = ConfigScopeProperty.valueOf(scopeStr);
+ _scopeMap.put(scopeProperty, value);
+ } catch (Exception e)
+ {
+ LOG.error("Invalid scope string: " + scope);
+ continue;
+ }
+ }
+
+ return build();
+ }
+
+ @Override
+ public String toString()
+ {
+ return _scopeMap.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java b/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java
new file mode 100644
index 0000000..6bcafde
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ControllerChangeListener.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+
+public interface ControllerChangeListener
+{
+ /**
+ * Invoked when controller changes
+ *
+ * @param changeContext
+ */
+ public void onControllerChange(NotificationContext changeContext);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/Criteria.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/Criteria.java b/helix-core/src/main/java/org/apache/helix/Criteria.java
new file mode 100644
index 0000000..302b4ee
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/Criteria.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+public class Criteria
+{
+ public enum DataSource
+ {
+ IDEALSTATES, EXTERNALVIEW
+ }
+ /**
+ * This can be CONTROLLER, PARTICIPANT, ROUTER Cannot be null
+ */
+ InstanceType recipientInstanceType;
+ /**
+ * If true this will only be process by the instance that was running when the
+ * message was sent. If the instance process dies and comes back up it will be
+ * ignored.
+ */
+ boolean sessionSpecific;
+ /**
+ * applicable only in case PARTICIPANT use * to broadcast to all instances
+ */
+ String instanceName = "";
+ /**
+ * Name of the resource. Use * to send message to all resources
+ * owned by an instance.
+ */
+ String resourceName = "";
+ /**
+ * Resource partition. Use * to send message to all partitions of a given
+ * resource
+ */
+ String partitionName = "";
+ /**
+ * State of the resource
+ */
+ String partitionState = "";
+ /**
+ * Exclude sending message to your self. True by default
+ */
+ boolean selfExcluded = true;
+ /**
+ * Determine if use external view or ideal state as source of truth
+ */
+ DataSource _dataSource = DataSource.EXTERNALVIEW;
+
+ public DataSource getDataSource()
+ {
+ return _dataSource;
+ }
+
+ public void setDataSource(DataSource source)
+ {
+ _dataSource = source;
+ }
+
+ public boolean isSelfExcluded()
+ {
+ return selfExcluded;
+ }
+
+ public void setSelfExcluded(boolean selfExcluded)
+ {
+ this.selfExcluded = selfExcluded;
+ }
+
+ public InstanceType getRecipientInstanceType()
+ {
+ return recipientInstanceType;
+ }
+
+ public void setRecipientInstanceType(InstanceType recipientInstanceType)
+ {
+ this.recipientInstanceType = recipientInstanceType;
+ }
+
+ public boolean isSessionSpecific()
+ {
+ return sessionSpecific;
+ }
+
+ public void setSessionSpecific(boolean sessionSpecific)
+ {
+ this.sessionSpecific = sessionSpecific;
+ }
+
+ public String getInstanceName()
+ {
+ return instanceName;
+ }
+
+ public void setInstanceName(String instanceName)
+ {
+ this.instanceName = instanceName;
+ }
+
+ public String getResource()
+ {
+ return resourceName;
+ }
+
+ public void setResource(String resourceName)
+ {
+ this.resourceName = resourceName;
+ }
+
+ public String getPartition()
+ {
+ return partitionName;
+ }
+
+ public void setPartition(String partitionName)
+ {
+ this.partitionName = partitionName;
+ }
+
+ public String getPartitionState()
+ {
+ return partitionState;
+ }
+
+ public void setPartitionState(String partitionState)
+ {
+ this.partitionState = partitionState;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("instanceName").append("=").append(instanceName);
+ sb.append("resourceName").append("=").append(resourceName);
+ sb.append("partitionName").append("=").append(partitionName);
+ sb.append("partitionState").append("=").append(partitionState);
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java
new file mode 100644
index 0000000..cad9e67
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/CurrentStateChangeListener.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.CurrentState;
+
+
+public interface CurrentStateChangeListener
+{
+
+ /**
+ * Invoked when current state changes
+ *
+ * @param instanceName
+ * @param statesInfo
+ * @param changeContext
+ */
+ public void onStateChange(String instanceName,
+ List<CurrentState> statesInfo,
+ NotificationContext changeContext);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/DataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/DataAccessor.java b/helix-core/src/main/java/org/apache/helix/DataAccessor.java
new file mode 100644
index 0000000..1fa7171
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/DataAccessor.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+import java.util.Map;
+/**
+ * use {@link HelixDataAccessor}.
+ */
+@Deprecated
+public interface DataAccessor
+{
+
+ /**
+ * Set a property, overwrite if it exists and creates if not exists
+ *
+ * @param type
+ * @param value
+ * @param keys
+ * @true if the operation was successful
+ */
+ boolean setProperty(PropertyType type, ZNRecord value, String... keys);
+
+ boolean setProperty(PropertyType type, HelixProperty value, String... keys);
+
+ /**
+ * Updates a property, either overwrite or merge based on the
+ * propertyType.mergeOnUpdate, fails to update if
+ * propertyType.updateOnlyOnExists and does not exist
+ *
+ * @param type
+ * @param value
+ * @param keys
+ * @return true if the update was successful
+ */
+ boolean updateProperty(PropertyType type, ZNRecord value, String... keys);
+
+ boolean updateProperty(PropertyType type, HelixProperty value, String... keys);
+
+ /**
+ * Return the property value, it must be a leaf
+ *
+ * @param type
+ * @param keys
+ * one or more keys used to get the path of znode
+ * @return value, Null if absent or on error
+ */
+ ZNRecord getProperty(PropertyType type, String... keys);
+
+ <T extends HelixProperty> T getProperty(Class<T> clazz, PropertyType type, String... keys);
+
+ /**
+ * Removes the property
+ *
+ * @param type
+ * @param keys
+ * @return
+ */
+ boolean removeProperty(PropertyType type, String... keys);
+
+ /**
+ * Return the child names of the property
+ *
+ * @param type
+ * @param keys
+ * @return SubPropertyNames
+ */
+ List<String> getChildNames(PropertyType type, String... keys);
+
+ /**
+ *
+ * @param type
+ * @param keys
+ * must point to parent of leaf znodes
+ * @return subPropertyValues
+ */
+ List<ZNRecord> getChildValues(PropertyType type, String... keys);
+
+ <T extends HelixProperty> List<T> getChildValues(Class<T> clazz, PropertyType type,
+ String... keys);
+
+ <T extends HelixProperty> Map<String, T> getChildValuesMap(Class<T> clazz, PropertyType type,
+ String... keys);
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java b/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java
new file mode 100644
index 0000000..0d52e4c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/ExternalViewChangeListener.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.ExternalView;
+
+
+public interface ExternalViewChangeListener
+{
+
+ /**
+ * Invoked when external view changes
+ *
+ * @param externalViewList
+ * @param changeContext
+ */
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
new file mode 100644
index 0000000..5a670dd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -0,0 +1,168 @@
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.log4j.Logger;
+
+// TODO: move to mananger.zk
+public class GroupCommit
+{
+ private static Logger LOG = Logger.getLogger(GroupCommit.class);
+ private static class Queue
+ {
+ final AtomicReference<Thread> _running = new AtomicReference<Thread>();
+ final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>();
+ }
+
+ private static class Entry
+ {
+ final String _key;
+ final ZNRecord _record;
+ AtomicBoolean _sent = new AtomicBoolean(false);
+
+ Entry(String key, ZNRecord record)
+ {
+ _key = key;
+ _record = record;
+ }
+ }
+
+ private final Queue[] _queues = new Queue[100];
+
+ // potential memory leak if we add resource and remove resource
+ // TODO: move the cache logic to data accessor
+// private final Map<String, ZNRecord> _cache = new ConcurrentHashMap<String, ZNRecord>();
+
+
+ public GroupCommit()
+ {
+ // Don't use Arrays.fill();
+ for (int i = 0; i < _queues.length; ++i)
+ {
+ _queues[i] = new Queue();
+ }
+ }
+
+ private Queue getQueue(String key)
+ {
+ return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
+ }
+
+ public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key, ZNRecord record)
+ {
+ Queue queue = getQueue(key);
+ Entry entry = new Entry(key, record);
+
+ queue._pending.add(entry);
+
+ while (!entry._sent.get())
+ {
+ if (queue._running.compareAndSet(null, Thread.currentThread()))
+ {
+ ArrayList<Entry> processed = new ArrayList<Entry>();
+ try
+ {
+ if (queue._pending.peek() == null)
+ return true;
+
+ // remove from queue
+ Entry first = queue._pending.poll();
+ processed.add(first);
+
+ String mergedKey = first._key;
+// ZNRecord merged = _cache.get(mergedKey);
+ ZNRecord merged = null;
+
+ try
+ {
+ // accessor will fallback to zk if not found in cache
+ merged = accessor.get(mergedKey, null, options);
+ }
+ catch (ZkNoNodeException e)
+ {
+ // OK.
+ }
+
+ /**
+ * If the local cache does not contain a value, need to check if there is a
+ * value in ZK; use it as initial value if exists
+ */
+ if (merged == null)
+ {
+// ZNRecord valueOnZk = null;
+// try
+// {
+// valueOnZk = accessor.get(mergedKey, null, 0);
+// }
+// catch(Exception e)
+// {
+// LOG.info(e);
+// }
+// if(valueOnZk != null)
+// {
+// merged = valueOnZk;
+// merged.merge(first._record);
+// }
+// else // Zk path has null data. use the first record as initial record.
+ {
+ merged = new ZNRecord(first._record);
+ }
+ }
+ else
+ {
+ merged.merge(first._record);
+ }
+ Iterator<Entry> it = queue._pending.iterator();
+ while (it.hasNext())
+ {
+ Entry ent = it.next();
+ if (!ent._key.equals(mergedKey))
+ continue;
+ processed.add(ent);
+ merged.merge(ent._record);
+ // System.out.println("After merging:" + merged);
+ it.remove();
+ }
+ // System.out.println("size:"+ processed.size());
+ accessor.set(mergedKey, merged, options);
+ // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
+ // _cache.put(mergedKey, merged);
+ }
+ finally
+ {
+ queue._running.set(null);
+ for (Entry e : processed)
+ {
+ synchronized (e)
+ {
+ e._sent.set(true);
+ e.notify();
+ }
+ }
+ }
+ }
+ else
+ {
+ synchronized (entry)
+ {
+ try
+ {
+ entry.wait(10);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java
new file mode 100644
index 0000000..bf5d82d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HealthStateChangeListener.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix;
+
+import java.util.List;
+
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.Message;
+
+
+public interface HealthStateChangeListener
+{
+
+ /**
+ * Invoked when health stats change
+ *
+ * @param instanceName
+ * @param reports
+ * @param changeContext
+ */
+ public void onHealthChange(String instanceName,
+ List<HealthStat> reports,
+ NotificationContext changeContext);
+}