You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 19:13:15 UTC

[1/7] git commit: [HELIX-455] Add REST API for submitting jobs

Repository: helix
Updated Branches:
  refs/heads/master 713586c42 -> 6d30c9c58


[HELIX-455] Add REST API for submitting jobs


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/208f1fa5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/208f1fa5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/208f1fa5

Branch: refs/heads/master
Commit: 208f1fa5211774778fbe2f7ba2254b7c91eaa04c
Parents: 713586c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jun 27 11:39:39 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 10:31:06 2014 -0700

----------------------------------------------------------------------
 .../helix/webapp/RestAdminApplication.java      |   2 +
 .../webapp/resources/WorkflowsResource.java     | 138 +++++++++++++++++++
 2 files changed, 140 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/208f1fa5/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
index 0940c39..7e7a3b9 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
@@ -50,6 +50,7 @@ import org.apache.helix.webapp.resources.StateModelResource;
 import org.apache.helix.webapp.resources.StateModelsResource;
 import org.apache.helix.webapp.resources.StatusUpdateResource;
 import org.apache.helix.webapp.resources.StatusUpdatesResource;
+import org.apache.helix.webapp.resources.WorkflowsResource;
 import org.apache.helix.webapp.resources.ZkChildResource;
 import org.apache.helix.webapp.resources.ZkPathResource;
 import org.restlet.Application;
@@ -90,6 +91,7 @@ public class RestAdminApplication extends Application {
     router.attach("/clusters/{clusterName}/resourceGroups", ResourceGroupsResource.class);
     router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}",
         ResourceGroupResource.class);
+    router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class);
     router.attach("/clusters/{clusterName}/instances", InstancesResource.class);
     router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class);
     router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}",

http://git-wip-us.apache.org/repos/asf/helix/blob/208f1fa5/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
new file mode 100644
index 0000000..f09155b
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -0,0 +1,138 @@
+package org.apache.helix.webapp.resources;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Parameter;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import com.google.common.collect.Lists;
+
+public class WorkflowsResource extends ServerResource {
+  private final static Logger LOG = Logger.getLogger(WorkflowsResource.class);
+
+  public WorkflowsResource() {
+    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+    setNegotiated(false);
+  }
+
+  @Override
+  public Representation get() {
+    StringRepresentation presentation = null;
+    try {
+      String clusterName = (String) getRequest().getAttributes().get("clusterName");
+      presentation = getHostedEntitiesRepresentation(clusterName);
+    }
+
+    catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("", e);
+    }
+    return presentation;
+  }
+
+  StringRepresentation getHostedEntitiesRepresentation(String clusterName)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    // Get all resources
+    ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+    HelixDataAccessor accessor =
+        ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+
+    // Create the result
+    ZNRecord hostedEntitiesRecord = new ZNRecord("Workflows");
+
+    // Filter out non-workflow resources
+    Iterator<Map.Entry<String, HelixProperty>> it = resourceConfigMap.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, HelixProperty> e = it.next();
+      HelixProperty resource = e.getValue();
+      Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
+      if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
+          || !simpleFields.containsKey(WorkflowConfig.DAG)) {
+        it.remove();
+      }
+    }
+
+    // Populate the result
+    List<String> allResources = Lists.newArrayList(resourceConfigMap.keySet());
+    hostedEntitiesRecord.setListField("WorkflowList", allResources);
+
+    StringRepresentation representation =
+        new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+            MediaType.APPLICATION_JSON);
+
+    return representation;
+  }
+
+  @Override
+  public Representation post(Representation entity) {
+    try {
+      String clusterName = (String) getRequest().getAttributes().get("clusterName");
+      Form form = new Form(entity);
+
+      // Get the workflow and submit it
+      if (form.size() < 1) {
+        throw new HelixException("yaml workflow is required!");
+      }
+      Parameter payload = form.get(0);
+      String yamlPayload = payload.getName();
+      if (yamlPayload == null) {
+        throw new HelixException("yaml workflow is required!");
+      }
+      String zkAddr =
+          (String) getContext().getAttributes().get(RestAdminApplication.ZKSERVERADDRESS);
+      HelixManager manager =
+          HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.ADMINISTRATOR,
+              zkAddr);
+      manager.connect();
+      try {
+        Workflow workflow = Workflow.parse(yamlPayload);
+        TaskDriver driver = new TaskDriver(manager);
+        driver.start(workflow);
+      } finally {
+        manager.disconnect();
+      }
+
+      getResponse().setEntity(getHostedEntitiesRepresentation(clusterName));
+      getResponse().setStatus(Status.SUCCESS_OK);
+    }
+
+    catch (Exception e) {
+      getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+          MediaType.APPLICATION_JSON);
+      getResponse().setStatus(Status.SUCCESS_OK);
+      LOG.error("Error in posting " + entity, e);
+    }
+    return null;
+  }
+}


[4/7] git commit: [HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core

Posted by ka...@apache.org.
[HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/85277291
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/85277291
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/85277291

Branch: refs/heads/master
Commit: 85277291607154f6d63ce0a8e401e221e55cc95c
Parents: e914edb
Author: zzhang <zz...@apache.org>
Authored: Tue May 20 17:56:01 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 11:32:43 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   7 +-
 .../java/org/apache/helix/PropertyType.java     |  12 -
 .../helix/controller/HelixControllerMain.java   |  19 -
 .../restlet/ZKPropertyTransferServer.java       | 247 -----------
 .../controller/restlet/ZNRecordUpdate.java      |  83 ----
 .../restlet/ZNRecordUpdateResource.java         |  79 ----
 .../restlet/ZkPropertyTransferApplication.java  |  45 --
 .../restlet/ZkPropertyTransferClient.java       | 177 --------
 .../helix/controller/restlet/package-info.java  |  23 -
 .../manager/zk/ParticipantManagerHelper.java    |   1 -
 .../helix/manager/zk/ZKHelixDataAccessor.java   |  71 +---
 .../apache/helix/manager/zk/ZKHelixManager.java |   2 -
 .../manager/zk/ZNRecordStreamingSerializer.java |   6 +-
 .../helix/manager/zk/ZkHelixLeaderElection.java |  10 -
 .../helix/manager/zk/ZkHelixParticipant.java    |   2 -
 .../healthcheck/TestAlertActionTriggering.java  | 224 ----------
 .../helix/healthcheck/TestAlertFireHistory.java | 422 -------------------
 .../helix/integration/TestAutoRebalance.java    |   6 +-
 .../TestAutoRebalancePartitionLimit.java        |   2 +-
 .../TestCustomizedIdealStateRebalancer.java     |   2 +-
 .../helix/integration/TestDisableNode.java      |   2 +-
 .../helix/integration/TestDisablePartition.java |   2 +-
 .../helix/integration/TestDropResource.java     |   2 +-
 .../helix/integration/TestMessagingService.java |   4 +-
 .../helix/integration/TestSchedulerMessage.java |   6 +-
 .../TestUserDefRebalancerCompatibility.java     |   3 +-
 .../integration/TestZkCallbackHandlerLeak.java  |  77 ++--
 ...dAloneCMTestBaseWithPropertyServerCheck.java |  88 ----
 .../manager/TestConsecutiveZkSessionExpiry.java |   4 +-
 .../TestDistributedControllerManager.java       |   4 +-
 .../manager/TestZkCallbackHandlerLeak.java      |  76 ++--
 .../manager/zk/TestLiveInstanceBounce.java      |   4 +-
 .../zk/TestZKPropertyTransferServer.java        |  63 ---
 .../manager/zk/TestZkStateChangeListener.java   |   5 +-
 34 files changed, 106 insertions(+), 1674 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index b8e7a9a..0d06914 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,6 @@ under the License.
       org.apache.zookeeper.txn*;resolution:=optional,
       org.apache.zookeeper*;version="[3.3,4)",
       org.codehaus.jackson*;version="[1.8,2)",
-      org.restlet;version="[2.2.1,3]",
       *
     </osgi.import>
     <osgi.ignore>
@@ -131,9 +130,9 @@ under the License.
       <version>2.1</version>
     </dependency>
     <dependency>
-      <groupId>org.restlet.jse</groupId>
-      <artifactId>org.restlet</artifactId>
-      <version>2.2.1</version>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.6</version>
     </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/PropertyType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index 75adb20..579f454 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -85,8 +85,6 @@ public enum PropertyType {
    */
   boolean isCached;
 
-  boolean usePropertyTransferServer;
-
   private PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate) {
     this(type, isPersistent, mergeOnUpdate, false);
   }
@@ -114,7 +112,6 @@ public enum PropertyType {
     this.updateOnlyOnExists = updateOnlyOnExists;
     this.createOnlyIfAbsent = createOnlyIfAbsent;
     this.isCached = isCached;
-    this.usePropertyTransferServer = isAsyncWrite;
   }
 
   /**
@@ -204,13 +201,4 @@ public enum PropertyType {
   public boolean isCached() {
     return isCached;
   }
-
-  /**
-   * Check if this property uses a property transfer server
-   * @return true if a property transfer server is used, false otherwise
-   */
-  public boolean usePropertyTransferServer() {
-    return usePropertyTransferServer;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 62f3b23..b6c16b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -47,7 +47,6 @@ import org.apache.commons.cli.ParseException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.manager.zk.HelixManagerShutdownHook;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
@@ -58,7 +57,6 @@ public class HelixControllerMain {
   public static final String cluster = "cluster";
   public static final String help = "help";
   public static final String mode = "mode";
-  public static final String propertyTransferServicePort = "propertyTransferPort";
   public static final String name = "controllerName";
   public static final String STANDALONE = "STANDALONE";
   public static final String DISTRIBUTED = "DISTRIBUTED";
@@ -101,19 +99,11 @@ public class HelixControllerMain {
     controllerNameOption.setRequired(false);
     controllerNameOption.setArgName("Cluster controller name (Optional)");
 
-    Option portOption =
-        OptionBuilder.withLongOpt(propertyTransferServicePort)
-            .withDescription("Webservice port for ZkProperty controller transfer").create();
-    portOption.setArgs(1);
-    portOption.setRequired(false);
-    portOption.setArgName("Cluster controller property transfer port (Optional)");
-
     Options options = new Options();
     options.addOption(helpOption);
     options.addOption(zkServerOption);
     options.addOption(clusterOption);
     options.addOption(modeOption);
-    options.addOption(portOption);
     options.addOption(controllerNameOption);
 
     return options;
@@ -208,15 +198,11 @@ public class HelixControllerMain {
     String clusterName = cmd.getOptionValue(cluster);
     String controllerMode = STANDALONE;
     String controllerName = null;
-    int propertyTransServicePort = -1;
 
     if (cmd.hasOption(mode)) {
       controllerMode = cmd.getOptionValue(mode);
     }
 
-    if (cmd.hasOption(propertyTransferServicePort)) {
-      propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
-    }
     if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name)) {
       throw new IllegalArgumentException(
           "A unique cluster controller name is required in DISTRIBUTED mode");
@@ -228,10 +214,6 @@ public class HelixControllerMain {
     logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
         + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
 
-    if (propertyTransServicePort > 0) {
-      ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
-    }
-
     HelixManager manager =
         startHelixController(zkConnectString, clusterName, controllerName, controllerMode);
 
@@ -244,7 +226,6 @@ public class HelixControllerMain {
           + " interrupted");
     } finally {
       manager.disconnect();
-      ZKPropertyTransferServer.getInstance().shutdown();
     }
 
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
deleted file mode 100644
index 66bd257..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZKPropertyTransferServer.java
+++ /dev/null
@@ -1,247 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.restlet.Component;
-import org.restlet.Context;
-import org.restlet.data.Protocol;
-
-/**
- * Controller side restlet server that receives ZNRecordUpdate requests from
- * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is
- * to optimize the concurrency level of zookeeper access for ZNRecord updates
- * that does not require real-time, like message handling status updates and
- * healthcheck reports.
- * As one server will be used by multiple helix controllers that runs on the same machine,
- * This class is designed as a singleton. Application is responsible to call init()
- * and shutdown() on the getInstance().
- */
-public class ZKPropertyTransferServer {
-  public static final String PORT = "port";
-  public static String RESTRESOURCENAME = "ZNRecordUpdates";
-  public static final String SERVER = "ZKPropertyTransferServer";
-
-  // Frequency period for the ZNRecords are batch written to zookeeper
-  public static int PERIOD = 10 * 1000;
-  // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
-  public static int MAX_UPDATE_LIMIT = 10000;
-  private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
-
-  int _localWebservicePort;
-  String _webserviceUrl;
-  ZkBaseDataAccessor<ZNRecord> _accessor;
-  String _zkAddress;
-
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
-      new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-
-  boolean _initialized = false;
-  boolean _shutdownFlag = false;
-  Component _component = null;
-  Timer _timer = null;
-
-  static {
-    org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
-  }
-
-  /**
-   * Timertask for zookeeper batched writes
-   */
-  class ZKPropertyTransferTask extends TimerTask {
-    @Override
-    public void run() {
-      try {
-        sendData();
-      } catch (Throwable t) {
-        LOG.error("", t);
-      }
-
-    }
-  }
-
-  void sendData() {
-    LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
-    ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
-
-    synchronized (_dataBufferRef) {
-      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-
-    if (updateCache != null) {
-      List<String> paths = new ArrayList<String>();
-      List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
-      List<ZNRecord> vals = new ArrayList<ZNRecord>();
-      // BUGBUG : what if the instance is dropped?
-      for (ZNRecordUpdate holder : updateCache.values()) {
-        paths.add(holder.getPath());
-        updaters.add(holder.getZNRecordUpdater());
-        vals.add(holder.getRecord());
-      }
-      // Batch write the accumulated updates into zookeeper
-      long timeStart = System.currentTimeMillis();
-      if (paths.size() > 0) {
-        _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
-      }
-      LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in "
-          + (System.currentTimeMillis() - timeStart) + " ms");
-    } else {
-      LOG.warn("null _dataQueueRef. Should be in the beginning only");
-    }
-  }
-
-  static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
-
-  private ZKPropertyTransferServer() {
-    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-  }
-
-  public static ZKPropertyTransferServer getInstance() {
-    return _instance;
-  }
-
-  public boolean isInitialized() {
-    return _initialized;
-  }
-
-  public void init(int localWebservicePort, String zkAddress) {
-    if (!_initialized && !_shutdownFlag) {
-      LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
-      _localWebservicePort = localWebservicePort;
-      ZkClient zkClient = new ZkClient(zkAddress);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
-      _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
-      _zkAddress = zkAddress;
-      startServer();
-    } else {
-      LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: "
-          + _shutdownFlag);
-    }
-  }
-
-  public String getWebserviceUrl() {
-    if (!_initialized || _shutdownFlag) {
-      LOG.debug("inited:" + _initialized + " shutdownFlag:" + _shutdownFlag + " , return");
-      return null;
-    }
-    return _webserviceUrl;
-  }
-
-  /**
-   * Add an ZNRecordUpdate to the change queue.
-   * Called by the webservice front-end.
-   */
-  void enqueueData(ZNRecordUpdate e) {
-    if (!_initialized || _shutdownFlag) {
-      LOG.error("zkDataTransferServer inited:" + _initialized + " shutdownFlag:" + _shutdownFlag
-          + " , return");
-      return;
-    }
-    // Do local merge if receive multiple update on the same path
-    synchronized (_dataBufferRef) {
-      e.getRecord().setSimpleField(SERVER, _webserviceUrl);
-      if (_dataBufferRef.get().containsKey(e.getPath())) {
-        ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
-        oldVal = e.getZNRecordUpdater().update(oldVal);
-        _dataBufferRef.get().get(e.getPath())._record = oldVal;
-      } else {
-        _dataBufferRef.get().put(e.getPath(), e);
-      }
-    }
-    if (_dataBufferRef.get().size() > MAX_UPDATE_LIMIT) {
-      sendData();
-    }
-  }
-
-  void startServer() {
-    LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress "
-        + _zkAddress);
-
-    _component = new Component();
-
-    _component.getServers().add(Protocol.HTTP, _localWebservicePort);
-    Context applicationContext = _component.getContext().createChildContext();
-    applicationContext.getAttributes().put(SERVER, this);
-    applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
-    ZkPropertyTransferApplication application =
-        new ZkPropertyTransferApplication(applicationContext);
-    // Attach the application to the component and start it
-    _component.getDefaultHost().attach(application);
-    _timer = new Timer(true);
-    _timer.schedule(new ZKPropertyTransferTask(), PERIOD, PERIOD);
-
-    try {
-      _webserviceUrl =
-          "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":"
-              + _localWebservicePort + "/" + RESTRESOURCENAME;
-      _component.start();
-      _initialized = true;
-    } catch (Exception e) {
-      LOG.error("", e);
-    }
-    LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress "
-        + _zkAddress);
-  }
-
-  public void shutdown() {
-    if (_shutdownFlag) {
-      LOG.error("ZKPropertyTransferServer already has been shutdown...");
-      return;
-    }
-    LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress "
-        + _zkAddress);
-    if (_timer != null) {
-      _timer.cancel();
-    }
-    if (_component != null) {
-      try {
-        _component.stop();
-      } catch (Exception e) {
-        LOG.error("", e);
-      }
-    }
-    _shutdownFlag = true;
-  }
-
-  public void reset() {
-    if (_shutdownFlag == true) {
-      _shutdownFlag = false;
-      _initialized = false;
-      _component = null;
-      _timer = null;
-      _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
deleted file mode 100644
index deef748..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdate.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordUpdater;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonIgnore;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- * Unit of transfered ZNRecord updates. Contains the ZNRecord Value, zkPath
- * to store the update value, and the property type (used to merge the ZNRecord)
- * For ZNRecord subtraction, it is currently not supported yet.
- */
-public class ZNRecordUpdate {
-  public enum OpCode {
-    // TODO: create is not supported; but update will create if not exist
-    CREATE,
-    UPDATE,
-    SET
-  }
-
-  final String _path;
-  ZNRecord _record;
-  final OpCode _code;
-
-  @JsonCreator
-  public ZNRecordUpdate(@JsonProperty("path") String path, @JsonProperty("opcode") OpCode code,
-      @JsonProperty("record") ZNRecord record) {
-    _path = path;
-    _record = record;
-    _code = code;
-  }
-
-  public String getPath() {
-    return _path;
-  }
-
-  public ZNRecord getRecord() {
-    return _record;
-  }
-
-  public OpCode getOpcode() {
-    return _code;
-  }
-
-  @JsonIgnore(true)
-  public DataUpdater<ZNRecord> getZNRecordUpdater() {
-    if (_code == OpCode.SET)
-
-    {
-      return new ZNRecordUpdater(_record) {
-        @Override
-        public ZNRecord update(ZNRecord current) {
-          return _record;
-        }
-      };
-    } else if ((_code == OpCode.UPDATE)) {
-      return new ZNRecordUpdater(_record);
-    } else {
-      throw new UnsupportedOperationException("Not supported : " + _code);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
deleted file mode 100644
index 3877686..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZNRecordUpdateResource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringReader;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.restlet.data.Form;
-import org.restlet.data.MediaType;
-import org.restlet.data.Status;
-import org.restlet.representation.Representation;
-import org.restlet.representation.Variant;
-import org.restlet.resource.ServerResource;
-
-/**
- * REST resource for the ZkPropertyTransfer server to receive PUT requests
- * that submit ZNRecordUpdates
- */
-public class ZNRecordUpdateResource extends ServerResource {
-  public static final String UPDATEKEY = "ZNRecordUpdate";
-  private static Logger LOG = Logger.getLogger(ZNRecordUpdateResource.class);
-
-  public ZNRecordUpdateResource() {
-    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
-    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
-    setNegotiated(false);
-  }
-
-  @Override
-  public Representation put(Representation entity) {
-    try {
-      ZKPropertyTransferServer server = ZKPropertyTransferServer.getInstance();
-
-      Form form = new Form(entity);
-      String jsonPayload = form.getFirstValue(UPDATEKEY, true);
-
-      // Parse the map from zkPath --> ZNRecordUpdate from the payload
-      StringReader sr = new StringReader(jsonPayload);
-      ObjectMapper mapper = new ObjectMapper();
-      TypeReference<TreeMap<String, ZNRecordUpdate>> typeRef =
-          new TypeReference<TreeMap<String, ZNRecordUpdate>>() {
-          };
-      Map<String, ZNRecordUpdate> holderMap = mapper.readValue(sr, typeRef);
-      // Enqueue the ZNRecordUpdate for sending
-      for (ZNRecordUpdate holder : holderMap.values()) {
-        server.enqueueData(holder);
-        LOG.info("Received " + holder.getPath() + " from "
-            + getRequest().getClientInfo().getAddress());
-      }
-      getResponse().setStatus(Status.SUCCESS_OK);
-    } catch (Exception e) {
-      LOG.error("", e);
-      getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
-    }
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
deleted file mode 100644
index 68d35cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferApplication.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.restlet.Application;
-import org.restlet.Context;
-import org.restlet.Restlet;
-import org.restlet.routing.Router;
-
-/**
- * Restlet application for ZkPropertyTransfer server
- */
-public class ZkPropertyTransferApplication extends Application {
-  public ZkPropertyTransferApplication() {
-    super();
-  }
-
-  public ZkPropertyTransferApplication(Context context) {
-    super(context);
-  }
-
-  @Override
-  public Restlet createInboundRoot() {
-    Router router = new Router(getContext());
-    router.attach("/" + ZKPropertyTransferServer.RESTRESOURCENAME, ZNRecordUpdateResource.class);
-    return router;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
deleted file mode 100644
index 092d845..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/ZkPropertyTransferClient.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.controller.restlet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.restlet.Client;
-import org.restlet.Request;
-import org.restlet.Response;
-import org.restlet.data.MediaType;
-import org.restlet.data.Method;
-import org.restlet.data.Protocol;
-import org.restlet.data.Reference;
-import org.restlet.data.Status;
-
-public class ZkPropertyTransferClient {
-  private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
-  public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
-  public static int SEND_PERIOD = 10 * 1000;
-
-  public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
-
-  int _maxConcurrentTasks;
-  ExecutorService _executorService;
-  Client[] _clients;
-  AtomicInteger _requestCount = new AtomicInteger(0);
-
-  // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
-  AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef =
-      new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
-  Timer _timer;
-  volatile String _webServiceUrl = "";
-
-  static {
-    org.restlet.engine.Engine.setLogLevel(Level.SEVERE);
-  }
-
-  public ZkPropertyTransferClient(int maxConcurrentTasks) {
-    _maxConcurrentTasks = maxConcurrentTasks;
-    _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
-    _clients = new Client[_maxConcurrentTasks];
-    for (int i = 0; i < _clients.length; i++) {
-      _clients[i] = new Client(Protocol.HTTP);
-    }
-    _timer = new Timer(true);
-    _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
-    _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-  }
-
-  class SendZNRecordTimerTask extends TimerTask {
-    @Override
-    public void run() {
-      sendUpdateBatch();
-    }
-  }
-
-  public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl) {
-    try {
-      LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to "
-          + webserviceUrl);
-      _webServiceUrl = webserviceUrl;
-      update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
-      synchronized (_dataBufferRef) {
-        if (_dataBufferRef.get().containsKey(update._path)) {
-          ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
-          oldVal = update.getZNRecordUpdater().update(oldVal);
-          _dataBufferRef.get().get(update.getPath())._record = oldVal;
-        } else {
-          _dataBufferRef.get().put(update.getPath(), update);
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("", e);
-    }
-  }
-
-  void sendUpdateBatch() {
-    LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to "
-        + _webServiceUrl);
-    Map<String, ZNRecordUpdate> updateCache = null;
-
-    synchronized (_dataBufferRef) {
-      updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
-    }
-
-    if (updateCache != null && updateCache.size() > 0) {
-      ZNRecordUpdateUploadTask task =
-          new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl,
-              _clients[_requestCount.intValue() % _maxConcurrentTasks]);
-      _requestCount.incrementAndGet();
-      _executorService.submit(task);
-      LOG.trace("Queue size :" + ((ThreadPoolExecutor) _executorService).getQueue().size());
-    }
-  }
-
-  public void shutdown() {
-    LOG.info("Shutting down ZkPropertyTransferClient");
-    _executorService.shutdown();
-    _timer.cancel();
-    for (Client client : _clients) {
-      try {
-        client.stop();
-      } catch (Exception e) {
-        LOG.error("", e);
-      }
-    }
-  }
-
-  class ZNRecordUpdateUploadTask implements Callable<Void> {
-    Map<String, ZNRecordUpdate> _updateMap;
-    String _webServiceUrl;
-    Client _client;
-
-    ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client) {
-      _updateMap = update;
-      _webServiceUrl = webserviceUrl;
-      _client = client;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
-      long time = System.currentTimeMillis();
-      Reference resourceRef = new Reference(_webServiceUrl);
-      Request request = new Request(Method.PUT, resourceRef);
-
-      ObjectMapper mapper = new ObjectMapper();
-      StringWriter sw = new StringWriter();
-      try {
-        mapper.writeValue(sw, _updateMap);
-      } catch (Exception e) {
-        LOG.error("", e);
-      }
-
-      request.setEntity(ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
-      // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
-      Response response = _client.handle(request);
-
-      if (response.getStatus().getCode() != Status.SUCCESS_OK.getCode()) {
-        LOG.error("Status : " + response.getStatus());
-      }
-      LOG.info("Using time : " + (System.currentTimeMillis() - time));
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java b/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
deleted file mode 100644
index 0ef7a79..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/restlet/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Restlet server for Helix controller
- * 
- */
-package org.apache.helix.controller.restlet;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 42c4f11..925c52f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -269,7 +269,6 @@ public class ParticipantManagerHelper {
     _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
         _stateMachineEngine);
     _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
-    _manager.addControllerListener(_dataAccessor);
 
     ScheduledTaskStateModelFactory stStateModelFactory =
         new ScheduledTaskStateModelFactory(_messagingService.getExecutor());

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 7527751..e411a72 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -18,6 +18,7 @@ package org.apache.helix.manager.zk;
  * specific language governing permissions and limitations
  * under the License.
  */
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,13 +29,11 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.GroupCommit;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
@@ -42,20 +41,15 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordAssembler;
 import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordUpdater;
-import org.apache.helix.controller.restlet.ZNRecordUpdate;
-import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
-public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener {
+public class ZKHelixDataAccessor implements HelixDataAccessor {
   private static Logger LOG = Logger.getLogger(ZKHelixDataAccessor.class);
   private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
   final InstanceType _instanceType;
   private final String _clusterName;
   private final Builder _propertyKeyBuilder;
-  ZkPropertyTransferClient _zkPropertyTransferClient = null;
   private final GroupCommit _groupCommit = new GroupCommit();
   String _zkPropertyTransferSvcUrl = null;
 
@@ -100,14 +94,6 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     String path = key.getPath();
     int options = constructOptions(type);
 
-    if (type.usePropertyTransferServer()) {
-      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
-        ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
-        _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-        return true;
-      }
-    }
-
     boolean success = false;
     switch (type) {
     case RESOURCEASSIGNMENTS:
@@ -155,20 +141,12 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     case CURRENTSTATES:
       success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
       break;
-    default:
-      if (type.usePropertyTransferServer()) {
-        if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null) {
-          ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
-          _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
-
-          return true;
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("getPropertyTransferUrl is null, skip updating the value");
-          }
-          return true;
-        }
+    case STATUSUPDATES:
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Update status. path: " + key.getPath() + ", record: " + value.getRecord());
       }
+      break;
+    default:
       success = _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
       break;
     }
@@ -487,39 +465,4 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
       List<DataUpdater<ZNRecord>> updaters, int options) {
     return _baseDataAccessor.updateChildren(paths, updaters, options);
   }
-
-  public void shutdown() {
-    if (_zkPropertyTransferClient != null) {
-      _zkPropertyTransferClient.shutdown();
-    }
-  }
-
-  @Override
-  public void onControllerChange(NotificationContext changeContext) {
-    LOG.info("Controller has changed");
-    refreshZkPropertyTransferUrl();
-    if (_zkPropertyTransferClient == null) {
-      if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0) {
-        LOG.info("Creating ZkPropertyTransferClient as we get url " + _zkPropertyTransferSvcUrl);
-        _zkPropertyTransferClient =
-            new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
-      }
-    }
-  }
-
-  void refreshZkPropertyTransferUrl() {
-    try {
-      LiveInstance leader = getProperty(keyBuilder().controllerLeader());
-      if (leader != null) {
-        _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
-        LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl + " Controller "
-            + leader.getInstanceName());
-      } else {
-        _zkPropertyTransferSvcUrl = null;
-      }
-    } catch (Exception e) {
-      // LOG.error("", e);
-      _zkPropertyTransferSvcUrl = null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b71304a..d446430 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -548,8 +548,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       // TODO reset user defined handlers only
       resetHandlers();
 
-      _dataAccessor.shutdown();
-
       if (_leaderElectionHandler != null) {
         _leaderElectionHandler.reset();
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index bd37d42..2d7cb3c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -28,6 +28,7 @@ import java.util.TreeMap;
 
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.log4j.Logger;
@@ -35,7 +36,6 @@ import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
-import org.restlet.engine.util.Base64;
 
 import com.google.common.collect.Maps;
 
@@ -141,7 +141,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
       if (rawPayload != null && rawPayload.length > 0) {
         // write rawPayload
         g.writeRaw("\n  ");
-        g.writeStringField("rawPayload", Base64.encode(rawPayload, false));
+        g.writeStringField("rawPayload", new String(Base64.encodeBase64(rawPayload), "UTF-8"));
       }
 
       g.writeRaw("\n");
@@ -226,7 +226,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer {
           }
 
         } else if ("rawPayload".equals(fieldname)) {
-          rawPayload = Base64.decode(jp.getText());
+          rawPayload = Base64.decodeBase64(jp.getText());
         } else {
           throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 88ca1b0..df8b5c9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -31,7 +31,6 @@ import org.apache.helix.PropertyType;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.log4j.Logger;
@@ -134,15 +133,6 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
       leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
       leader.setSessionId(manager.getSessionId());
       leader.setHelixVersion(manager.getVersion());
-      if (ZKPropertyTransferServer.getInstance() != null) {
-        String zkPropertyTransferServiceUrl =
-            ZKPropertyTransferServer.getInstance().getWebserviceUrl();
-        if (zkPropertyTransferServiceUrl != null) {
-          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
-        }
-      } else {
-        LOG.warn("ZKPropertyTransferServer instnace is null");
-      }
       boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
       if (success) {
         return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index 39ded2c..0c6e772 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -353,7 +353,6 @@ public class ZkHelixParticipant implements HelixParticipant {
      */
     _connection.addMessageListener(this, _messagingService.getExecutor(), _clusterId,
         _participantId);
-    _connection.addControllerListener(this, _accessor, _clusterId);
 
     ScheduledTaskStateModelFactory stStateModelFactory =
         new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
@@ -432,7 +431,6 @@ public class ZkHelixParticipant implements HelixParticipant {
      * transition?
      */
     _messagingService.getExecutor().shutdown();
-    _accessor.shutdown();
 
     /**
      * remove live instance ephemeral znode

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
deleted file mode 100644
index 37f8205..0000000
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
-
-  String _statName = "TestStat@DB=db1";
-  String _stat = "TestStat";
-  String metricName1 = "TestMetric1";
-  String metricName2 = "TestMetric2";
-
-  void setHealthData(int[] val1, int[] val2) {
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixManager manager = _participants[i];
-      ZNRecord record = new ZNRecord(_stat);
-      Map<String, String> valMap = new HashMap<String, String>();
-      valMap.put(metricName1, val1[i] + "");
-      valMap.put(metricName2, val2[i] + "");
-      record.setSimpleField("TimeStamp", new Date().getTime() + "");
-      record.setMapField(_statName, valMap);
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      helixDataAccessor.setProperty(
-          keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
-          new HealthStat(record));
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOG.error("sleep interrupted", e);
-    }
-  }
-
-  void setHealthData2(int[] val1) {
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixManager manager = _participants[i];
-      ZNRecord record = new ZNRecord(_stat);
-      Map<String, String> valMap = new HashMap<String, String>();
-      valMap.put(metricName2, val1[i] + "");
-      record.setSimpleField("TimeStamp", new Date().getTime() + "");
-      record.setMapField("TestStat@DB=TestDB;Partition=TestDB_3", valMap);
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      helixDataAccessor.setProperty(
-          keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
-          new HealthStat(record));
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOG.error("sleep interrupted", e);
-    }
-  }
-
-  @Test
-  public void testAlertActionDisableNode() throws InterruptedException {
-    // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("healthChange.enabled", "true");
-    _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
-    String alertStr1 =
-        "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)ACTION(DISABLE_INSTANCE)";
-    String alertStr2 =
-        "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(120)ACTION(DISABLE_INSTANCE)";
-    String alertStr3 =
-        "EXP(decay(1.0)(localhost_*.TestStat@DB=TestDB;Partition=*.TestMetric2))CMP(GREATER)CON(160)ACTION(DISABLE_PARTITION)";
-
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr1);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr2);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, alertStr3);
-
-    int[] metrics1 = {
-        10, 15, 22, 12, 16
-    };
-    int[] metrics2 = {
-        22, 115, 22, 163, 16
-    };
-    int[] metrics3 = {
-        0, 0, 0, 0, 0
-    };
-    setHealthData(metrics1, metrics2);
-
-    HelixManager manager = _controller;
-
-    HealthStatsAggregator task = new HealthStatsAggregator(manager);
-    task.aggregate();
-    Thread.sleep(4000);
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    Builder kb = manager.getHelixDataAccessor().keyBuilder();
-    ExternalView externalView =
-        manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
-    // Test the DISABLE_INSTANCE alerts
-    String participant1 = "localhost_" + (START_PORT + 3);
-    String participant2 = "localhost_" + (START_PORT + 2);
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    String isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertFalse(Boolean.parseBoolean(isEnabled));
-
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertFalse(Boolean.parseBoolean(isEnabled));
-
-    for (String partitionName : externalView.getRecord().getMapFields().keySet()) {
-      for (String hostName : externalView.getRecord().getMapField(partitionName).keySet()) {
-        if (hostName.equals(participant1) || hostName.equals(participant2)) {
-          Assert.assertEquals(externalView.getRecord().getMapField(partitionName).get(hostName),
-              "OFFLINE");
-        }
-      }
-    }
-
-    // enable the disabled instances
-    setHealthData(metrics3, metrics3);
-    task.aggregate();
-    Thread.sleep(1000);
-
-    manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant2, true);
-    manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant1, true);
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            CLUSTER_NAME));
-    Assert.assertTrue(result);
-
-    // Test the DISABLE_PARTITION case
-    int[] metrics4 = {
-        22, 115, 22, 16, 163
-    };
-    setHealthData2(metrics4);
-    task.aggregate();
-
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant1).build();
-    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertTrue(Boolean.parseBoolean(isEnabled));
-
-    // scope = new
-    // ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
-            .forCluster(manager.getClusterName()).forParticipant(participant2).build();
-    isEnabled = configAccessor.get(scope, "HELIX_ENABLED");
-    Assert.assertTrue(Boolean.parseBoolean(isEnabled));
-
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            CLUSTER_NAME));
-    Assert.assertTrue(result);
-    String participant3 = "localhost_" + (START_PORT + 4);
-    externalView = manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
-    Assert.assertTrue(externalView.getRecord().getMapField("TestDB_3").get(participant3)
-        .equalsIgnoreCase("OFFLINE"));
-
-    InstanceConfig nodeConfig =
-        helixDataAccessor.getProperty(keyBuilder.instanceConfig(participant3));
-    Assert.assertTrue(nodeConfig.getRecord()
-        .getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString())
-        .contains("TestDB_3"));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
deleted file mode 100644
index 798ce92..0000000
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ /dev/null
@@ -1,422 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.model.AlertHistory;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
-
-  String _statName = "TestStat@DB=db1";
-  String _stat = "TestStat";
-  String metricName1 = "TestMetric1";
-  String metricName2 = "TestMetric2";
-
-  String _alertStr1 = "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric1))CMP(GREATER)CON(20)";
-  String _alertStr2 =
-      "EXP(decay(1.0)(localhost_*.TestStat@DB=db1.TestMetric2))CMP(GREATER)CON(100)";
-
-  void setHealthData(int[] val1, int[] val2) {
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixManager manager = _participants[i];
-      ZNRecord record = new ZNRecord(_stat);
-      Map<String, String> valMap = new HashMap<String, String>();
-      valMap.put(metricName1, val1[i] + "");
-      valMap.put(metricName2, val2[i] + "");
-      record.setSimpleField("TimeStamp", new Date().getTime() + "");
-      record.setMapField(_statName, valMap);
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      helixDataAccessor.setProperty(
-          keyBuilder.healthReport(manager.getInstanceName(), record.getId()),
-          new HealthStat(record));
-    }
-    try {
-      Thread.sleep(1000);
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted sleep", e);
-    }
-  }
-
-  @Test
-  public void testAlertDisable() throws InterruptedException {
-
-    int[] metrics1 = {
-        10, 15, 22, 24, 16
-    };
-    int[] metrics2 = {
-        22, 115, 22, 141, 16
-    };
-    setHealthData(metrics1, metrics2);
-
-    HelixManager manager = _controller;
-    manager.startTimerTasks();
-
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
-
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
-    Map<String, String> properties = new HashMap<String, String>();
-    properties.put("healthChange.enabled", "false");
-    _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
-    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
-
-    task.aggregate();
-    Thread.sleep(100);
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-
-    Assert.assertEquals(history, null);
-
-    properties.put("healthChange.enabled", "true");
-    _setupTool.getClusterManagementTool().setConfig(scope, properties);
-
-    task.aggregate();
-    Thread.sleep(100);
-
-    history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-    //
-    Assert.assertNotNull(history);
-    Assert.assertEquals(history.getRecord().getMapFields().size(), 1);
-  }
-
-  @Test
-  public void testAlertHistory() throws InterruptedException {
-    int[] metrics1 = {
-        10, 15, 22, 24, 16
-    };
-    int[] metrics2 = {
-        22, 115, 22, 141, 16
-    };
-    setHealthData(metrics1, metrics2);
-
-    HelixManager manager = _controller;
-    for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
-      task.stop();
-    }
-
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
-    _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
-
-    int historySize = 0;
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    HelixProperty property = helixDataAccessor.getProperty(keyBuilder.alertHistory());
-    ZNRecord history = null;
-    if (property != null) {
-      history = property.getRecord();
-      historySize = property.getRecord().getMapFields().size();
-    }
-
-    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
-
-    task.aggregate();
-    Thread.sleep(100);
-
-    history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-    //
-    Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
-    TreeMap<String, Map<String, String>> recordMap = new TreeMap<String, Map<String, String>>();
-    recordMap.putAll(history.getMapFields());
-    Map<String, String> lastRecord = recordMap.firstEntry().getValue();
-    Assert.assertTrue(lastRecord.size() == 4);
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-
-    setHealthData(metrics1, metrics2);
-    task.aggregate();
-    Thread.sleep(100);
-    history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-    // no change
-    Assert.assertEquals(history.getMapFields().size(), 1 + historySize);
-    recordMap = new TreeMap<String, Map<String, String>>();
-    recordMap.putAll(history.getMapFields());
-    lastRecord = recordMap.firstEntry().getValue();
-    Assert.assertTrue(lastRecord.size() == 4);
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-
-    int[] metrics3 = {
-        21, 44, 22, 14, 16
-    };
-    int[] metrics4 = {
-        122, 115, 222, 41, 16
-    };
-    setHealthData(metrics3, metrics4);
-    task.aggregate();
-    Thread.sleep(100);
-    history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-    // new delta should be recorded
-    Assert.assertEquals(history.getMapFields().size(), 2 + historySize);
-    recordMap = new TreeMap<String, Map<String, String>>();
-    recordMap.putAll(history.getMapFields());
-    lastRecord = recordMap.lastEntry().getValue();
-    Assert.assertEquals(lastRecord.size(), 6);
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("ON"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("ON"));
-
-    int[] metrics5 = {
-        0, 0, 0, 0, 0
-    };
-    int[] metrics6 = {
-        0, 0, 0, 0, 0
-    };
-    setHealthData(metrics5, metrics6);
-    task.aggregate();
-
-    for (int i = 0; i < 10; i++) {
-      Thread.sleep(500);
-      history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-      recordMap = new TreeMap<String, Map<String, String>>();
-      recordMap.putAll(history.getMapFields());
-      lastRecord = recordMap.lastEntry().getValue();
-
-      if (history.getMapFields().size() == 3 + historySize && lastRecord.size() == 6) {
-        break;
-      }
-    }
-
-    // reset everything
-    Assert.assertEquals(history.getMapFields().size(), 3 + historySize,
-        "expect history-map-field size is " + (3 + historySize) + ", but was " + history);
-    Assert
-        .assertTrue(lastRecord.size() == 6, "expect last-record size is 6, but was " + lastRecord);
-
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)")
-        .equals("OFF"));
-    Assert.assertTrue(lastRecord.get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)")
-        .equals("OFF"));
-
-    // Size of the history should be 30
-    for (int i = 0; i < 27; i++) {
-      int x = i % 2;
-      int y = (i + 1) % 2;
-      int[] metricsx = {
-          19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y
-      };
-      int[] metricsy = {
-          99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y
-      };
-
-      setHealthData(metricsx, metricsy);
-      task.aggregate();
-      Thread.sleep(100);
-      history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-
-      Assert.assertEquals(history.getMapFields().size(), Math.min(3 + i + 1 + historySize, 30));
-      recordMap = new TreeMap<String, Map<String, String>>();
-      recordMap.putAll(history.getMapFields());
-      lastRecord = recordMap.lastEntry().getValue();
-      if (i == 0) {
-        Assert.assertTrue(lastRecord.size() == 6);
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-      } else {
-        System.out.println(lastRecord.size());
-        Assert.assertEquals(lastRecord.size(), 10);
-        if (x == 0) {
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        } else {
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-          Assert.assertTrue(lastRecord.get(
-              "(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        }
-      }
-    }
-    // size limit is 30
-    for (int i = 0; i < 10; i++) {
-      int x = i % 2;
-      int y = (i + 1) % 2;
-      int[] metricsx = {
-          19 + 3 * x, 19 + 3 * y, 19 + 4 * x, 18 + 4 * y, 17 + 5 * y
-      };
-      int[] metricsy = {
-          99 + 3 * x, 99 + 3 * y, 98 + 4 * x, 98 + 4 * y, 97 + 5 * y
-      };
-
-      setHealthData(metricsx, metricsy);
-      task.aggregate();
-      for (int j = 0; j < 10; j++) {
-        Thread.sleep(100);
-        history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
-        recordMap = new TreeMap<String, Map<String, String>>();
-        recordMap.putAll(history.getMapFields());
-        lastRecord = recordMap.lastEntry().getValue();
-
-        if (history.getMapFields().size() == 30 && lastRecord.size() == 10)
-          break;
-      }
-      Assert.assertEquals(history.getMapFields().size(), 30,
-          "expect history.map-field size is 30, but was " + history);
-      Assert.assertEquals(lastRecord.size(), 10, "expect last-record size is 10, but was "
-          + lastRecord);
-
-      if (x == 0) {
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-      } else {
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12922.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12922.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12920.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12918.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12919.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12921.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12920.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("OFF"));
-        Assert.assertTrue(lastRecord.get(
-            "(localhost_12918.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
-        Assert.assertTrue(lastRecord
-            .get("(localhost_12919.TestStat@DB#db1.TestMetric1)GREATER(20)").equals("OFF"));
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index cc43ecb..330b906 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -30,8 +30,6 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -46,13 +44,11 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
+public class TestAutoRebalance extends ZkStandAloneCMTestBase {
   String db2 = TEST_DB + "2";
   String _tag = "SSDSSD";
 

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 5f9f48c..59e1b27 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -48,7 +48,7 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
   private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
       .getName());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 7bf4f94..33b95c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -57,7 +57,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestCustomizedIdealStateRebalancer extends
-    ZkStandAloneCMTestBaseWithPropertyServerCheck {
+    ZkStandAloneCMTestBase {
   String db2 = TEST_DB + "2";
   static boolean testRebalancerCreated = false;
   static boolean testRebalancerInvoked = false;

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
index be523d0..ca54fb3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -25,7 +25,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisableNode extends ZkStandAloneCMTestBase {
 
   @Test()
   public void testDisableNode() throws Exception {

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index ba7e8e4..c81f6e0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDisablePartition extends ZkStandAloneCMTestBase {
   private static Logger LOG = Logger.getLogger(TestDisablePartition.class);
 
   @Test()

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
index 0d02d12..047ea75 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -26,7 +26,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestDropResource extends ZkStandAloneCMTestBase {
   @Test()
   public void testDropResource() throws Exception {
     // add a resource to be dropped

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 86f1ce4..7df9e8b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -38,7 +38,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestMessagingService extends ZkStandAloneCMTestBase {
   public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
     public static HashSet<String> _processedMsgIds = new HashSet<String>();
 
@@ -168,8 +168,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
               .get("ReplyMessage"));
       if (message.getRecord().getMapField(Message.Attributes.MESSAGE_RESULT.toString())
           .get("ReplyMessage") == null) {
-        int x = 0;
-        x++;
       }
       _replyedMessageContents.add(message.getRecord()
           .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage"));

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index d78bd9d..e6117d6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -64,7 +64,7 @@ import org.codehaus.jackson.map.SerializationConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
 
   class MockAsyncCallback extends AsyncCallback {
     Message _message;
@@ -386,7 +386,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
             PropertyType.STATUSUPDATES);
 
     subPaths = _gZkClient.getChildren(instanceStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
+    Assert.assertTrue(subPaths.size() == 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
       List<String> subsubPaths = _gZkClient.getChildren(nextPath);
@@ -409,7 +409,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     }
 
     subPaths = _gZkClient.getChildren(instanceStatusPath);
-    Assert.assertTrue(subPaths.size() > 0);
+    Assert.assertTrue(subPaths.size() == 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
       List<String> subsubPaths = _gZkClient.getChildren(nextPath);

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
index 2a026c2..e7cef4f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestUserDefRebalancerCompatibility.java
@@ -37,8 +37,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @SuppressWarnings("deprecation")
-public class TestUserDefRebalancerCompatibility extends
-    ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestUserDefRebalancerCompatibility extends ZkStandAloneCMTestBase {
   String db2 = TEST_DB + "2";
   static boolean testRebalancerCreated = false;
   static boolean testRebalancerInvoked = false;


[5/7] git commit: [HELIX-444] add per-participant partition count gauges to helix, rb=21419

Posted by ka...@apache.org.
[HELIX-444] add per-participant partition count gauges to helix, rb=21419


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96aef71c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96aef71c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96aef71c

Branch: refs/heads/master
Commit: 96aef71c899dc1f3956e1211fc1e9a7459a258d1
Parents: 8527729
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 15:56:57 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 14:53:41 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/State.java   |   6 +-
 .../stages/BestPossibleStateCalcStage.java      |   9 +
 .../controller/stages/ClusterDataCache.java     |  42 +---
 .../stages/ExternalViewComputeStage.java        |  23 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java | 182 +++++++++++++--
 .../monitoring/mbeans/InstanceMonitor.java      |   4 +-
 .../mbeans/PerInstanceResourceMonitor.java      | 147 +++++++++++++
 .../mbeans/PerInstanceResourceMonitorMBean.java |  34 +++
 .../monitoring/mbeans/ResourceMonitor.java      |  23 +-
 .../TestClusterStatusMonitorLifecycle.java      |  42 ++--
 .../mbeans/TestClusterStatusMonitor.java        | 220 ++++++++++++-------
 .../monitoring/mbeans/TestResourceMonitor.java  |  59 ++---
 12 files changed, 586 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index 3315987..aa98df2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -47,9 +47,11 @@ public class State {
   @Override
   public boolean equals(Object that) {
     if (that instanceof State) {
-      return this.toString().equals(((State) that).toString());
+      return this.toString().equalsIgnoreCase(((State) that).toString());
     } else if (that instanceof String) {
-      return _state.equals(that);
+      return _state.equalsIgnoreCase(that.toString());
+    } else if (that instanceof HelixDefinedState) {
+      return _state.equalsIgnoreCase(that.toString());
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 644b9f6..2f93b7f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -42,6 +42,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -69,6 +70,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("Cluster");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -79,6 +81,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         compute(cluster, event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+          cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+    }
+
     long endTime = System.currentTimeMillis();
     if (LOG.isInfoEnabled()) {
       LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 0c28bdf..8bcfaae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -79,12 +79,6 @@ public class ClusterDataCache {
 
   boolean _init = true;
 
-  // Map<String, Map<String, HealthStat>> _healthStatMap;
-  // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
-  // private PersistentStats _persistentStats;
-  // private Alerts _alerts;
-  // private AlertStatus _alertStatus;
-
   private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName());
 
   /**
@@ -334,37 +328,6 @@ public class ClusterDataCache {
     return _messageMap;
   }
 
-  // public HealthStat getGlobalStats()
-  // {
-  // return _globalStats;
-  // }
-  //
-  // public PersistentStats getPersistentStats()
-  // {
-  // return _persistentStats;
-  // }
-  //
-  // public Alerts getAlerts()
-  // {
-  // return _alerts;
-  // }
-  //
-  // public AlertStatus getAlertStatus()
-  // {
-  // return _alertStatus;
-  // }
-  //
-  // public Map<String, HealthStat> getHealthStats(String instanceName)
-  // {
-  // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-  // if (map != null)
-  // {
-  // return map;
-  // } else
-  // {
-  // return Collections.emptyMap();
-  // }
-  // }
   /**
    * Provides the state model definition for a given state model
    * @param stateModelDefRef
@@ -375,8 +338,13 @@ public class ClusterDataCache {
   }
 
   /**
+<<<<<<< HEAD
    * Get all state model definitions
    * @return map of name to state model definition
+=======
+   * Provides all state model definitions
+   * @return state model definition map
+>>>>>>> 8d5c27c... [HELIX-444] add per-participant partition count gauges to helix, rb=21419
    */
   public Map<String, StateModelDefinition> getStateModelDefMap() {
     return _stateModelDefMap;

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a46acbd..3086a83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,7 +35,6 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.SchedulerTaskConfig;
@@ -46,6 +45,7 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -66,10 +66,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("Cluster");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
-    if (manager == null || resourceMap == null || cluster == null) {
+    if (manager == null || resourceMap == null || cluster == null || cache == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires ClusterManager|RESOURCES|Cluster");
+          + ". Requires ClusterManager|RESOURCES|Cluster|ClusterDataCache");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@@ -118,15 +119,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       // Update cluster status monitor mbean
       ClusterStatusMonitor clusterStatusMonitor =
           (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
-      if (clusterStatusMonitor != null && currentResource != null) {
-        IdealState idealState = currentResource.getIdealState();
-        if (idealState != null) {
-          StateModelDefId stateModelDefId = idealState.getStateModelDefId();
-          if (stateModelDefId != null
-              && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) {
-            clusterStatusMonitor.onExternalViewChange(view, idealState);
-          }
+      IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+      if (idealState != null) {
+        if (clusterStatusMonitor != null
+            && !idealState.getStateModelDefRef().equalsIgnoreCase(
+                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+          clusterStatusMonitor.setResourceStatus(view,
+              cache._idealStateMap.get(view.getResourceName()));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b468856..99dee75 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -32,8 +33,18 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
@@ -41,15 +52,15 @@ import com.google.common.collect.Sets;
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
 
-  static final String CLUSTER_STATUS_KEY = "ClusterStatus";
+  public static final String CLUSTER_STATUS_KEY = "ClusterStatus";
   static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
-  static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+  public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   static final String CLUSTER_DN_KEY = "cluster";
   static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
 
-  static final String DEFAULT_TAG = "DEFAULT";
+  public static final String DEFAULT_TAG = "DEFAULT";
 
   private final String _clusterName;
   private final MBeanServer _beanServer;
@@ -68,20 +79,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
       new ConcurrentHashMap<String, InstanceMonitor>();
 
+  /**
+   * PerInstanceResource bean map: beanName->bean
+   */
+  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
+      new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
+
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
     try {
-      register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+      register(this, getObjectName(clusterBeanName()));
     } catch (Exception e) {
-      LOG.error("Register self failed.", e);
+      LOG.error("Fail to regiter ClusterStatusMonitor", e);
     }
   }
 
   public ObjectName getObjectName(String name) throws MalformedObjectNameException {
-    return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+    return new ObjectName(String.format("%s: %s", CLUSTER_STATUS_KEY, name));
   }
 
+  // TODO remove getBeanName()?
   // Used by other external JMX consumers like ingraph
   public String getBeanName() {
     return CLUSTER_STATUS_KEY + " " + _clusterName;
@@ -144,10 +162,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
 
     try {
-      LOG.info("Registering " + name.toString());
+      LOG.info("Register MBean: " + name);
       _beanServer.registerMBean(bean, name);
     } catch (Exception e) {
-      LOG.warn("Could not register MBean" + name, e);
+      LOG.warn("Could not register MBean: " + name, e);
     }
   }
 
@@ -158,7 +176,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         _beanServer.unregisterMBean(name);
       }
     } catch (Exception e) {
-      LOG.warn("Could not unregister MBean" + name, e);
+      LOG.warn("Could not unregister MBean: " + name, e);
     }
   }
 
@@ -227,28 +245,98 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+  /**
+   * Update gauges for resource at instance level
+   * @param bestPossibleStates
+   * @param resourceMap
+   * @param stateModelDefMap
+   */
+  public void setPerInstanceResourceStatus(BestPossibleStateOutput bestPossibleStates,
+      Map<String, InstanceConfig> instanceConfigMap, Map<ResourceId, ResourceConfig> resourceMap,
+      Map<String, StateModelDefinition> stateModelDefMap) {
+
+    // Convert to perInstanceResource beanName->partition->state
+    Map<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>> beanMap =
+        new HashMap<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>>();
+    for (ResourceId resource : bestPossibleStates.getAssignedResources()) {
+      ResourceAssignment assignment = bestPossibleStates.getResourceAssignment(resource);
+      for (PartitionId partition : assignment.getMappedPartitionIds()) {
+        Map<ParticipantId, State> instanceStateMap = assignment.getReplicaMap(partition);
+        for (ParticipantId instance : instanceStateMap.keySet()) {
+          State state = instanceStateMap.get(instance);
+          PerInstanceResourceMonitor.BeanName beanName =
+              new PerInstanceResourceMonitor.BeanName(instance.toString(), resource.toString());
+          if (!beanMap.containsKey(beanName)) {
+            beanMap.put(beanName, new HashMap<PartitionId, State>());
+          }
+          beanMap.get(beanName).put(partition, state);
+        }
+      }
+    }
+    // Unregister beans for per-instance resources that no longer exist
+    Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+        Sets.newHashSet(_perInstanceResourceMap.keySet());
+    toUnregister.removeAll(beanMap.keySet());
+    try {
+      unregisterPerInstanceResources(toUnregister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Fail to unregister per-instance resource from MBean server: " + toUnregister, e);
+    }
+    // Register beans for per-instance resources that are newly configured
+    Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
+    toRegister.removeAll(_perInstanceResourceMap.keySet());
+    Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
+    for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
+      PerInstanceResourceMonitor bean =
+          new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(),
+              beanName.resourceName());
+      StateModelDefId stateModelDefId =
+          resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+              .getStateModelDefId();
+      InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+      bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+          stateModelDefMap.get(stateModelDefId.toString()));
+      monitorsToRegister.add(bean);
+    }
+    try {
+      registerPerInstanceResources(monitorsToRegister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
+    }
+    // Update existing beans
+    for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
+      PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+      StateModelDefId stateModelDefId =
+          resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+              .getStateModelDefId();
+      InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+      bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+          stateModelDefMap.get(stateModelDefId.toString()));
+    }
+  }
+
+  public void setResourceStatus(ExternalView externalView, IdealState idealState) {
     try {
       String resourceName = externalView.getId();
       if (!_resourceMbeanMap.containsKey(resourceName)) {
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
-            bean.updateExternalView(externalView, idealState);
+            bean.updateResource(externalView, idealState);
             registerResources(Arrays.asList(bean));
           }
         }
       }
       ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
       String oldSensorName = bean.getSensorName();
-      bean.updateExternalView(externalView, idealState);
+      bean.updateResource(externalView, idealState);
       String newSensorName = bean.getSensorName();
       if (!oldSensorName.equals(newSensorName)) {
         unregisterResources(Arrays.asList(resourceName));
         registerResources(Arrays.asList(bean));
       }
     } catch (Exception e) {
-      LOG.warn(e);
+      LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e);
     }
   }
 
@@ -264,18 +352,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
     } catch (Exception e) {
-      LOG.warn("fail to add message queue size to mbean", e);
+      LOG.error("Fail to add message queue size to mbean, instance: " + instanceName, e);
     }
   }
 
   public void reset() {
-    LOG.info("Resetting ClusterStatusMonitor");
+    LOG.info("Reset ClusterStatusMonitor");
     try {
-      for (String resourceName : _resourceMbeanMap.keySet()) {
-        String beanName =
-            CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
-        unregister(getObjectName(beanName));
-      }
+      unregisterResources(_resourceMbeanMap.keySet());
+
       _resourceMbeanMap.clear();
 
       for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
@@ -286,9 +371,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       unregisterInstances(_instanceMbeanMap.keySet());
       _instanceMbeanMap.clear();
 
-      unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+      unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+      unregister(getObjectName(clusterBeanName()));
     } catch (Exception e) {
-      LOG.error("fail to reset ClusterStatusMonitor", e);
+      LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
     }
   }
 
@@ -330,12 +416,60 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _resourceMbeanMap.keySet().removeAll(resources);
   }
 
+  private synchronized void registerPerInstanceResources(
+      Collection<PerInstanceResourceMonitor> monitors) throws MalformedObjectNameException {
+    for (PerInstanceResourceMonitor monitor : monitors) {
+      String instanceName = monitor.getInstanceName();
+      String resourceName = monitor.getResourceName();
+      String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+      register(monitor, getObjectName(beanName));
+      _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName,
+          resourceName), monitor);
+    }
+  }
+
+  private synchronized void unregisterPerInstanceResources(
+      Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+      throws MalformedObjectNameException {
+    for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+      unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
+          beanName.resourceName())));
+    }
+    _perInstanceResourceMap.keySet().removeAll(beanNames);
+  }
+
+  public String clusterBeanName() {
+    return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
+  }
+
+  /**
+   * Build instance bean name
+   * @param instanceName
+   * @return instance bean name
+   */
   private String getInstanceBeanName(String instanceName) {
-    return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+    return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
   }
 
+  /**
+   * Build resource bean name
+   * @param resourceName
+   * @return resource bean name
+   */
   private String getResourceBeanName(String resourceName) {
-    return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+    return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
+  }
+
+  /**
+   * Build per-instance resource bean name:
+   * "cluster={clusterName},instanceName={instanceName},resourceName={resourceName}"
+   * @param instanceName
+   * @param resourceName
+   * @return per-instance resource bean name
+   */
+  public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+    return String.format("%s,%s", clusterBeanName(), new PerInstanceResourceMonitor.BeanName(
+        instanceName, resourceName).toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 1385568..d9875cc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -54,8 +54,8 @@ public class InstanceMonitor implements InstanceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
-        + serializedTags() + "." + _participantName;
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+        serializedTags(), _participantName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
new file mode 100644
index 0000000..476445c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -0,0 +1,147 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+  public static class BeanName {
+    private final String _instanceName;
+    private final String _resourceName;
+
+    public BeanName(String instanceName, String resourceName) {
+      if (instanceName == null || resourceName == null) {
+        throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
+            + ", resourceName: " + resourceName);
+      }
+      _instanceName = instanceName;
+      _resourceName = resourceName;
+    }
+
+    public String instanceName() {
+      return _instanceName;
+    }
+
+    public String resourceName() {
+      return _resourceName;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof BeanName)) {
+        return false;
+      }
+
+      BeanName that = (BeanName) obj;
+      return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+    }
+
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
+          ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+    }
+  }
+
+  private final String _clusterName;
+  private List<String> _tags;
+  private final String _participantName;
+  private final String _resourceName;
+  private long _partitions;
+
+  public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+    _clusterName = clusterName;
+    _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    _participantName = participantName;
+    _resourceName = resourceName;
+    _partitions = 0;
+  }
+
+  @Override
+  public String getSensorName() {
+    return Joiner
+        .on('.')
+        .join(
+            ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+                serializedTags(), _participantName, _resourceName)).toString();
+  }
+
+  private String serializedTags() {
+    return Joiner.on('|').skipNulls().join(_tags).toString();
+  }
+
+  @Override
+  public long getPartitionGauge() {
+    return _partitions;
+  }
+
+  public String getInstanceName() {
+    return _participantName;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
+   * Update per-instance resource bean
+   * @param stateMap partition->state
+   * @param tags instance tags
+   * @param stateModelDef
+   */
+  public synchronized void update(Map<PartitionId, State> stateMap, Set<String> tags,
+      StateModelDefinition stateModelDef) {
+    if (tags == null || tags.isEmpty()) {
+      _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    } else {
+      _tags = Lists.newArrayList(tags);
+      Collections.sort(_tags);
+    }
+
+    int cnt = 0;
+    for (State state : stateMap.values()) {
+      // Skip DROPPED and initial state (e.g. OFFLINE)
+      if (state.equals(HelixDefinedState.DROPPED)
+          || state.equals(stateModelDef.getTypedInitialState())) {
+        continue;
+      }
+      cnt++;
+    }
+    _partitions = cnt;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
new file mode 100644
index 0000000..4b544b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
@@ -0,0 +1,34 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A bean that describes the resource on each instance
+ */
+public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
+  /**
+   * Get the number of partitions of the resource in best possible ideal state
+   * for the instance
+   * @return number of partitions
+   */
+  long getPartitionGauge();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d1ba595..4739fab 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,6 +19,7 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.helix.HelixDefinedState;
@@ -30,14 +31,15 @@ import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
 public class ResourceMonitor implements ResourceMonitorMBean {
-  private int _numOfPartitions;
-  int _numOfPartitionsInExternalView;
-  int _numOfErrorPartitions;
-  int _externalViewIdealStateDiff;
-  String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
 
-  String _resourceName, _clusterName;
+  private int _numOfPartitions;
+  private int _numOfPartitionsInExternalView;
+  private int _numOfErrorPartitions;
+  private int _externalViewIdealStateDiff;
+  private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+  private String _resourceName;
+  private String _clusterName;
 
   public ResourceMonitor(String clusterName, String resourceName) {
     _clusterName = clusterName;
@@ -61,15 +63,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
-        + _resourceName;
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+        _tag, _resourceName);
   }
 
   public String getResourceName() {
     return _resourceName;
   }
 
-  public void updateExternalView(ExternalView externalView, IdealState idealState) {
+  public void updateResource(ExternalView externalView, IdealState idealState) {
     if (externalView == null) {
       LOG.warn("external view is null");
       return;
@@ -97,6 +99,9 @@ public class ResourceMonitor implements ResourceMonitorMBean {
     // or list fields (AUDO mode)
     for (PartitionId partitionId : idealState.getPartitionIdSet()) {
       Map<ParticipantId, State> idealRecord = idealState.getParticipantStateMap(partitionId);
+      if (idealRecord == null) {
+        idealRecord = Collections.emptyMap();
+      }
       Map<ParticipantId, State> externalViewRecord = externalView.getStateMap(partitionId);
 
       if (externalViewRecord == null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index a00db67..0981a2e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -37,12 +37,14 @@ import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
 
   MockParticipantManager[] _participants;
   ClusterDistributedController[] _controllers;
@@ -176,12 +178,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     @Override
     public void onMBeanRegistered(MBeanServerConnection server,
         MBeanServerNotification mbsNotification) {
+      LOG.info("Register mbean: " + mbsNotification.getMBeanName());
       _nMbeansRegistered++;
     }
 
     @Override
     public void onMBeanUnRegistered(MBeanServerConnection server,
         MBeanServerNotification mbsNotification) {
+      LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
       _nMbeansUnregistered++;
     }
   }
@@ -196,10 +200,12 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     _participants[0].disconnect();
 
-    // participant goes away. should be no change in number of beans as config is still present
+    // 1 participant goes away
+    // No change in instance/resource mbean
+    // Unregister 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -215,19 +221,25 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     Thread.sleep(1000);
 
     // 1 cluster status monitor, 1 resource monitor, 5 instances
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+    // Unregister 1+4+1 per-instance resource mbean
+    // Register 4 per-instance resource mbean
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
     _participants[0].syncStart();
 
-    // participant goes back. should be no change
+    // 1 participant comes back
+    // No change in instance/resource mbean
+    // Register 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
 
-    // Add a resource, one more mbean registered
+    // Add a resource
+    // Register 1 resource mbean
+    // Register 5 per-instance resource mbean
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
 
@@ -237,14 +249,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
 
-    // remove resource, no change
+    // Remove a resource
+    // No change in instance/resource mbean
+    // Unregister 5 per-instance resource mbean
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index facb4ea..8c9ab01 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,98 +19,162 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.util.ArrayList;
+import java.lang.management.ManagementFactory;
 import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.Mocks;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import java.util.Map;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Maps;
+
 public class TestClusterStatusMonitor {
-  List<String> _instances;
-  List<ZNRecord> _liveInstances;
-  String _db = "DB";
-  String _db2 = "TestDB";
-  int _replicas = 3;
-  int _partitions = 50;
-  ZNRecord _externalView, _externalView2;
-
-  class MockDataAccessor extends Mocks.MockAccessor {
-    public MockDataAccessor() {
-      _instances = new ArrayList<String>();
-      for (int i = 0; i < 5; i++) {
-        String instance = "localhost_" + (12918 + i);
-        _instances.add(instance);
-      }
-      ZNRecord externalView =
-          DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
-              "MASTER", "SLAVE");
-
-      ZNRecord externalView2 =
-          DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, _db2, "MASTER", "SLAVE");
-    }
+  private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+
+  @Test()
+  public void testReportData() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 5;
+    String testDB = "TestDB";
+    String testDB_0 = testDB + "_0";
 
-    public ZNRecord getProperty(PropertyType type, String resource) {
-      if (type == PropertyType.IDEALSTATES || type == PropertyType.EXTERNALVIEW) {
-        if (resource.equals(_db)) {
-          return _externalView;
-        } else if (resource.equals(_db2)) {
-          return _externalView2;
-        }
-      }
-      return null;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+    } catch (Exception e) {
+      Assert.fail("Fail to register ClusterStatusMonitor");
     }
-  }
 
-  class MockHelixManager extends Mocks.MockManager {
-    MockDataAccessor _accessor = new MockDataAccessor();
+    // Test #setPerInstanceResourceStatus()
+    BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
+    ResourceAssignment assignment = new ResourceAssignment(ResourceId.from(testDB));
+    Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+    replicaMap.put(ParticipantId.from("localhost_12918"), State.from("MASTER"));
+    replicaMap.put(ParticipantId.from("localhost_12919"), State.from("SLAVE"));
+    replicaMap.put(ParticipantId.from("localhost_12920"), State.from("SLAVE"));
+    replicaMap.put(ParticipantId.from("localhost_12921"), State.from("OFFLINE"));
+    replicaMap.put(ParticipantId.from("localhost_12922"), State.from("DROPPED"));
+    assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+    bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
 
-    @Override
-    public HelixDataAccessor getHelixDataAccessor() {
-      return _accessor;
+    Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      InstanceConfig config = new InstanceConfig(instanceName);
+      instanceConfigMap.put(instanceName, config);
     }
 
-  }
+    Map<ResourceId, ResourceConfig> resourceMap = Maps.newHashMap();
+    ResourceId resourceId = ResourceId.from(testDB);
+    RebalancerConfig rebalancerConfig =
+        new SemiAutoRebalancerConfig.Builder(resourceId)
+            .addPartition(new Partition(PartitionId.from(testDB_0)))
+            .stateModelDefId(StateModelDefId.from("MasterSlave")).build();
+    ResourceConfig resourceConfig =
+        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
+    resourceMap.put(resourceId, resourceConfig);
 
-  @Test()
-  public void TestReportData() {
-    System.out.println("START TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
-    List<String> _instances;
-    List<ZNRecord> _liveInstances = new ArrayList<ZNRecord>();
-    String _db = "DB";
-    int _replicas = 3;
-    int _partitions = 50;
-
-    _instances = new ArrayList<String>();
-    for (int i = 0; i < 5; i++) {
-      String instance = "localhost_" + (12918 + i);
-      _instances.add(instance);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
-          .toString());
-      _liveInstances.add(metaData);
+    Map<String, StateModelDefinition> stateModelDefMap = Maps.newHashMap();
+    StateModelDefinition msStateModelDef =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    stateModelDefMap.put("MasterSlave", msStateModelDef);
+
+    monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+        stateModelDefMap);
+
+    // localhost_12918 should have 1 partition because it's MASTER
+    ObjectName objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+    Object value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(1));
+    value = _server.getAttribute(objName, "SensorName");
+    Assert.assertTrue(value instanceof String);
+    Assert.assertEquals((String) value, String.format("%s.%s.%s.%s.%s",
+        ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, clusterName, ClusterStatusMonitor.DEFAULT_TAG,
+        "localhost_12918", testDB));
+
+    // localhost_12919 should have 1 partition because it's SLAVE
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12919", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(1));
+
+    // localhost_12921 should have 0 partition because it's OFFLINE
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12921", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(0));
+
+    // localhost_12922 should have 0 partition because it's DROPPED
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12922", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(0));
+
+    // Missing localhost_12918 in best possible ideal-state should unregister its mbean
+    replicaMap.remove(ParticipantId.from("localhost_12918"));
+    assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+    bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
+    monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+        stateModelDefMap);
+    try {
+      objName =
+          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+      _server.getMBeanInfo(objName);
+      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
+
+    } catch (InstanceNotFoundException e) {
+      // OK
     }
-    ZNRecord externalView =
-        DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
-            "MASTER", "SLAVE");
 
-    ZNRecord externalView2 =
-        DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, "TestDB", "MASTER", "SLAVE");
+    // Clean up
+    monitor.reset();
+
+    try {
+      objName =
+          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+      _server.getMBeanInfo(objName);
+      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
 
-    List<ZNRecord> externalViews = new ArrayList<ZNRecord>();
-    externalViews.add(externalView);
-    externalViews.add(externalView2);
+    } catch (InstanceNotFoundException e) {
+      // OK
+    }
+
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+      Assert.fail("Fail to unregister ClusterStatusMonitor");
+    } catch (InstanceNotFoundException e) {
+      // OK
+    }
 
-    ClusterStatusMonitor monitor = new ClusterStatusMonitor("cluster1");
-    MockHelixManager manager = new MockHelixManager();
-    NotificationContext context = new NotificationContext(manager);
-    System.out.println("END TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index e8bb4b6..dcca755 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -28,14 +28,13 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.Mocks;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestResourceMonitor {
@@ -106,46 +105,52 @@ public class TestResourceMonitor {
   }
 
   @Test()
-  public void TestReportData() {
-    MockHelixManager manager = new MockHelixManager();
+  public void testReportData() {
+    final int n = 5;
     ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
 
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView(_dbName));
-    IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(_dbName));
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < n; i++) {
+      String instance = "localhost_" + (12918 + i);
+      instances.add(instance);
+    }
 
-    monitor.updateExternalView(externalView, idealState);
+    ZNRecord idealStateRecord =
+        DefaultTwoStateStrategy.calculateIdealState(instances, _partitions, _replicas, _dbName,
+            "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(idealStateRecord);
+    ExternalView externalView = new ExternalView(idealStateRecord);
 
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
-    monitor.getBeanName();
+    monitor.updateResource(externalView, idealState);
 
-    int n = 4;
-    for (int i = 0; i < n; i++) {
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+    // monitor.getBeanName();
+
+    final int m = n - 1;
+    for (int i = 0; i < m; i++) {
       Map<String, String> map = externalView.getStateMap(_dbName + "_" + 3 * i);
       String key = map.keySet().toArray()[0].toString();
       map.put(key, "ERROR");
       externalView.setStateMap(_dbName + "_" + 3 * i, map);
     }
 
-    monitor.updateExternalView(externalView, idealState);
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), n);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+    monitor.updateResource(externalView, idealState);
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
 
-    n = 5;
     for (int i = 0; i < n; i++) {
       externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
     }
 
-    monitor.updateExternalView(externalView, idealState);
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 3);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+    monitor.updateResource(externalView, idealState);
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
   }
 }


[6/7] git commit: [HELIX-94] Add the ability to enable and disable a resource, rb=20401

Posted by ka...@apache.org.
[HELIX-94] Add the ability to enable and disable a resource, rb=20401


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c1af744a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c1af744a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c1af744a

Branch: refs/heads/master
Commit: c1af744af6088ad98fe2a8a5074f2c56199b6f82
Parents: 96aef71
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 14:56:47 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jul 11 10:04:37 2014 -0700

----------------------------------------------------------------------
 .../helix/webapp/resources/JsonParameters.java  |   4 +
 .../webapp/resources/ResourceGroupResource.java |   7 +
 .../helix/webapp/TestDisableResource.java       |  84 ++++++
 .../main/java/org/apache/helix/HelixAdmin.java  |   7 +
 .../controller/rebalancer/CustomRebalancer.java |   5 +-
 .../rebalancer/FallbackRebalancer.java          |   4 +-
 .../rebalancer/FullAutoRebalancer.java          |   5 +-
 .../rebalancer/SemiAutoRebalancer.java          |   5 +-
 .../config/SemiAutoRebalancerConfig.java        |   2 +-
 .../util/ConstraintBasedAssignment.java         |  26 +-
 .../stages/BestPossibleStateCalcStage.java      |   4 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  84 ++++--
 .../java/org/apache/helix/model/IdealState.java |  21 +-
 .../participant/HelixCustomCodeRunner.java      |  16 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  17 +-
 .../strategy/TestAutoRebalanceStrategy.java     |   2 +-
 .../strategy/TestNewAutoRebalanceStrategy.java  |   2 +-
 .../TestDisableCustomCodeRunner.java            | 252 +++++++++++++++++
 .../helix/integration/TestDisableResource.java  | 267 +++++++++++++++++++
 .../helix/manager/zk/TestZkHelixAdmin.java      |  34 ++-
 .../apache/helix/tools/TestClusterSetup.java    |  39 ++-
 21 files changed, 826 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 5f405d8..19ac71a 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -165,6 +165,10 @@ public class JsonParameters {
       if (!_parameterMap.containsKey(ENABLED)) {
         throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
       }
+    } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+      if (!_parameterMap.containsKey(ENABLED)) {
+        throw new HelixException("Missing Json parameters: '" + ENABLED + "'");
+      }
     } else if (command.equalsIgnoreCase(ClusterSetup.enablePartition)) {
       if (!_parameterMap.containsKey(ENABLED)) {
         throw new HelixException("Missing Json parameters: '" + ENABLED + "'");

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
index 055f64a..6dc721d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceGroupResource.java
@@ -114,6 +114,13 @@ public class ResourceGroupResource extends ServerResource {
         ClusterSetup setupTool = new ClusterSetup(zkClient);
         setupTool.getClusterManagementTool()
             .resetResource(clusterName, Arrays.asList(resourceName));
+      } else if (command.equalsIgnoreCase(ClusterSetup.enableResource)) {
+        jsonParameters.verifyCommand(ClusterSetup.enableResource);
+        boolean enabled = Boolean.parseBoolean(jsonParameters.getParameter(JsonParameters.ENABLED));
+        ZkClient zkClient =
+            (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+        ClusterSetup setupTool = new ClusterSetup(zkClient);
+        setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
       } else {
         throw new HelixException("Unsupported command: " + command + ". Should be one of ["
             + ClusterSetup.resetResource + "]");

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
new file mode 100644
index 0000000..9800179
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestDisableResource.java
@@ -0,0 +1,84 @@
+package org.apache.helix.webapp;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends AdminTestBase {
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    String instanceUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups/"
+            + "TestDB0";
+
+    // Disable TestDB0
+    Map<String, String> paramMap = new HashMap<String, String>();
+    paramMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableResource);
+    paramMap.put(JsonParameters.ENABLED, Boolean.toString(false));
+    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertFalse(idealState.isEnabled());
+
+    // Re-enable TestDB0
+    paramMap.put(JsonParameters.ENABLED, Boolean.toString(true));
+    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl, paramMap, false);
+
+    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertTrue(idealState.isEnabled());
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 892bada..dce3893 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -177,6 +177,13 @@ public interface HelixAdmin {
   void enableInstance(String clusterName, String instanceName, boolean enabled);
 
   /**
+   * Disable or enable a resource
+   * @param clusterName
+   * @param resourceName
+   */
+  void enableResource(String clusterName, String resourceName, boolean enabled);
+
+  /**
    * Disable or enable a list of partitions on an instance
    * @param enabled
    * @param clusterName

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 6629bec..4d5c373 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -14,6 +14,7 @@ import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -51,6 +52,8 @@ public class CustomRebalancer implements HelixRebalancer {
       ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     CustomRebalancerConfig config =
         BasicRebalancerConfig.convert(rebalancerConfig, CustomRebalancerConfig.class);
+    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
@@ -65,7 +68,7 @@ public class CustomRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
               .getLiveParticipantMap().keySet(), stateModelDef, config.getPreferenceMap(partition),
-              currentStateMap, disabledInstancesForPartition);
+              currentStateMap, disabledInstancesForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
index 3aa41d7..e00e57c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FallbackRebalancer.java
@@ -160,7 +160,7 @@ public class FallbackRebalancer implements HelixRebalancer {
             ConstraintBasedAssignment.computeCustomizedBestStateForPartition(cluster
                 .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
                 .getParticipantStateMap(partitionId), currentState.getCurrentStateMap(resourceId,
-                partitionId), disabledParticipants);
+                partitionId), disabledParticipants, idealState.isEnabled());
         assignment.addReplicaMap(partitionId, replicaMap);
       }
     } else {
@@ -176,7 +176,7 @@ public class FallbackRebalancer implements HelixRebalancer {
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
                 .getLiveParticipantMap().keySet(), stateModelDef, newIdealState
                 .getPreferenceList(partitionId), currentState.getCurrentStateMap(resourceId,
-                partitionId), disabledParticipants);
+                partitionId), disabledParticipants, idealState.isEnabled());
         assignment.addReplicaMap(partitionId, replicaMap);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
index 13616b3..4bf030b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/FullAutoRebalancer.java
@@ -25,6 +25,7 @@ import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -67,6 +68,8 @@ public class FullAutoRebalancer implements HelixRebalancer {
       ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     FullAutoRebalancerConfig config =
         BasicRebalancerConfig.convert(rebalancerConfig, FullAutoRebalancerConfig.class);
+    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     // Compute a preference list based on the current ideal state
@@ -176,7 +179,7 @@ public class FullAutoRebalancer implements HelixRebalancer {
           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
               liveParticipants.keySet(), stateModelDef, preferenceList,
               currentState.getCurrentStateMap(config.getResourceId(), partition),
-              disabledParticipantsForPartition);
+              disabledParticipantsForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index 51ca463..07f6337 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -15,6 +15,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ResourceCurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -55,6 +56,8 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
     SemiAutoRebalancerConfig config =
         BasicRebalancerConfig.convert(rebalancerConfig, SemiAutoRebalancerConfig.class);
+    IdealState idealState = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    boolean isEnabled = (idealState != null) ? idealState.isEnabled() : true;
     StateModelDefinition stateModelDef =
         cluster.getStateModelMap().get(config.getStateModelDefId());
     if (LOG.isDebugEnabled()) {
@@ -77,7 +80,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
               .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,
-              disabledInstancesForPartition);
+              disabledInstancesForPartition, isEnabled);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
index 727c3df..60e30f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/SemiAutoRebalancerConfig.java
@@ -116,7 +116,7 @@ public final class SemiAutoRebalancerConfig extends PartitionedRebalancerConfig
         Map<ParticipantId, State> emptyCurrentState = Collections.emptyMap();
         Map<ParticipantId, State> initialMap =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, participantSet,
-                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants);
+                stateModelDef, preferenceList, emptyCurrentState, disabledParticipants, true);
         currentMapping.put(partitionId, initialMap);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index 7951784..addd652 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -117,23 +117,24 @@ public class ConstraintBasedAssignment {
    * @param participants participants selected to serve the partition
    * @param disabledParticipants participants that have been disabled for this partition
    * @param initialState the initial state of the resource state model
+   * @param isEnabled true if resource is enabled, false otherwise
    * @return map of participant id to state of dropped and disabled partitions
    */
   public static Map<ParticipantId, State> dropAndDisablePartitions(
       Map<ParticipantId, State> currentStateMap, Collection<ParticipantId> participants,
-      Set<ParticipantId> disabledParticipants, State initialState) {
+      Set<ParticipantId> disabledParticipants, boolean isEnabled, State initialState) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
     // if the resource is deleted, instancePreferenceList will be empty and
     // we should drop all resources.
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
         if ((participants == null || !participants.contains(participantId))
-            && !disabledParticipants.contains(participantId)) {
+            && !disabledParticipants.contains(participantId) && isEnabled) {
           // if dropped and not disabled, transit to DROPPED
           participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
         } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
             participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipants.contains(participantId)) {
+            && (disabledParticipants.contains(participantId) || !isEnabled)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           participantStateMap.put(participantId, initialState);
         }
@@ -151,16 +152,18 @@ public class ConstraintBasedAssignment {
    * @param currentStateMap
    *          : participant->state for each partition
    * @param disabledParticipantsForPartition
+   * @param isEnabled true if resource is enabled, false if disabled
    * @return
    */
   public static Map<ParticipantId, State> computeAutoBestStateForPartition(
       Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
       StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
-      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+      Map<ParticipantId, State> currentStateMap,
+      Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
     // drop and disable participants if necessary
     Map<ParticipantId, State> participantStateMap =
         dropAndDisablePartitions(currentStateMap, participantPreferenceList,
-            disabledParticipantsForPartition, stateModelDef.getTypedInitialState());
+            disabledParticipantsForPartition, isEnabled, stateModelDef.getTypedInitialState());
 
     // resource is deleted
     if (participantPreferenceList == null) {
@@ -176,7 +179,7 @@ public class ConstraintBasedAssignment {
       if ("N".equals(num)) {
         Set<ParticipantId> liveAndEnabled = new HashSet<ParticipantId>(liveParticipantSet);
         liveAndEnabled.removeAll(disabledParticipantsForPartition);
-        stateCount = liveAndEnabled.size();
+        stateCount = isEnabled ? liveAndEnabled.size() : 0;
       } else if ("R".equals(num)) {
         stateCount = participantPreferenceList.size();
       } else {
@@ -198,7 +201,7 @@ public class ConstraintBasedAssignment {
                       .equals(State.from(HelixDefinedState.ERROR));
 
           if (liveParticipantSet.contains(participantId) && !assigned[i] && notInErrorState
-              && !disabledParticipantsForPartition.contains(participantId)) {
+              && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
             participantStateMap.put(participantId, state);
             count = count + 1;
             assigned[i] = true;
@@ -268,12 +271,13 @@ public class ConstraintBasedAssignment {
    * @param preferenceMap
    * @param currentStateMap
    * @param disabledParticipantsForPartition
+   * @param isEnabled
    * @return
    */
   public static Map<ParticipantId, State> computeCustomizedBestStateForPartition(
       Set<ParticipantId> liveParticipantSet, StateModelDefinition stateModelDef,
       Map<ParticipantId, State> preferenceMap, Map<ParticipantId, State> currentStateMap,
-      Set<ParticipantId> disabledParticipantsForPartition) {
+      Set<ParticipantId> disabledParticipantsForPartition, boolean isEnabled) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
 
     // if the resource is deleted, idealStateMap will be null/empty and
@@ -281,12 +285,12 @@ public class ConstraintBasedAssignment {
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
         if ((preferenceMap == null || !preferenceMap.containsKey(participantId))
-            && !disabledParticipantsForPartition.contains(participantId)) {
+            && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
           // if dropped and not disabled, transit to DROPPED
           participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
         } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
             participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
+            && (disabledParticipantsForPartition.contains(participantId) || !isEnabled)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
           participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
         }
@@ -304,7 +308,7 @@ public class ConstraintBasedAssignment {
               || !currentStateMap.get(participantId).equals(State.from(HelixDefinedState.ERROR));
 
       if (liveParticipantSet.contains(participantId) && notInErrorState
-          && !disabledParticipantsForPartition.contains(participantId)) {
+          && !disabledParticipantsForPartition.contains(participantId) && isEnabled) {
         participantStateMap.put(participantId, preferenceMap.get(participantId));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 2f93b7f..8c34f6b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -120,7 +120,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
           .computeAutoBestStateForPartition(upperBounds, cluster.getLiveParticipantMap().keySet(),
               stateModelDef, null, currentStateOutput.getCurrentStateMap(resourceId, partitionId),
-              disabledParticipantsForPartition));
+              disabledParticipantsForPartition, true));
     }
     return partitionMapping;
   }
@@ -163,7 +163,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       Set<ParticipantId> participants = participantStateMap.keySet();
       Map<ParticipantId, State> droppedAndDisabledMap =
           ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMap, participants,
-              disabledParticipants, initialState);
+              disabledParticipants, true, initialState);
 
       // don't map error participants
       for (ParticipantId participantId : errorParticipants) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6dc5541..da0c80c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants;
@@ -130,7 +131,6 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
-    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
     String instanceConfigsPath =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString());
@@ -157,9 +157,6 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public InstanceConfig getInstanceConfig(String clusterName, String instanceName) {
-    // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
-
-    // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
     String instanceConfigPath =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
@@ -168,7 +165,7 @@ public class ZKHelixAdmin implements HelixAdmin {
           + clusterName);
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -182,7 +179,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
     if (!baseAccessor.exists(path, 0)) {
       throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
           + ", instance config does not exist");
@@ -204,13 +201,36 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void enableResource(final String clusterName, final String resourceName,
+      final boolean enabled) {
+    String path = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName
+          + ", ideal-state does not exist");
+    }
+    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
+              + ", ideal-state is null");
+        }
+        IdealState idealState = new IdealState(currentData);
+        idealState.enable(enabled);
+        return idealState.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  @Override
   public void enablePartition(final boolean enabled, final String clusterName,
       final String instanceName, final String resourceName, final List<String> partitionNames) {
     String path =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.PARTICIPANT.toString(), instanceName);
 
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     // check instanceConfig exists
     if (!baseAccessor.exists(path, 0)) {
@@ -299,7 +319,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetPartition(String clusterName, String instanceName, String resourceName,
       List<String> partitionNames) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -404,7 +424,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetInstance(String clusterName, List<String> instanceNames) {
     // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -430,7 +450,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void resetResource(String clusterName, List<String> resourceNames) {
     // TODO: not mp-safe
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
@@ -508,7 +528,6 @@ public class ZKHelixAdmin implements HelixAdmin {
     // IDEAL STATE
     _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
     // CONFIGURATIONS
-    // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
     path =
         PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
             ConfigScopeProperty.CLUSTER.toString(), clusterName);
@@ -564,7 +583,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     List<String> instances = _zkClient.getChildren(memberInstancesPath);
     List<String> result = new ArrayList<String>();
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -662,7 +681,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public IdealState getResourceIdealState(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -671,7 +690,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -680,7 +699,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public ExternalView getResourceExternalView(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.externalView(resourceName));
@@ -699,7 +718,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("State model path " + stateModelPath + " already exists.");
     }
 
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
@@ -707,7 +726,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public void dropResource(String clusterName, String resourceName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -722,7 +741,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   @Override
   public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -805,7 +824,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void dropCluster(String clusterName) {
     logger.info("Deleting cluster " + clusterName);
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1067,15 +1086,23 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     int size = (int) file.length();
     byte[] bytes = new byte[size];
-    DataInputStream dis = new DataInputStream(new FileInputStream(file));
-    int read = 0;
-    int numRead = 0;
-    while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
-      read = read + numRead;
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(new FileInputStream(file));
+      int read = 0;
+      int numRead = 0;
+      while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) {
+        read = read + numRead;
+      }
+      return bytes;
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
     }
-    return bytes;
   }
 
+  @Override
   public void addStateModelDef(String clusterName, String stateModelDefName,
       String stateModelDefFile) throws IOException {
     ZNRecord record =
@@ -1091,7 +1118,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void setConstraint(String clusterName, final ConstraintType constraintType,
       final String constraintId, final ConstraintItem constraintItem) {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     Builder keyBuilder = new Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1112,7 +1139,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void removeConstraint(String clusterName, final ConstraintType constraintType,
       final String constraintId) {
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
     Builder keyBuilder = new Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
@@ -1210,7 +1237,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       throw new HelixException("cluster " + clusterName + " instance " + instanceName
           + " is not setup yet");
     }
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
@@ -1238,6 +1265,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
   }
 
+  @Override
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index ed17c5b..dc42a52 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -81,7 +81,8 @@ public class IdealState extends HelixProperty {
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
     REBALANCER_CLASS_NAME,
-    REBALANCER_CONFIG_NAME
+    REBALANCER_CONFIG_NAME,
+    HELIX_ENABLED
   }
 
   public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS";
@@ -821,6 +822,7 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   *
    * Get the non-Helix simple fields from this property and add them to a UserConfig
    * @param userConfig the user config to update
    */
@@ -950,4 +952,21 @@ public class IdealState extends HelixProperty {
       Map<PartitionId, Map<ParticipantId, State>> participantStateMaps) {
     return ResourceAssignment.stringMapsFromReplicaMaps(participantStateMaps);
   }
+
+  /**
+   * Check whether the resource is enabled.
+   * Resources are enabled by default.
+   * @return true if enabled; false otherwise
+   */
+  public boolean isEnabled() {
+    return _record.getBooleanField(IdealStateProperty.HELIX_ENABLED.name(), true);
+  }
+
+  /**
+   * Enable/Disable the resource
+   * @param enabled true to enable the resource; false to disable it
+   */
+  public void enable(boolean enabled) {
+    _record.setSimpleField(IdealStateProperty.HELIX_ENABLED.name(), Boolean.toString(enabled));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index bddd8d1..2f169cc 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -51,7 +51,7 @@ import org.apache.log4j.Logger;
  *  .invoke(_callback)
  *  .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
  *  .usingLeaderStandbyModel("someUniqueId")
- *  .run()
+ *  .start()
  * </code>
  */
 public class HelixCustomCodeRunner {
@@ -106,6 +106,15 @@ public class HelixCustomCodeRunner {
   }
 
   /**
+   * Get resource name for the custom-code runner
+   * Used for retrieving the external view for the custom-code runner resource
+   * @return resource name for the custom-code runner
+   */
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
    * This method will be invoked when there is a change in any subscribed
    * notificationTypes
    * @throws Exception
@@ -127,8 +136,9 @@ public class HelixCustomCodeRunner {
       // manually add ideal state for participant leader using LeaderStandby
       // model
 
-      zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-      zkClient.setZkSerializer(new ZNRecordSerializer());
+      zkClient =
+          new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+              ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       HelixDataAccessor accessor =
           new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
               zkClient));

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index d97a853..4479f97 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -98,6 +98,8 @@ public class ClusterSetup {
   public static final String addInstanceTag = "addInstanceTag";
   public static final String removeInstanceTag = "removeInstanceTag";
 
+  public static final String enableResource = "enableResource";
+
   // Query info (TBD in V2)
   public static final String listClusterInfo = "listClusterInfo";
   public static final String listInstanceInfo = "listInstanceInfo";
@@ -745,6 +747,11 @@ public class ClusterSetup {
     dropResourceOption.setRequired(false);
     dropResourceOption.setArgName("clusterName resourceName");
 
+    Option enableResourceOption =
+        OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource")
+            .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false")
+            .create();
+
     Option rebalanceOption =
         OptionBuilder.withLongOpt(rebalance).withDescription("Rebalance a resource in a cluster")
             .create();
@@ -795,11 +802,11 @@ public class ClusterSetup {
     partitionInfoOption.setArgName("clusterName resourceName partitionName");
 
     Option enableInstanceOption =
-        OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable a Instance")
+        OptionBuilder.withLongOpt(enableInstance).withDescription("Enable/disable an instance")
             .create();
     enableInstanceOption.setArgs(3);
     enableInstanceOption.setRequired(false);
-    enableInstanceOption.setArgName("clusterName InstanceName true/false");
+    enableInstanceOption.setArgName("clusterName instanceName true/false");
 
     Option enablePartitionOption =
         OptionBuilder.hasArgs().withLongOpt(enablePartition)
@@ -955,6 +962,7 @@ public class ClusterSetup {
     group.addOption(dropInstanceOption);
     group.addOption(swapInstanceOption);
     group.addOption(dropResourceOption);
+    group.addOption(enableResourceOption);
     group.addOption(instanceInfoOption);
     group.addOption(clusterInfoOption);
     group.addOption(resourceInfoOption);
@@ -1258,6 +1266,11 @@ public class ClusterSetup {
 
       setupTool.getClusterManagementTool().enableInstance(clusterName, instanceName, enabled);
       return 0;
+    } else if (cmd.hasOption(enableResource)) {
+      String clusterName = cmd.getOptionValues(enableResource)[0];
+      String resourceName = cmd.getOptionValues(enableResource)[1];
+      boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase());
+      setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled);
     } else if (cmd.hasOption(enablePartition)) {
       String[] args = cmd.getOptionValues(enablePartition);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 7c74035..1322b40 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -239,7 +239,7 @@ public class TestAutoRebalanceStrategy {
         Map<ParticipantId, State> assignment =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
                 liveParticipantSet, _stateModelDef, preferenceList, currentStateMap,
-                disabledParticipantsForPartition);
+                disabledParticipantsForPartition, true);
         mapResult.put(partition, IdealState.stringMapFromParticipantStateMap(assignment));
       }
       return mapResult;

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index 2c26d5d..c8ec90a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -265,7 +265,7 @@ public class TestNewAutoRebalanceStrategy {
         Map<ParticipantId, State> assignment =
             ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds,
                 liveParticipantMap.keySet(), _stateModelDef, participantPreferenceList, replicaMap,
-                disabledParticipantsForPartition);
+                disabledParticipantsForPartition, true);
         mapResult.put(partitionId, assignment);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
new file mode 100644
index 0000000..3223e48
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -0,0 +1,252 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.CustomCodeCallbackHandler;
+import org.apache.helix.participant.HelixCustomCodeRunner;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableCustomCodeRunner extends ZkUnitTestBase {
+
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+
+  class DummyCallback implements CustomCodeCallbackHandler {
+    private final Map<NotificationContext.Type, Boolean> _callbackInvokeMap =
+        new HashMap<NotificationContext.Type, Boolean>();
+
+    @Override
+    public void onCallback(NotificationContext context) {
+      NotificationContext.Type type = context.getType();
+      _callbackInvokeMap.put(type, Boolean.TRUE);
+    }
+
+    public void reset() {
+      _callbackInvokeMap.clear();
+    }
+
+    public boolean isInitTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.INIT);
+    }
+
+    public boolean isCallbackTypeInvoked() {
+      return _callbackInvokeMap.containsKey(NotificationContext.Type.CALLBACK);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    Map<String, HelixCustomCodeRunner> customCodeRunners =
+        new HashMap<String, HelixCustomCodeRunner>();
+    Map<String, DummyCallback> callbacks =
+        new HashMap<String, DummyCallback>();
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants
+          .put(instanceName, new MockParticipantManager(ZK_ADDR, clusterName, instanceName));
+
+      customCodeRunners.put(instanceName, new HelixCustomCodeRunner(participants.get(instanceName),
+          ZK_ADDR));
+      callbacks.put(instanceName, new DummyCallback());
+
+      customCodeRunners.get(instanceName).invoke(callbacks.get(instanceName))
+          .on(ChangeType.LIVE_INSTANCE)
+          .usingLeaderStandbyModel("TestParticLeader").start();
+      participants.get(instanceName).syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Make sure callback is registered
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final String customCodeRunnerResource =
+        customCodeRunners.get("localhost_12918").getResourceName();
+    ExternalView extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+    Map<String, String> instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+    String leader = null;
+    for (String instance : instanceStates.keySet()) {
+      String state = instanceStates.get(instance);
+      if ("LEADER".equals(state)) {
+        leader = instance;
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+
+    // Disable custom-code runner resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, customCodeRunnerResource, false);
+
+    // Verify that states of custom-code runner are all OFFLINE
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        ExternalView extView =
+            accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+
+    // A live-instance change should not invoke any custom-code runner while disabled
+    LiveInstance fakeInstance = new LiveInstance("fakeInstance");
+    fakeInstance.setSessionId("fakeSessionId");
+    fakeInstance.setHelixVersion("0.6");
+    accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+    Thread.sleep(1000);
+
+    for (DummyCallback callback : callbacks.values()) {
+      Assert.assertFalse(callback.isInitTypeInvoked());
+      Assert.assertFalse(callback.isCallbackTypeInvoked());
+    }
+
+    // Remove fake instance
+    accessor.removeProperty(keyBuilder.liveInstance("fakeInstance"));
+
+    // Re-enable custom-code runner
+    admin.enableResource(clusterName, customCodeRunnerResource, true);
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Verify that the custom-code runner is invoked again
+    extView = accessor.getProperty(keyBuilder.externalView(customCodeRunnerResource));
+    instanceStates = extView.getStateMap(customCodeRunnerResource + "_0");
+    leader = null;
+    for (String instance : instanceStates.keySet()) {
+      String state = instanceStates.get(instance);
+      if ("LEADER".equals(state)) {
+        leader = instance;
+        break;
+      }
+    }
+    Assert.assertNotNull(leader);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isInitTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isInitTypeInvoked());
+      }
+      callback.reset();
+    }
+
+    // Adding a fake instance should invoke the custom-code runner on the leader
+    accessor.setProperty(keyBuilder.liveInstance("fakeInstance"), fakeInstance);
+    Thread.sleep(1000);
+    for (String instance : callbacks.keySet()) {
+      DummyCallback callback = callbacks.get(instance);
+      if (instance.equals(leader)) {
+        Assert.assertTrue(callback.isCallbackTypeInvoked());
+      } else {
+        Assert.assertFalse(callback.isCallbackTypeInvoked());
+      }
+    }
+
+    // Clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
new file mode 100644
index 0000000..ea8974d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableResource.java
@@ -0,0 +1,267 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestDisableResource extends ZkUnitTestBase {
+  private static final int N = 2;
+  private static final int PARTITION_NUM = 1;
+
+  @Test
+  public void testDisableResourceInSemiAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDisableResourceInFullAutoMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testDisableResourceInCustomMode() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        PARTITION_NUM, // partitions per resource
+        N, // number of nodes
+        2, // replicas
+        "MasterSlave", RebalanceMode.CUSTOMIZED, true); // do rebalance
+
+    // set up custom ideal-state
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.setPartitionState("TestDB0_0", "localhost_12918", "SLAVE");
+    idealState.setPartitionState("TestDB0_0", "localhost_12919", "SLAVE");
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager participants[] = new MockParticipantManager[N];
+    for (int i = 0; i < N; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Disable TestDB0
+    enableResource(clusterName, false);
+    checkExternalView(clusterName);
+
+    // Re-enable TestDB0
+    enableResource(clusterName, true);
+    result =
+        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+            ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Clean up
+    controller.syncStop();
+    for (int i = 0; i < N; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  private void enableResource(String clusterName, boolean enabled) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableResource(clusterName, "TestDB0", enabled);
+  }
+
+  /**
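+   * Check that every replica of every TestDB0 partition is OFFLINE in the external view
+   * @param clusterName name of the cluster whose TestDB0 external view is verified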
+   * @throws Exception
+   */
+  private void checkExternalView(String clusterName) throws Exception {
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+
+    // verify that states of TestDB0 are all OFFLINE
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+        ExternalView extView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+        if (extView == null) {
+          return false;
+        }
+        Set<String> partitionSet = extView.getPartitionSet();
+        if (partitionSet == null || partitionSet.size() != PARTITION_NUM) {
+          return false;
+        }
+        for (String partition : partitionSet) {
+          Map<String, String> instanceStates = extView.getStateMap(partition);
+          for (String state : instanceStates.values()) {
+            if (!"OFFLINE".equals(state)) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 89a947f..d3925be 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
@@ -41,6 +43,7 @@ import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -62,7 +65,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
       _gZkClient.deleteRecursive(rootPath);
     }
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
     tool.addCluster(clusterName, true);
@@ -205,7 +208,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
@@ -241,7 +244,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
+    HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
 
@@ -294,4 +297,29 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  @Test
+  public void testDisableResource() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+    String resourceName = "TestDB";
+    admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForMasterSlave()));
+    admin.addResource(clusterName, resourceName, 4, "MasterSlave");
+    admin.enableResource(clusterName, resourceName, false);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    Assert.assertFalse(idealState.isEnabled());
+    admin.enableResource(clusterName, resourceName, true);
+    idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
+    Assert.assertTrue(idealState.isEnabled());
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c1af744a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
index d3d6736..a528a20 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterSetup.java
@@ -23,7 +23,10 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Date;
 
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
@@ -36,8 +39,8 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -46,8 +49,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestClusterSetup extends ZkUnitTestBase {
-  private static Logger LOG = Logger.getLogger(TestClusterSetup.class);
-
   protected static final String CLUSTER_NAME = "TestClusterSetup";
   protected static final String TEST_DB = "TestDB";
   protected static final String INSTANCE_PREFIX = "instance_";
@@ -437,4 +438,36 @@ public class TestClusterSetup extends ZkUnitTestBase {
 
   }
 
+  @Test
+  public void testDisableResource() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+    // disable "TestDB0" resource
+    ClusterSetup.processCommandLineArgs(new String[] {
+        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "false"
+    });
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertFalse(idealState.isEnabled());
+    // enable "TestDB0" resource
+    ClusterSetup.processCommandLineArgs(new String[] {
+        "--zkSvr", ZK_ADDR, "--enableResource", clusterName, "TestDB0", "true"
+    });
+    idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    Assert.assertTrue(idealState.isEnabled());
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
 }


[7/7] git commit: [HELIX-461] Add partitions without top state metric

Posted by ka...@apache.org.
[HELIX-461] Add partitions without top state metric


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6d30c9c5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6d30c9c5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6d30c9c5

Branch: refs/heads/master
Commit: 6d30c9c583b4b8aa1a74244269fa0f9f8ec87a40
Parents: c1af744
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jun 25 10:00:13 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jul 11 10:10:17 2014 -0700

----------------------------------------------------------------------
 .gitignore                                         |  2 ++
 .../stages/ExternalViewComputeStage.java           |  5 ++++-
 .../monitoring/mbeans/ClusterStatusMonitor.java    | 15 ++++++++++++---
 .../helix/monitoring/mbeans/ResourceMonitor.java   | 17 ++++++++++++++++-
 .../monitoring/mbeans/ResourceMonitorMBean.java    |  2 ++
 .../monitoring/mbeans/TestResourceMonitor.java     |  6 +++---
 6 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2eb84c1..2135638 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,8 @@ target/
 /recipes/rabbitmq-consumer-group/target/
 .idea
 *.iml
+*.ipr
+*.iws
 .settings/
 out/
 .DS_Store

http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 3086a83..8c6b008 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -50,6 +50,7 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
@@ -124,8 +125,10 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         if (clusterStatusMonitor != null
             && !idealState.getStateModelDefRef().equalsIgnoreCase(
                 DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+          StateModelDefinition stateModelDef =
+              cache.getStateModelDef(idealState.getStateModelDefRef());
           clusterStatusMonitor.setResourceStatus(view,
-              cache._idealStateMap.get(view.getResourceName()));
+              cache._idealStateMap.get(view.getResourceName()), stateModelDef);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 99dee75..1cce342 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -315,21 +316,29 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void setResourceStatus(ExternalView externalView, IdealState idealState) {
+  public void setResourceStatus(ExternalView externalView, IdealState idealState,
+      StateModelDefinition stateModelDef) {
+    String topState = null;
+    if (stateModelDef != null) {
+      List<String> priorityList = stateModelDef.getStatesPriorityList();
+      if (!priorityList.isEmpty()) {
+        topState = priorityList.get(0);
+      }
+    }
     try {
       String resourceName = externalView.getId();
       if (!_resourceMbeanMap.containsKey(resourceName)) {
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
-            bean.updateResource(externalView, idealState);
+            bean.updateResource(externalView, idealState, topState);
             registerResources(Arrays.asList(bean));
           }
         }
       }
       ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
       String oldSensorName = bean.getSensorName();
-      bean.updateResource(externalView, idealState);
+      bean.updateResource(externalView, idealState, topState);
       String newSensorName = bean.getSensorName();
       if (!oldSensorName.equals(newSensorName)) {
         unregisterResources(Arrays.asList(resourceName));

http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 4739fab..ae936b8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.mbeans;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.State;
@@ -30,12 +31,15 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Sets;
+
 public class ResourceMonitor implements ResourceMonitorMBean {
   private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
 
   private int _numOfPartitions;
   private int _numOfPartitionsInExternalView;
   private int _numOfErrorPartitions;
+  private int _numNonTopStatePartitions;
   private int _externalViewIdealStateDiff;
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private String _resourceName;
@@ -57,6 +61,11 @@ public class ResourceMonitor implements ResourceMonitorMBean {
   }
 
   @Override
+  public long getMissingTopStatePartitionGauge() {
+    return _numNonTopStatePartitions;
+  }
+
+  @Override
   public long getDifferenceWithIdealStateGauge() {
     return _externalViewIdealStateDiff;
   }
@@ -71,7 +80,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
     return _resourceName;
   }
 
-  public void updateResource(ExternalView externalView, IdealState idealState) {
+  public void updateResource(ExternalView externalView, IdealState idealState, String topState) {
     if (externalView == null) {
       LOG.warn("external view is null");
       return;
@@ -81,6 +90,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
     if (idealState == null) {
       LOG.warn("ideal state is null for " + resourceName);
       _numOfErrorPartitions = 0;
+      _numNonTopStatePartitions = 0;
       _externalViewIdealStateDiff = 0;
       _numOfPartitionsInExternalView = 0;
       return;
@@ -90,6 +100,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
 
     int numOfErrorPartitions = 0;
     int numOfDiff = 0;
+    Set<PartitionId> topStatePartitions = Sets.newHashSet();
 
     if (_numOfPartitions == 0) {
       _numOfPartitions = idealState.getRecord().getMapFields().size();
@@ -120,11 +131,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
             .equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
           numOfErrorPartitions++;
         }
+        if (topState != null && externalViewRecord.get(host).toString().equalsIgnoreCase(topState)) {
+          topStatePartitions.add(partitionId);
+        }
       }
     }
     _numOfErrorPartitions = numOfErrorPartitions;
     _externalViewIdealStateDiff = numOfDiff;
     _numOfPartitionsInExternalView = externalView.getPartitionIdSet().size();
+    _numNonTopStatePartitions = _numOfPartitions - topStatePartitions.size();
     String tag = idealState.getInstanceGroupTag();
     if (tag == null || tag.equals("") || tag.equals("null")) {
       _tag = ClusterStatusMonitor.DEFAULT_TAG;

http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
index 33b001d..da1e4f8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitorMBean.java
@@ -26,6 +26,8 @@ public interface ResourceMonitorMBean extends SensorNameProvider {
 
   public long getErrorPartitionGauge();
 
+  public long getMissingTopStatePartitionGauge();
+
   public long getDifferenceWithIdealStateGauge();
 
   public long getExternalViewPartitionGauge();

http://git-wip-us.apache.org/repos/asf/helix/blob/6d30c9c5/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index dcca755..caefa8a 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -121,7 +121,7 @@ public class TestResourceMonitor {
     IdealState idealState = new IdealState(idealStateRecord);
     ExternalView externalView = new ExternalView(idealStateRecord);
 
-    monitor.updateResource(externalView, idealState);
+    monitor.updateResource(externalView, idealState, "MASTER");
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -137,7 +137,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(_dbName + "_" + 3 * i, map);
     }
 
-    monitor.updateResource(externalView, idealState);
+    monitor.updateResource(externalView, idealState, "MASTER");
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
     Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
@@ -147,7 +147,7 @@ public class TestResourceMonitor {
       externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
     }
 
-    monitor.updateResource(externalView, idealState);
+    monitor.updateResource(externalView, idealState, "MASTER");
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
     Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);


[3/7] [HELIX-446] Remove ZkPropertyTransfer and restlet dependency from helix-core

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 00537a4..08af37b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -109,11 +109,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
         // System.out.println("participant watch paths: " + watchPaths);
 
-        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-        return watchPaths.size() == 2;
+        // participant should have 1 zk-watcher: 1 for MESSAGE
+        return watchPaths.size() == 1;
       }
     }, 500);
-    Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+    Assert.assertTrue(result, "Participant should have 1 zk-watcher. MESSAGES->HelixTaskExecutor");
 
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
@@ -122,8 +122,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
     Assert.assertEquals(controllerHandlerNb, 9,
         "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
-    Assert.assertEquals(particHandlerNb, 2,
-        "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+    Assert.assertEquals(particHandlerNb, 1,
+        "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
     // expire the session of participant
     System.out.println("Expiring participant session...");
@@ -164,11 +164,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
         // System.out.println("participant watch paths after session expiry: " + watchPaths);
 
-        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-        return watchPaths.size() == 2;
+        // participant should have 1 zk-watcher: 1 for MESSAGE
+        return watchPaths.size() == 1;
       }
     }, 500);
-    Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
 
     // check handlers
     // printHandlers(controllerManager);
@@ -249,8 +249,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(controllerHandlerNb, 9,
         "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
             + controllerHandlerNb + ", " + printHandlers(controller));
-    Assert.assertEquals(particHandlerNb, 2,
-        "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+    Assert.assertEquals(particHandlerNb, 1,
+        "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
             + particHandlerNb + ", " + printHandlers(participantManager));
 
     // expire controller
@@ -292,11 +292,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
         // System.out.println("participant watch paths after session expiry: " + watchPaths);
 
-        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-        return watchPaths.size() == 2;
+        // participant should have 1 zk-watcher: 1 for MESSAGE
+        return watchPaths.size() == 1;
       }
     }, 500);
-    Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
 
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
@@ -370,8 +370,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
 
     // check manager#hanlders
-    Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
-        "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+    Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
+        "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
 
     // check zkclient#listeners
     Map<String, Set<IZkDataListener>> dataListeners =
@@ -387,10 +387,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
         + path);
     Assert
-        .assertEquals(
-            childListeners.size(),
-            3,
-            "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+        .assertEquals(childListeners.size(), 2,
+            "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
     path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
@@ -398,20 +396,17 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     Map<String, List<String>> watchPaths =
         ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
     // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
     Assert
-        .assertEquals(
-            watchPaths.get("dataWatches").size(),
-            4,
-            "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
-        "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+        .assertEquals(watchPaths.get("dataWatches").size(), 3,
+            "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+        "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
 
     // expire localhost_12918
     System.out.println("Expire participant: " + participantToExpire.getInstanceName()
@@ -430,8 +425,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert
         .assertEquals(
             participantToExpire.getHandlers().size(),
-            2,
-            "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+            1,
+            "Should have 1 handlers: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
 
     // check zkclient#listeners
     dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
@@ -441,8 +436,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert
         .assertEquals(
             childListeners.size(),
-            3,
-            "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+            2,
+            "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
                 + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
     path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
     Assert.assertEquals(childListeners.get(path).size(), 0,
@@ -451,16 +446,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path),
+        "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
     // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
-    Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
-        "Should have 2 data-watches: CONTROLLER and MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
-        "Should have 2 child-watches: CONTROLLER and MESSAGES");
+    Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+        "Should have 1 data-watches: MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+        "Should have 1 child-watches: MESSAGES");
     Assert
         .assertEquals(watchPaths.get("existWatches").size(), 2,
             "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
@@ -479,10 +474,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
     // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
-    Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
-        "Should have 2 data-watches: CONTROLLER and MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
-        "Should have 2 child-watches: CONTROLLER and MESSAGES");
+    Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+        "Should have 1 data-watches: MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+        "Should have 1 child-watches: MESSAGES");
     Assert
         .assertEquals(
             watchPaths.get("existWatches").size(),

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
deleted file mode 100644
index e065ee3..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.StatusUpdate;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-/**
- * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
- * start 5 dummy participants verify the current states at end
- */
-
-public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase {
-  @Override
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    ZKPropertyTransferServer.PERIOD = 500;
-    ZkPropertyTransferClient.SEND_PERIOD = 500;
-    ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
-    super.beforeClass();
-
-    Thread.sleep(1000);
-    HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder kb = accessor.keyBuilder();
-
-    for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = _participants[i].getInstanceName();
-      List<StatusUpdate> statusUpdates =
-          accessor.getChildValues(kb.stateTransitionStatus(instanceName,
-              _participants[i].getSessionId(), TEST_DB));
-
-      for (int j = 0; j < 10; j++) {
-        statusUpdates =
-            accessor.getChildValues(kb.stateTransitionStatus(instanceName,
-                _participants[i].getSessionId(), TEST_DB));
-        if (statusUpdates.size() == 0) {
-          Thread.sleep(500);
-        } else {
-          break;
-        }
-      }
-      Assert.assertTrue(statusUpdates.size() > 0);
-      for (StatusUpdate update : statusUpdates) {
-        Assert.assertTrue(update.getRecord()
-            .getSimpleField(ZkPropertyTransferClient.USE_PROPERTYTRANSFER).equals("true"));
-        Assert
-            .assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
-      }
-    }
-  }
-
-  @Override
-  @AfterClass
-  public void afterClass() throws Exception {
-    super.afterClass();
-    ZKPropertyTransferServer.getInstance().shutdown();
-    ZKPropertyTransferServer.getInstance().reset();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 8bff2fb..69448dc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -243,8 +243,8 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            2,
-            "Distributed controller should have 2 handlers (message and data-accessor) after lose leadership, but was "
+            1,
+            "Distributed controller should have 1 handler (message) after lose leadership, but was "
                 + handlers.size());
 
     // clean up

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index aa00a8d..f797d1b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -146,8 +146,8 @@ public class TestDistributedControllerManager extends ZkIntegrationTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            2,
-            "Distributed controller should have 2 handlers (message and data-accessor) after lose leadership, but was "
+            1,
+            "Distributed controller should have 1 handler (message) after lose leadership, but was "
                 + handlers.size());
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index 59a3fd9..9b3c32f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -107,11 +107,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
         LOG.debug("participant watch paths: " + watchPaths);
 
-        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-        return watchPaths.size() == 2;
+        // participant should have 1 zk-watcher: 1 for MESSAGE
+        return watchPaths.size() == 1;
       }
     }, 500);
-    Assert.assertTrue(result, "Participant should have 2 zk-watchers.");
+    Assert.assertTrue(result, "Participant should have 1 zk-watcher.");
 
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
@@ -120,8 +120,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
     Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
         "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
-    Assert.assertEquals(particHandlerNb, 2,
-        "HelixParticipant should have 2 (msg+cur-state) callback handlers");
+    Assert.assertEquals(particHandlerNb, 1,
+        "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
     // expire the session of participant
     LOG.debug("Expiring participant session...");
@@ -162,11 +162,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId());
         LOG.debug("participant watch paths after session expiry: " + watchPaths);
 
-        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-        return watchPaths.size() == 2;
+        // participant should have 1 zk-watcher: 1 for MESSAGE
+        return watchPaths.size() == 1;
       }
     }, 500);
-    Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
 
     // check handlers
     // printHandlers(controllerManager);
@@ -239,8 +239,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
         "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
             + controllerHandlerNb + ", " + TestHelper.printHandlers(controller));
-    Assert.assertEquals(particHandlerNb, 2,
-        "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
+    Assert.assertEquals(particHandlerNb, 1,
+        "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
             + particHandlerNb + ", " + TestHelper.printHandlers(participantManager));
 
     // expire controller
@@ -283,11 +283,11 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId());
         LOG.debug("participant watch paths after session expiry: " + watchPaths);
 
-        // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER
-        return watchPaths.size() == 2;
+        // participant should have 1 zk-watcher: 1 for MESSAGE
+        return watchPaths.size() == 1;
       }
     }, 500);
-    Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Participant should have 1 zk-watcher after session expiry.");
 
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
@@ -354,8 +354,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
 
     // check manager#hanlders
-    Assert.assertEquals(participantToExpire.getHandlers().size(), 3,
-        "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+    Assert.assertEquals(participantToExpire.getHandlers().size(), 2,
+        "Should have 2 handlers: CURRENTSTATE/{sessionId}, and MESSAGES");
 
     // check zkclient#listeners
     Map<String, Set<IZkDataListener>> dataListeners =
@@ -373,8 +373,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert
         .assertEquals(
             childListeners.size(),
-            3,
-            "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+            2,
+            "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
     path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
@@ -382,8 +382,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path),
+        "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     Map<String, List<String>> watchPaths =
@@ -392,10 +392,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert
         .assertEquals(
             watchPaths.get("dataWatches").size(),
-            4,
-            "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 3,
-        "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+            3,
+            "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
+        "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
 
     // expire localhost_12918
     System.out.println("Expire participant: " + participantToExpire.getInstanceName()
@@ -414,8 +414,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert
         .assertEquals(
             participantToExpire.getHandlers().size(),
-            2,
-            "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+            1,
+            "Should have 1 handler: MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
 
     // check zkclient#listeners
     dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
@@ -425,9 +425,9 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert
         .assertEquals(
             childListeners.size(),
-            3,
-            "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
-                + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
+            2,
+            "Should have 2 paths (CURRENTSTATE/{oldSessionId}, and MESSAGES). "
+                + "MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
     path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
     Assert.assertEquals(childListeners.get(path).size(), 0,
         "Should have no child-listener on path: " + path);
@@ -435,16 +435,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertEquals(childListeners.get(path).size(), 1,
-        "Should have 1 child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path),
+        "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
     LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
-    Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
-        "Should have 2 data-watches: CONTROLLER and MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
-        "Should have 2 child-watches: CONTROLLER and MESSAGES");
+    Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+        "Should have 1 data-watch: MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+        "Should have 1 child-watch: MESSAGES");
     Assert
         .assertEquals(watchPaths.get("existWatches").size(), 2,
             "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
@@ -463,10 +463,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
     LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
-    Assert.assertEquals(watchPaths.get("dataWatches").size(), 2,
-        "Should have 2 data-watches: CONTROLLER and MESSAGES");
-    Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
-        "Should have 2 child-watches: CONTROLLER and MESSAGES");
+    Assert.assertEquals(watchPaths.get("dataWatches").size(), 1,
+        "Should have 1 data-watch: MESSAGES");
+    Assert.assertEquals(watchPaths.get("childWatches").size(), 1,
+        "Should have 1 child-watch: MESSAGES");
     Assert
         .assertEquals(
             watchPaths.get("existWatches").size(),

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index 547e863..c2982b2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -19,13 +19,13 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestLiveInstanceBounce extends ZkStandAloneCMTestBase {
   @Test
   public void testInstanceBounce() throws Exception {
     int handlerSize = _controller.getHandlers().size();

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
deleted file mode 100644
index e01dd07..0000000
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  private static Logger LOG = Logger.getLogger(TestZKPropertyTransferServer.class);
-
-  @Test
-  public void TestControllerChange() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller.syncStop();
-
-    Thread.sleep(1000);
-
-    // kill controller, participant should not know about the svc url
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixDataAccessor accessor = _participants[i].getHelixDataAccessor();
-      ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
-      Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl == null
-          || zkAccessor._zkPropertyTransferSvcUrl.equals(""));
-    }
-
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    Thread.sleep(1000);
-
-    // create controller again, the svc url is notified to the participants
-    for (int i = 0; i < NODE_NR; i++) {
-      HelixDataAccessor accessor = _participants[i].getHelixDataAccessor();
-      ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
-      Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl.equals(ZKPropertyTransferServer
-          .getInstance().getWebserviceUrl()));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/85277291/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
index aeb32f9..a9c028c 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
@@ -19,12 +19,11 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.testng.Assert;
-import org.testng.annotations.Test;
 
-public class TestZkStateChangeListener extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+public class TestZkStateChangeListener extends ZkStandAloneCMTestBase {
   // TODO this test has been covered by TestZkFlapping. check if still needed
   // @Test
   public void testDisconnectHistory() throws Exception {


[2/7] git commit: [HELIX-430] Fix test failures caused by restlet 2.2

Posted by ka...@apache.org.
[HELIX-430] Fix test failures caused by restlet 2.2


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e914edb6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e914edb6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e914edb6

Branch: refs/heads/master
Commit: e914edb6b1796b82234a2555048654f3ae573369
Parents: 208f1fa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue May 20 16:26:09 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 10:51:32 2014 -0700

----------------------------------------------------------------------
 ...x-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy |  4 +-
 .../webapp/TestHelixAdminScenariosRest.java     | 72 ++++++++++++--------
 .../helix-core-0.7.1-incubating-SNAPSHOT.ivy    |  6 +-
 helix-core/pom.xml                              |  4 +-
 pom.xml                                         |  2 +-
 5 files changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy b/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
index 0478f34..35c6158 100644
--- a/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
+++ b/helix-admin-webapp/helix-admin-webapp-0.7.1-incubating-SNAPSHOT.ivy
@@ -38,8 +38,8 @@ under the License.
 	</publications>
 	<dependencies>
 		<dependency org="org.apache.helix" name="helix-core" rev="0.7.1-incubating-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
-		<dependency org="org.restlet" name="org.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
-		<dependency org="com.noelios.restlet" name="com.noelios.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+		<dependency org="org.apache.helix" name="helix-core" rev="0.6.4-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+		<dependency org="org.restlet.jse" name="org.restlet" rev="2.2.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>

http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
index 7f44174..8082f04 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
@@ -44,7 +44,6 @@ import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
-import org.apache.helix.webapp.RestAdminApplication;
 import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
 import org.apache.helix.webapp.resources.InstancesResource.ListInstancesWrapper;
 import org.apache.helix.webapp.resources.JsonParameters;
@@ -68,6 +67,8 @@ import org.testng.annotations.Test;
  * Simulate all the admin tasks needed by using command line tool
  */
 public class TestHelixAdminScenariosRest extends AdminTestBase {
+  private static final int MAX_RETRIES = 5;
+
   RestAdminApplication _adminApp;
   Component _component;
   String _tag1 = "tag1123";
@@ -94,42 +95,53 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
 
   static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
       boolean hasException) throws IOException {
-    Reference resourceRef = new Reference(url);
-
-    Request request = new Request(Method.POST, resourceRef);
-    request.setEntity(
-        JsonParameters.JSON_PARAMETERS + "="
-            + ClusterRepresentationUtil.ObjectToJson(jsonParameters), MediaType.APPLICATION_ALL);
-    Response response = _gClient.handle(request);
-    Representation result = response.getEntity();
-    StringWriter sw = new StringWriter();
-    result.write(sw);
-
-    Assert.assertTrue(response.getStatus().getCode() == Status.SUCCESS_OK.getCode());
-    Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
-    return sw.toString();
+    return assertSuccessPostOperation(url, jsonParameters, null, hasException);
   }
 
   static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
       Map<String, String> extraForm, boolean hasException) throws IOException {
     Reference resourceRef = new Reference(url);
 
-    Request request = new Request(Method.POST, resourceRef);
-    String entity =
-        JsonParameters.JSON_PARAMETERS + "="
-            + ClusterRepresentationUtil.ObjectToJson(jsonParameters);
-    for (String key : extraForm.keySet()) {
-      entity = entity + "&" + (key + "=" + extraForm.get(key));
-    }
-    request.setEntity(entity, MediaType.APPLICATION_ALL);
-    Response response = _gClient.handle(request);
-    Representation result = response.getEntity();
-    StringWriter sw = new StringWriter();
-    result.write(sw);
+    int numRetries = 0;
+    while (numRetries <= MAX_RETRIES) {
+      Request request = new Request(Method.POST, resourceRef);
+
+      if (extraForm != null) {
+        String entity =
+            JsonParameters.JSON_PARAMETERS + "="
+                + ClusterRepresentationUtil.ObjectToJson(jsonParameters);
+        for (String key : extraForm.keySet()) {
+          entity = entity + "&" + (key + "=" + extraForm.get(key));
+        }
+        request.setEntity(entity, MediaType.APPLICATION_ALL);
+      } else {
+        request
+            .setEntity(
+                JsonParameters.JSON_PARAMETERS + "="
+                    + ClusterRepresentationUtil.ObjectToJson(jsonParameters),
+                MediaType.APPLICATION_ALL);
+      }
 
-    Assert.assertTrue(response.getStatus().getCode() == Status.SUCCESS_OK.getCode());
-    Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
-    return sw.toString();
+      Response response = _gClient.handle(request);
+      Representation result = response.getEntity();
+      StringWriter sw = new StringWriter();
+
+      if (result != null) {
+        result.write(sw);
+      }
+
+      int code = response.getStatus().getCode();
+      boolean successCode =
+          code == Status.SUCCESS_NO_CONTENT.getCode() || code == Status.SUCCESS_OK.getCode();
+      if (successCode || numRetries == MAX_RETRIES) {
+        Assert.assertTrue(successCode);
+        Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
+        return sw.toString();
+      }
+      numRetries++;
+    }
+    Assert.fail("Request failed after all retries");
+    return null;
   }
 
   void deleteUrl(String url, boolean hasException) throws IOException {

http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
----------------------------------------------------------------------
diff --git a/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy b/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
index 137fc06..eb37bed 100644
--- a/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
+++ b/helix-core/helix-core-0.7.1-incubating-SNAPSHOT.ivy
@@ -54,11 +54,7 @@ under the License.
     <dependency org="commons-cli" name="commons-cli" rev="1.2" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="commons-math" name="commons-math" rev="2.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="com.github.sgroschupf" name="zkclient" rev="0.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
-    <dependency org="org.apache.camel" name="camel-josql" rev="2.5.0" conf="compile->compile(default);runtime->runtime(default);default->default"/>
-    <dependency org="org.apache.camel" name="camel-core" rev="2.5.0" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
-    <dependency org="net.sf.josql" name="gentlyweb-utils" rev="1.5" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
-    <dependency org="net.sf.josql" name="josql" rev="1.5" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
-    <dependency org="org.fusesource.commonman" name="commons-management" rev="1.0" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+    <dependency org="org.restlet.jse" name="org.restlet" rev="2.2.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
     <dependency org="commons-logging" name="commons-logging-api" rev="1.1" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
     <dependency org="org.restlet" name="org.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
     <dependency org="com.noelios.restlet" name="com.noelios.restlet" rev="1.1.10" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>

http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index a8414f4..b8e7a9a 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -45,7 +45,7 @@ under the License.
       org.apache.zookeeper.txn*;resolution:=optional,
       org.apache.zookeeper*;version="[3.3,4)",
       org.codehaus.jackson*;version="[1.8,2)",
-      org.restlet;version="[2.2.0,3]",
+      org.restlet;version="[2.2.1,3]",
       *
     </osgi.import>
     <osgi.ignore>
@@ -133,7 +133,7 @@ under the License.
     <dependency>
       <groupId>org.restlet.jse</groupId>
       <artifactId>org.restlet</artifactId>
-      <version>2.2.0</version>
+      <version>2.2.1</version>
     </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/helix/blob/e914edb6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index acb8589..8b45e0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -335,7 +335,7 @@ under the License.
       <dependency>
         <groupId>org.restlet.jse</groupId>
         <artifactId>org.restlet</artifactId>
-        <version>2.2.0</version>
+        <version>2.2.1</version>
       </dependency>
       <dependency>
         <groupId>org.apache.helix</groupId>