You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/06/25 07:08:55 UTC

[helix] branch master updated: Merge differences with another branch

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new aa77f5c  Merge differences with another branch
aa77f5c is described below

commit aa77f5c0bfc3cafb6482b1cef4a1689191cbd846
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Tue Jun 25 00:08:44 2019 -0700

    Merge differences with another branch
    
    There are multiple branches against which Helix devs have been doing development work. We wish to consolidate them into one by reconciling all differences. This diff makes such changes. This diff does not contain any changes in logic or functionality.
---
 .../src/main/java/org/apache/helix/HelixAdmin.java |  56 +-
 .../main/java/org/apache/helix/PropertyKey.java    |   2 +-
 .../java/org/apache/helix/PropertyPathBuilder.java |   7 +-
 .../main/java/org/apache/helix/PropertyType.java   |  21 +-
 .../api/config/StateTransitionThrottleConfig.java  |   1 +
 .../stages/StateTransitionThrottleController.java  |  52 +-
 .../controller/strategy/AutoRebalanceStrategy.java | 756 +++++++++++++++++++++
 .../apache/helix/manager/zk/CallbackHandler.java   | 217 +++---
 .../helix/manager/zk/ZkBaseDataAccessor.java       |  42 +-
 .../helix/manager/zk/zookeeper/ZkClient.java       | 315 +++++----
 .../helix/manager/zk/zookeeper/ZkConnection.java   |  18 +-
 .../apache/helix/messaging/CriteriaEvaluator.java  |  26 +-
 .../handling/HelixStateTransitionHandler.java      | 144 ++--
 .../java/org/apache/helix/model/ClusterConfig.java | 110 +--
 .../main/java/org/apache/helix/model/Message.java  |  45 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  55 +-
 .../mbeans/dynamicMBeans/DynamicMBeanProvider.java |  25 +-
 .../org/apache/helix/spectator/RoutingTable.java   |  32 +-
 .../helix/spectator/RoutingTableProvider.java      | 120 ++--
 .../helix/spectator/RoutingTableSnapshot.java      |  22 +-
 .../main/java/org/apache/helix/task/JobConfig.java |   4 +-
 .../helix/task/TaskAssignmentCalculator.java       |   5 +-
 .../java/org/apache/helix/task/TaskConfig.java     |   2 +-
 .../apache/helix/task/TaskStateModelFactory.java   |   9 +-
 .../org/apache/helix/task/UserContentStore.java    |  24 +-
 .../java/org/apache/helix/task/WorkflowConfig.java |   2 +-
 .../helix/task/assigner/TaskAssignResult.java      |   2 +-
 .../org/apache/helix/TestListenerCallback.java     |  24 +-
 .../integration/TestBatchEnableInstances.java      |  49 +-
 .../integration/TestPartitionMovementThrottle.java |   2 +-
 .../TestStateTransitionCancellation.java           |  43 +-
 .../integration/TestStateTransitionThrottle.java   |   2 +-
 .../controller/TestTargetExternalView.java         |  17 +-
 .../helix/integration/task/TestDeleteWorkflow.java |   2 +-
 .../task/TestIndependentTaskRebalancer.java        |   2 +-
 .../integration/task/TestJobAndWorkflowType.java   |   5 +-
 .../helix/integration/task/TestJobFailure.java     |   2 +-
 .../task/TestJobFailureHighThreshold.java          |  30 +-
 .../task/TestJobFailureTaskNotStarted.java         |  62 +-
 .../helix/integration/task/TestJobTimeout.java     |   2 +-
 .../task/TestJobTimeoutTaskNotStarted.java         |  62 +-
 .../integration/task/TestQuotaBasedScheduling.java |   2 +-
 .../integration/task/TestRebalanceRunningTask.java |   5 +-
 .../task/TestTaskAssignmentCalculator.java         |   2 +-
 .../helix/integration/task/TestTaskRebalancer.java |   2 +-
 .../task/TestTaskRebalancerFailover.java           |   7 +-
 .../task/TestTaskRebalancerRetryLimit.java         |   2 +-
 .../helix/integration/task/TestTaskRetryDelay.java |   8 +-
 .../helix/integration/task/TestTaskThrottling.java |   2 +-
 .../task/TestWorkflowJobDependency.java            |  22 +-
 .../integration/task/TestWorkflowTermination.java  | 132 ++--
 .../integration/task/TestWorkflowTimeout.java      |  25 +-
 .../helix/task/TaskSynchronizedTestBase.java       |  31 +-
 .../helix/task/TestAssignableInstanceManager.java  |   2 +-
 ...tAssignableInstanceManagerControllerSwitch.java |   6 +-
 .../task/TestGetLastScheduledTaskExecInfo.java     |   2 +-
 .../helix/task/TestSemiAutoStateTransition.java    |   2 +-
 .../helix/task/assigner/AssignerTestBase.java      |   2 +-
 .../task/assigner/TestAssignableInstance.java      |  10 +-
 .../helix/tools/TestClusterStateVerifier.java      |   6 +-
 .../org/apache/helix/tools/TestHelixAdminCli.java  |   3 -
 .../chooser/helix-list/helix-list.component.html   |   4 +-
 helix-front/pom.xml                                |   9 +-
 helix-rest/pom.xml                                 |   3 +-
 .../server/resources/helix/ResourceAccessor.java   |  56 +-
 .../helix/rest/server/TestResourceAccessor.java    | 112 +--
 pom.xml                                            |  35 +-
 recipes/distributed-lock-manager/pom.xml           |  11 +-
 68 files changed, 1816 insertions(+), 1105 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 6cbcffd..7402c19 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -58,15 +58,13 @@ public interface HelixAdmin {
   InstanceConfig getInstanceConfig(String clusterName, String instanceName);
 
   /**
-   +   * Set the instance config of an existing instance under the given cluster.
-   +   *
-   +   * @param clusterName    the name of the cluster to which this instance belongs.
-   +   * @param instanceName   the name of this instance.
-   +   * @param instanceConfig the new {@link InstanceConfig} that will replace the current one
-   +   *                       associated with this instance.
-   +   *
-   +   * @return true if the operation was successful; false otherwise.
-   +   */
+   * Set the instance config of an existing instance under the given cluster.
+   * @param clusterName the name of the cluster to which this instance belongs.
+   * @param instanceName the name of this instance.
+   * @param instanceConfig the new {@link InstanceConfig} that will replace the current one
+   *          associated with this instance.
+   * @return true if the operation was successful; false otherwise.
+   */
   boolean setInstanceConfig(String clusterName, String instanceName, InstanceConfig instanceConfig);
 
   /**
@@ -77,10 +75,10 @@ public interface HelixAdmin {
   List<String> getResourcesInCluster(String clusterName);
 
   /**
-  * Get a list of resources in a cluster with a tag
-  * @param clusterName
-  * @param tag
-  */
+   * Get a list of resources in a cluster with a tag
+   * @param clusterName
+   * @param tag
+   */
   List<String> getResourcesInClusterWithTag(String clusterName, String tag);
 
   /**
@@ -112,7 +110,8 @@ public interface HelixAdmin {
    * @param numPartitions
    * @param stateModelRef
    */
-  void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef);
+  void addResource(String clusterName, String resourceName, int numPartitions,
+      String stateModelRef);
 
   /**
    * @param clusterName
@@ -129,8 +128,8 @@ public interface HelixAdmin {
    * @param stateModelRef
    * @param rebalancerMode
    */
-  void addResource(String clusterName, String resourceName, int numPartitions,
-      String stateModelRef, String rebalancerMode);
+  void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef,
+      String rebalancerMode);
 
   /**
    * Add a resource to a cluster
@@ -141,8 +140,8 @@ public interface HelixAdmin {
    * @param rebalancerMode
    * @param rebalanceStrategy
    */
-  void addResource(String clusterName, String resourceName, int numPartitions,
-      String stateModelRef, String rebalancerMode, String rebalanceStrategy);
+  void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef,
+      String rebalancerMode, String rebalanceStrategy);
 
   /**
    * Add a resource to a cluster, using a bucket size > 1
@@ -153,8 +152,8 @@ public interface HelixAdmin {
    * @param rebalancerMode
    * @param bucketSize
    */
-  void addResource(String clusterName, String resourceName, int numPartitions,
-      String stateModelRef, String rebalancerMode, int bucketSize);
+  void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef,
+      String rebalancerMode, int bucketSize);
 
   /**
    * Add a resource to a cluster, using a bucket size > 1
@@ -166,9 +165,8 @@ public interface HelixAdmin {
    * @param bucketSize
    * @param maxPartitionsPerInstance
    */
-  void addResource(String clusterName, String resourceName, int numPartitions,
-      String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance);
-
+  void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef,
+      String rebalancerMode, int bucketSize, int maxPartitionsPerInstance);
 
   /**
    * Add a resource to a cluster, using a bucket size > 1
@@ -181,8 +179,8 @@ public interface HelixAdmin {
    * @param bucketSize
    * @param maxPartitionsPerInstance
    */
-  void addResource(String clusterName, String resourceName, int numPartitions,
-      String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+  void addResource(String clusterName, String resourceName, int numPartitions, String stateModelRef,
+      String rebalancerMode, String rebalanceStrategy, int bucketSize,
       int maxPartitionsPerInstance);
 
   /**
@@ -276,8 +274,8 @@ public interface HelixAdmin {
   /**
    * @param clusterName
    * @param enabled
-   * @param reason      set additional string description on why the cluster is disabled when
-   *                    <code>enabled</code> is false.
+   * @param reason set additional string description on why the cluster is disabled when
+   *          <code>enabled</code> is false.
    */
   void enableCluster(String clusterName, boolean enabled, String reason);
 
@@ -543,7 +541,6 @@ public interface HelixAdmin {
   /**
    * Enable/disable batch message mode for specified cluster.
    * By default batch message mode is disabled.
-   *
    * @param clusterName
    * @param enabled
    */
@@ -558,8 +555,6 @@ public interface HelixAdmin {
    */
   void enableBatchMessageMode(String clusterName, String resourceName, boolean enabled);
 
-
-
   /**
    * Get batch disabled instance map (disabled instance -> disabled time) in a cluster. It will
    * include disabled instances and instances in disabled zones
@@ -570,7 +565,6 @@ public interface HelixAdmin {
 
   /**
    * Get list of instances by domain for a cluster
-   *
    * Example : domain could be "helixZoneId=1,rackId=3". All the instances domain contains these
    * two domains will be selected.
    * @param clusterName
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 6350b3c..369e48e 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -822,4 +822,4 @@ public class PropertyKey {
     return _configScope;
   }
 
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index 2bde247..a0de770 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.PropertyType.*;
 
-
 /**
  * Utility mapping properties to their Zookeeper locations
  */
@@ -139,8 +138,8 @@ public class PropertyPathBuilder {
     // RESOURCE
     addEntry(PropertyType.WORKFLOWCONTEXT, 2,
         "/{clusterName}/PROPERTYSTORE/TaskRebalancer/{workflowName}/Context"); // Old
-                                                                               // WorkflowContext
-                                                                               // path
+    // WorkflowContext
+    // path
     addEntry(PropertyType.TASK_CONFIG_ROOT, 1, "/{clusterName}/CONFIGS/TASK");
     addEntry(PropertyType.WORKFLOW_CONFIG, 3,
         "/{clusterName}/CONFIGS/TASK/{workflowName}/{workflowName}");
@@ -380,4 +379,4 @@ public class PropertyPathBuilder {
   public static String maintenance(String clusterName) {
     return String.format("/%s/CONTROLLER/MAINTENANCE", clusterName);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyType.java b/helix-core/src/main/java/org/apache/helix/PropertyType.java
index e234a3b..f879249 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyType.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyType.java
@@ -79,7 +79,6 @@ public enum PropertyType {
   // REST PROPERTIES
   RESTCONFIGS(Type.REST, true, false, false, false, true);
 
-
   // @formatter:on
 
   Type type;
@@ -102,23 +101,23 @@ public enum PropertyType {
     this(type, isPersistent, mergeOnUpdate, false);
   }
 
-  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate,
-      boolean updateOnlyOnExists) {
+  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate, boolean updateOnlyOnExists) {
     this(type, isPersistent, mergeOnUpdate, false, false);
   }
 
-  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate,
-      boolean updateOnlyOnExists, boolean createOnlyIfAbsent) {
+  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate, boolean updateOnlyOnExists,
+      boolean createOnlyIfAbsent) {
     this(type, isPersistent, mergeOnUpdate, updateOnlyOnExists, createOnlyIfAbsent, false);
   }
 
-  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate,
-      boolean updateOnlyOnExists, boolean createOnlyIfAbsent, boolean isCached) {
-    this(type, isPersistent, mergeOnUpdate, updateOnlyOnExists, createOnlyIfAbsent, isCached, false);
+  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate, boolean updateOnlyOnExists,
+      boolean createOnlyIfAbsent, boolean isCached) {
+    this(type, isPersistent, mergeOnUpdate, updateOnlyOnExists, createOnlyIfAbsent, isCached,
+        false);
   }
 
-  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate,
-      boolean updateOnlyOnExists, boolean createOnlyIfAbsent, boolean isCached, boolean isAsyncWrite) {
+  PropertyType(Type type, boolean isPersistent, boolean mergeOnUpdate, boolean updateOnlyOnExists,
+      boolean createOnlyIfAbsent, boolean isCached, boolean isAsyncWrite) {
     this.type = type;
     this.isPersistent = isPersistent;
     this.mergeOnUpdate = mergeOnUpdate;
@@ -214,4 +213,4 @@ public enum PropertyType {
   public boolean isCached() {
     return isCached;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index 548ae31..65791e9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -38,6 +38,7 @@ public class StateTransitionThrottleConfig {
     REBALANCE_TYPE,
     THROTTLE_SCOPE,
     MAX_PARTITION_IN_TRANSITION
+
   }
 
   public enum ThrottleScope {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
index 4a2baa7..ecfe256 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -68,33 +68,33 @@ class StateTransitionThrottleController {
 
     for (StateTransitionThrottleConfig config : throttleConfigs) {
       switch (config.getThrottleScope()) {
-        case CLUSTER:
-          _pendingTransitionAllowedInCluster.put(config.getRebalanceType(),
-              config.getMaxPartitionInTransition());
-          _throttleEnabled = true;
-          break;
-        case RESOURCE:
-          for (String resource : resources) {
-            if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
-              _pendingTransitionAllowedPerResource.put(resource,
-                  new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
-            }
-            _pendingTransitionAllowedPerResource.get(resource).put(config.getRebalanceType(),
-                config.getMaxPartitionInTransition());
+      case CLUSTER:
+        _pendingTransitionAllowedInCluster.put(config.getRebalanceType(),
+            config.getMaxPartitionInTransition());
+        _throttleEnabled = true;
+        break;
+      case RESOURCE:
+        for (String resource : resources) {
+          if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
+            _pendingTransitionAllowedPerResource.put(resource,
+                new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
           }
-          _throttleEnabled = true;
-          break;
-        case INSTANCE:
-          for (String instance : liveInstances) {
-            if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
-              _pendingTransitionAllowedPerInstance.put(instance,
-                  new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
-            }
-            _pendingTransitionAllowedPerInstance.get(instance).put(config.getRebalanceType(),
-                config.getMaxPartitionInTransition());
+          _pendingTransitionAllowedPerResource.get(resource).put(config.getRebalanceType(),
+              config.getMaxPartitionInTransition());
+        }
+        _throttleEnabled = true;
+        break;
+      case INSTANCE:
+        for (String instance : liveInstances) {
+          if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
+            _pendingTransitionAllowedPerInstance.put(instance,
+                new HashMap<StateTransitionThrottleConfig.RebalanceType, Long>());
           }
-          _throttleEnabled = true;
-          break;
+          _pendingTransitionAllowedPerInstance.get(instance).put(config.getRebalanceType(),
+              config.getMaxPartitionInTransition());
+        }
+        _throttleEnabled = true;
+        break;
       }
     }
   }
@@ -254,4 +254,4 @@ class StateTransitionThrottleController {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
new file mode 100644
index 0000000..d445664
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -0,0 +1,756 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * The class was moved to controller.rebalancer.strategy.
+ */
+@Deprecated
+public class AutoRebalanceStrategy {
+
+  private static Logger logger = LoggerFactory.getLogger(AutoRebalanceStrategy.class);
+
+  private final String _resourceName;
+  private final List<String> _partitions;
+  private final LinkedHashMap<String, Integer> _states;
+  private final int _maximumPerNode;
+  private final ReplicaPlacementScheme _placementScheme;
+
+  private Map<String, Node> _nodeMap;
+  private List<Node> _liveNodesList;
+  private Map<Integer, String> _stateMap;
+
+  private Map<Replica, Node> _preferredAssignment;
+  private Map<Replica, Node> _existingPreferredAssignment;
+  private Map<Replica, Node> _existingNonPreferredAssignment;
+  private Set<Replica> _orphaned;
+
+  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode,
+      ReplicaPlacementScheme placementScheme) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _states = states;
+    _maximumPerNode = maximumPerNode;
+    if (placementScheme != null) {
+      _placementScheme = placementScheme;
+    } else {
+      _placementScheme = new DefaultPlacementScheme();
+    }
+  }
+
+  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states) {
+    this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
+  }
+
+  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
+    int numReplicas = countStateReplicas();
+    ZNRecord znRecord = new ZNRecord(_resourceName);
+    if (liveNodes.size() == 0) {
+      return znRecord;
+    }
+    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
+    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+    _nodeMap = new HashMap<String, Node>();
+    _liveNodesList = new ArrayList<Node>();
+
+    for (String id : allNodes) {
+      Node node = new Node(id);
+      node.capacity = 0;
+      node.hasCeilingCapacity = false;
+      _nodeMap.put(id, node);
+    }
+    for (int i = 0; i < liveNodes.size(); i++) {
+      boolean usingCeiling = false;
+      int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
+      if (distRemainder > 0 && targetSize < _maximumPerNode) {
+        targetSize += 1;
+        distRemainder = distRemainder - 1;
+        usingCeiling = true;
+      }
+      Node node = _nodeMap.get(liveNodes.get(i));
+      node.isAlive = true;
+      node.capacity = targetSize;
+      node.hasCeilingCapacity = usingCeiling;
+      _liveNodesList.add(node);
+    }
+
+    // compute states for all replica ids
+    _stateMap = generateStateMap();
+
+    // compute the preferred mapping if all nodes were up
+    _preferredAssignment = computePreferredPlacement(allNodes);
+
+    // logger.info("preferred mapping:"+ preferredAssignment);
+    // from current mapping derive the ones in preferred location
+    // this will update the nodes with their current fill status
+    _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
+
+    // from current mapping derive the ones not in preferred location
+    _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
+
+    // compute orphaned replicas that are not assigned to any node
+    _orphaned = computeOrphaned();
+    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
+      logger.info("orphan = " + _orphaned);
+    }
+
+    moveNonPreferredReplicasToPreferred();
+
+    assignOrphans();
+
+    moveExcessReplicas();
+
+    prepareResult(znRecord);
+    return znRecord;
+  }
+
+  /**
+   * Move replicas assigned to non-preferred nodes if their current node is at capacity
+   * and its preferred node is under capacity.
+   */
+  private void moveNonPreferredReplicasToPreferred() {
+    // iterate through non preferred and see if we can move them to the
+    // preferred location if the donor has more than it should and stealer has
+    // enough capacity
+    Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Replica, Node> entry = iterator.next();
+      Replica replica = entry.getKey();
+      Node donor = entry.getValue();
+      Node receiver = _preferredAssignment.get(replica);
+      if (donor.capacity < donor.currentlyAssigned && receiver.capacity > receiver.currentlyAssigned
+          && receiver.canAdd(replica)) {
+        donor.currentlyAssigned = donor.currentlyAssigned - 1;
+        receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+        donor.nonPreferred.remove(replica);
+        receiver.preferred.add(replica);
+        donor.newReplicas.remove(replica);
+        receiver.newReplicas.add(replica);
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
+   */
+  private void assignOrphans() {
+    // now iterate over nodes and remaining orphaned partitions and assign
+    // partitions randomly
+    // Better to iterate over orphaned partitions first
+    Iterator<Replica> it = _orphaned.iterator();
+    while (it.hasNext()) {
+      Replica replica = it.next();
+      boolean added = false;
+      int startIndex = computeRandomStartIndex(replica);
+      for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+        if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+          receiver.nonPreferred.add(replica);
+          receiver.newReplicas.add(replica);
+          added = true;
+          break;
+        }
+      }
+      if (!added) {
+        // try adding the replica by making room for it
+        added = assignOrphanByMakingRoom(replica);
+      }
+      if (added) {
+        it.remove();
+      }
+    }
+    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
+      logger.warn("could not assign nodes to partitions: " + _orphaned);
+    }
+  }
+
+  /**
+   * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
+   * @param replica The replica to assign
+   * @return true if the assignment succeeded, false otherwise
+   */
+  private boolean assignOrphanByMakingRoom(Replica replica) {
+    Node capacityDonor = null;
+    Node capacityAcceptor = null;
+    int startIndex = computeRandomStartIndex(replica);
+    for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+      Node current = _liveNodesList.get(index % _liveNodesList.size());
+      if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
+          && !current.canAddIfCapacity(replica) && capacityDonor == null) {
+        // this node has space but cannot accept the node
+        capacityDonor = current;
+      } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
+          && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
+        // this node would be able to accept the replica if it has ceiling capacity
+        capacityAcceptor = current;
+      }
+      if (capacityDonor != null && capacityAcceptor != null) {
+        break;
+      }
+    }
+    if (capacityDonor != null && capacityAcceptor != null) {
+      // transfer ceiling capacity and add the node
+      capacityAcceptor.steal(capacityDonor, replica);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Move replicas from too-full nodes to nodes that can accept the replicas
+   */
+  private void moveExcessReplicas() {
+    // iterate over nodes and move extra load
+    Iterator<Replica> it;
+    for (Node donor : _liveNodesList) {
+      if (donor.capacity < donor.currentlyAssigned) {
+        Collections.sort(donor.nonPreferred);
+        it = donor.nonPreferred.iterator();
+        while (it.hasNext()) {
+          Replica replica = it.next();
+          int startIndex = computeRandomStartIndex(replica);
+          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+            if (receiver.canAdd(replica)) {
+              receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+              receiver.nonPreferred.add(replica);
+              donor.currentlyAssigned = donor.currentlyAssigned - 1;
+              it.remove();
+              break;
+            }
+          }
+          if (donor.capacity >= donor.currentlyAssigned) {
+            break;
+          }
+        }
+        if (donor.capacity < donor.currentlyAssigned) {
+          logger.warn("Could not take partitions out of node:" + donor.id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Update a ZNRecord with the results of the rebalancing.
+   * @param znRecord
+   */
+  private void prepareResult(ZNRecord znRecord) {
+    // The map fields are keyed on partition name to a pair of node and state, i.e. it
+    // indicates that the partition with given state is served by that node
+    //
+    // The list fields are also keyed on partition and list all the nodes serving that partition.
+    // This is useful to verify that there is no node serving multiple replicas of the same
+    // partition.
+    Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
+    for (String partition : _partitions) {
+      znRecord.setMapField(partition, new TreeMap<String, String>());
+      znRecord.setListField(partition, new ArrayList<String>());
+      newPreferences.put(partition, new ArrayList<String>());
+    }
+
+    // for preference lists, the rough priority that we want is:
+    // [existing preferred, existing non-preferred, non-existing preferred, non-existing
+    // non-preferred]
+    for (Node node : _liveNodesList) {
+      for (Replica replica : node.preferred) {
+        if (node.newReplicas.contains(replica)) {
+          newPreferences.get(replica.partition).add(node.id);
+        } else {
+          znRecord.getListField(replica.partition).add(node.id);
+        }
+      }
+    }
+    for (Node node : _liveNodesList) {
+      for (Replica replica : node.nonPreferred) {
+        if (node.newReplicas.contains(replica)) {
+          newPreferences.get(replica.partition).add(node.id);
+        } else {
+          znRecord.getListField(replica.partition).add(node.id);
+        }
+      }
+    }
+    normalizePreferenceLists(znRecord.getListFields(), newPreferences);
+
+    // generate preference maps based on the preference lists
+    for (String partition : _partitions) {
+      List<String> preferenceList = znRecord.getListField(partition);
+      int i = 0;
+      for (String participant : preferenceList) {
+        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+        i++;
+      }
+    }
+  }
+
+  /**
+   * Adjust preference lists to reduce the number of same replicas on an instance. This will
+   * separately normalize two sets of preference lists, and then append the results of the second
+   * set to those of the first. This basically ensures that existing replicas are automatically
+   * preferred.
+   * @param preferenceLists map of (partition --> list of nodes)
+   * @param newPreferences map containing node preferences not consistent with the current
+   *          assignment
+   */
+  private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
+      Map<String, List<String>> newPreferences) {
+
+    Map<String, Map<String, Integer>> nodeReplicaCounts =
+        new HashMap<String, Map<String, Integer>>();
+    for (String partition : preferenceLists.keySet()) {
+      normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
+    }
+    for (String partition : newPreferences.keySet()) {
+      normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
+      preferenceLists.get(partition).addAll(newPreferences.get(partition));
+    }
+  }
+
+  /**
+   * Adjust a single preference list for replica assignment imbalance
+   * @param preferenceList list of node names
+   * @param nodeReplicaCounts map of (node --> state --> count)
+   */
+  private void normalizePreferenceList(List<String> preferenceList,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    List<String> newPreferenceList = new ArrayList<String>();
+    int replicas = Math.min(countStateReplicas(), preferenceList.size());
+
+    // make this a LinkedHashSet to preserve iteration order
+    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
+    for (int i = 0; i < replicas; i++) {
+      String state = _stateMap.get(i);
+      String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
+      newPreferenceList.add(node);
+      notAssigned.remove(node);
+      Map<String, Integer> counts = nodeReplicaCounts.get(node);
+      counts.put(state, counts.get(state) + 1);
+    }
+    preferenceList.clear();
+    preferenceList.addAll(newPreferenceList);
+  }
+
+  /**
+   * Get the node which hosts the fewest of a given replica
+   * @param state the state
+   * @param nodes nodes to check
+   * @param nodeReplicaCounts current assignment of replicas
+   * @return the node most willing to accept the replica
+   */
+  private String getMinimumNodeForReplica(String state, Set<String> nodes,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    String minimalNode = null;
+    int minimalCount = Integer.MAX_VALUE;
+    for (String node : nodes) {
+      int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
+      if (count < minimalCount) {
+        minimalCount = count;
+        minimalNode = node;
+      }
+    }
+    return minimalNode;
+  }
+
+  /**
+   * Safe check for the number of replicas of a given id assiged to a node
+   * @param state the state to assign
+   * @param node the node to check
+   * @param nodeReplicaCounts a map of node to replica id and counts
+   * @return the number of currently assigned replicas of the given id
+   */
+  private int getReplicaCountForNode(String state, String node,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    if (!nodeReplicaCounts.containsKey(node)) {
+      Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
+      replicaCounts.put(state, 0);
+      nodeReplicaCounts.put(node, replicaCounts);
+      return 0;
+    }
+    Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
+    if (!replicaCounts.containsKey(state)) {
+      replicaCounts.put(state, 0);
+      return 0;
+    }
+    return replicaCounts.get(state);
+  }
+
+  /**
+   * Compute the subset of the current mapping where replicas are not mapped according to their
+   * preferred assignment.
+   * @param currentMapping Current mapping of replicas to nodes
+   * @return The current assignments that do not conform to the preferred assignment
+   */
+  private Map<Replica, Node> computeExistingNonPreferredPlacement(
+      Map<String, Map<String, String>> currentMapping) {
+    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
+    int count = countStateReplicas();
+    for (String partition : currentMapping.keySet()) {
+      Map<String, String> nodeStateMap = currentMapping.get(partition);
+      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+      for (String nodeId : nodeStateMap.keySet()) {
+        Node node = _nodeMap.get(nodeId);
+        boolean skip = false;
+        for (Replica replica : node.preferred) {
+          if (replica.partition.equals(partition)) {
+            skip = true;
+            break;
+          }
+        }
+        if (skip) {
+          continue;
+        }
+        // check if its in one of the preferred position
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          if (!_preferredAssignment.containsKey(replica)) {
+
+            logger.warn("partitions: " + _partitions);
+            logger.warn("currentMapping.keySet: " + currentMapping.keySet());
+            throw new IllegalArgumentException(
+                "partition: " + replica + " is in currentMapping but not in partitions");
+          }
+
+          if (_preferredAssignment.get(replica).id != node.id
+              && !_existingPreferredAssignment.containsKey(replica)
+              && !existingNonPreferredAssignment.containsKey(replica)) {
+            existingNonPreferredAssignment.put(replica, node);
+            node.nonPreferred.add(replica);
+
+            break;
+          }
+        }
+      }
+    }
+    return existingNonPreferredAssignment;
+  }
+
+  /**
+   * Get a live node index to try first for a replica so that each possible start index is
+   * roughly uniformly assigned.
+   * @param replica The replica to assign
+   * @return The starting node index to try
+   */
+  private int computeRandomStartIndex(final Replica replica) {
+    return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+  }
+
+  /**
+   * Get a set of replicas not currently assigned to any node
+   * @return Unassigned replicas
+   */
+  private Set<Replica> computeOrphaned() {
+    Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
+    for (Replica r : _existingPreferredAssignment.keySet()) {
+      if (orphanedPartitions.contains(r)) {
+        orphanedPartitions.remove(r);
+      }
+    }
+    for (Replica r : _existingNonPreferredAssignment.keySet()) {
+      if (orphanedPartitions.contains(r)) {
+        orphanedPartitions.remove(r);
+      }
+    }
+
+    return orphanedPartitions;
+  }
+
+  /**
+   * Determine the replicas already assigned to their preferred nodes
+   * @param currentMapping Current assignment of replicas to nodes
+   * @return Assignments that conform to the preferred placement
+   */
+  private Map<Replica, Node> computeExistingPreferredPlacement(
+      final Map<String, Map<String, String>> currentMapping) {
+    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
+    int count = countStateReplicas();
+    for (String partition : currentMapping.keySet()) {
+      Map<String, String> nodeStateMap = currentMapping.get(partition);
+      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+      for (String nodeId : nodeStateMap.keySet()) {
+        Node node = _nodeMap.get(nodeId);
+        node.currentlyAssigned = node.currentlyAssigned + 1;
+        // check if its in one of the preferred position
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          if (_preferredAssignment.containsKey(replica)
+              && !existingPreferredAssignment.containsKey(replica)
+              && _preferredAssignment.get(replica).id == node.id) {
+            existingPreferredAssignment.put(replica, node);
+            node.preferred.add(replica);
+            break;
+          }
+        }
+      }
+    }
+
+    return existingPreferredAssignment;
+  }
+
+  /**
+   * Given a predefined set of all possible nodes, compute an assignment of replicas to
+   * nodes that evenly assigns all replicas to nodes.
+   * @param allNodes Identifiers to all nodes, live and non-live
+   * @return Preferred assignment of replicas
+   */
+  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+    Map<Replica, Node> preferredMapping;
+    preferredMapping = new HashMap<Replica, Node>();
+    int partitionId = 0;
+    int numReplicas = countStateReplicas();
+    int count = countStateReplicas();
+    for (String partition : _partitions) {
+      for (int replicaId = 0; replicaId < count; replicaId++) {
+        Replica replica = new Replica(partition, replicaId);
+        String nodeName = _placementScheme.getLocation(partitionId, replicaId, _partitions.size(),
+            numReplicas, allNodes);
+        preferredMapping.put(replica, _nodeMap.get(nodeName));
+      }
+      partitionId = partitionId + 1;
+    }
+    return preferredMapping;
+  }
+
+  /**
+   * Counts the total number of replicas given a state-count mapping
+   * @return
+   */
+  private int countStateReplicas() {
+    int total = 0;
+    for (Integer count : _states.values()) {
+      total += count;
+    }
+    return total;
+  }
+
+  /**
+   * Compute a map of replica ids to state names
+   * @return Map: replica id -> state name
+   */
+  private Map<Integer, String> generateStateMap() {
+    int replicaId = 0;
+    Map<Integer, String> stateMap = new HashMap<Integer, String>();
+    for (String state : _states.keySet()) {
+      Integer count = _states.get(state);
+      for (int i = 0; i < count; i++) {
+        stateMap.put(replicaId, state);
+        replicaId++;
+      }
+    }
+    return stateMap;
+  }
+
+  /**
+   * A Node is an entity that can serve replicas. It has a capacity and knowledge
+   * of replicas assigned to it, so it can decide if it can receive additional replicas.
+   */
+  class Node {
+    public int currentlyAssigned;
+    public int capacity;
+    public boolean hasCeilingCapacity;
+    private final String id;
+    boolean isAlive;
+    private final List<Replica> preferred;
+    private final List<Replica> nonPreferred;
+    private final Set<Replica> newReplicas;
+
+    public Node(String id) {
+      preferred = new ArrayList<Replica>();
+      nonPreferred = new ArrayList<Replica>();
+      newReplicas = new TreeSet<Replica>();
+      currentlyAssigned = 0;
+      isAlive = false;
+      this.id = id;
+    }
+
+    /**
+     * Check if this replica can be legally added to this node
+     * @param replica The replica to test
+     * @return true if the assignment can be made, false otherwise
+     */
+    public boolean canAdd(Replica replica) {
+      if (currentlyAssigned >= capacity) {
+        return false;
+      }
+      return canAddIfCapacity(replica);
+    }
+
+    /**
+     * Check if this replica can be legally added to this node, provided that it has enough
+     * capacity.
+     * @param replica The replica to test
+     * @return true if the assignment can be made, false otherwise
+     */
+    public boolean canAddIfCapacity(Replica replica) {
+      if (!isAlive) {
+        return false;
+      }
+      for (Replica r : preferred) {
+        if (r.partition.equals(replica.partition)) {
+          return false;
+        }
+      }
+      for (Replica r : nonPreferred) {
+        if (r.partition.equals(replica.partition)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Receive a replica by stealing capacity from another Node
+     * @param donor The node that has excess capacity
+     * @param replica The replica to receive
+     */
+    public void steal(Node donor, Replica replica) {
+      donor.hasCeilingCapacity = false;
+      donor.capacity--;
+      hasCeilingCapacity = true;
+      capacity++;
+      currentlyAssigned++;
+      nonPreferred.add(replica);
+      newReplicas.add(replica);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
+          .append("\nnonpreferred:").append(nonPreferred.size());
+      return sb.toString();
+    }
+  }
+
+  /**
+   * A Replica is a combination of a partition of the resource, the state the replica is in
+   * and an identifier signifying a specific replica of a given partition and state.
+   */
+  class Replica implements Comparable<Replica> {
+    private String partition;
+    private int replicaId; // this is a partition-relative id
+    private String format;
+
+    public Replica(String partition, int replicaId) {
+      this.partition = partition;
+      this.replicaId = replicaId;
+      this.format = this.partition + "|" + this.replicaId;
+    }
+
+    @Override
+    public String toString() {
+      return format;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that instanceof Replica) {
+        return this.format.equals(((Replica) that).format);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return this.format.hashCode();
+    }
+
+    @Override
+    public int compareTo(Replica that) {
+      if (that instanceof Replica) {
+        return this.format.compareTo(that.format);
+      }
+      return -1;
+    }
+  }
+
+  /**
+   * Interface for providing a custom approach to computing a replica's affinity to a node.
+   */
+  public interface ReplicaPlacementScheme {
+    /**
+     * Initialize global state
+     * @param manager The instance to which this placement is associated
+     */
+    public void init(final HelixManager manager);
+
+    /**
+     * Given properties of this replica, determine the node it would prefer to be served by
+     * @param partitionId The current partition
+     * @param replicaId The current replica with respect to the current partition
+     * @param numPartitions The total number of partitions
+     * @param numReplicas The total number of replicas per partition
+     * @param nodeNames A list of identifiers of all nodes, live and non-live
+     * @return The name of the node that would prefer to serve this replica
+     */
+    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        final List<String> nodeNames);
+  }
+
+  /**
+   * Compute preferred placements based on a default strategy that assigns replicas to nodes as
+   * evenly as possible while avoiding placing two replicas of the same partition on any node.
+   */
+  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+    @Override
+    public void init(final HelixManager manager) {
+      // do nothing since this is independent of the manager
+    }
+
+    @Override
+    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        final List<String> nodeNames) {
+      int index;
+      if (nodeNames.size() > numPartitions) {
+        // assign replicas in partition order in case there are more nodes than partitions
+        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
+      } else if (nodeNames.size() == numPartitions) {
+        // need a replica offset in case the sizes of these sets are the same
+        index = ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
+            % nodeNames.size();
+      } else {
+        // in all other cases, assigning a replica at a time for each partition is reasonable
+        index = (partitionId + replicaId) % nodeNames.size();
+      }
+      return nodeNames.get(index);
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 969805e..8de6cad 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.HelixConstants.ChangeType.*;
 
-@PreFetch (enabled = false)
+@PreFetch(enabled = false)
 public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
 
@@ -106,34 +106,33 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
   // TODO: make this be per _manager or per _listener instaed of per callbackHandler -- Lei
   private CallbackProcessor _batchCallbackProcessor;
-  private boolean _watchChild = true;  // Whether we should subscribe to the child znode's data change.
+  private boolean _watchChild = true; // Whether we should subscribe to the child znode's data
+                                      // change.
 
   // indicated whether this CallbackHandler is ready to serve event callback from ZkClient.
   private boolean _ready = false;
 
   static {
-    SubscribeChangeEventProcessor =
-        new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>("Singleton",
-            "CallbackHanlder-AsycSubscribe") {
-          @Override
-          protected void handleEvent(SubscribeChangeEvent event) {
-            logger.info("Resubscribe change listener to path: " + event.path + ", for listener: "
-                + event.listener + ", watchChild: " + event.watchChild);
-            try {
-              if (event.handler.isReady()) {
-                event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
-              } else {
-                logger.info(
-                    "CallbackHandler is not ready, stop subscribing changes listener to path: "
-                        + event.path + ", for listener: " + event.listener + ", watchChild: "
-                        + event.watchChild);
-              }
-            } catch (Exception e) {
-              logger.error("Failed to resubscribe change to path: " + event.path + " for listener "
-                  + event.listener, e);
-            }
+    SubscribeChangeEventProcessor = new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>(
+        "Singleton", "CallbackHanlder-AsycSubscribe") {
+      @Override
+      protected void handleEvent(SubscribeChangeEvent event) {
+        logger.info("Resubscribe change listener to path: " + event.path + ", for listener: "
+            + event.listener + ", watchChild: " + event.watchChild);
+        try {
+          if (event.handler.isReady()) {
+            event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
+          } else {
+            logger.info("CallbackHandler is not ready, stop subscribing changes listener to path: "
+                + event.path + ", for listener: " + event.listener + ", watchChild: "
+                + event.watchChild);
           }
-        };
+        } catch (Exception e) {
+          logger.error("Failed to resubscribe change to path: " + event.path + " for listener "
+              + event.listener, e);
+        }
+      }
+    };
 
     SubscribeChangeEventProcessor.start();
   }
@@ -187,16 +186,15 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   }
 
   public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
-        Object listener, EventType[] eventTypes, ChangeType changeType,
-        HelixCallbackMonitor monitor) {
+      Object listener, EventType[] eventTypes, ChangeType changeType,
+      HelixCallbackMonitor monitor) {
     if (listener == null) {
       throw new HelixException("listener could not be null");
     }
 
     if (monitor != null && !monitor.getChangeType().equals(changeType)) {
-      throw new HelixException(
-          "The specified callback monitor is for different change type: " + monitor.getChangeType()
-              .name());
+      throw new HelixException("The specified callback monitor is for different change type: "
+          + monitor.getChangeType().name());
     }
 
     _manager = manager;
@@ -221,7 +219,6 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     init();
   }
 
-
   private void parseListenerProperties() {
     BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
     PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
@@ -247,47 +244,48 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
     Class listenerClass = null;
     switch (_changeType) {
-      case IDEAL_STATE:
-        listenerClass = IdealStateChangeListener.class;
-        break;
-      case INSTANCE_CONFIG:
-        if (_listener instanceof ConfigChangeListener) {
-          listenerClass = ConfigChangeListener.class;
-        } else if (_listener instanceof InstanceConfigChangeListener) {
-          listenerClass = InstanceConfigChangeListener.class;
-        }
-        break;
-      case CLUSTER_CONFIG:
-        listenerClass = ClusterConfigChangeListener.class;
-        break;
-      case RESOURCE_CONFIG:
-        listenerClass = ResourceConfigChangeListener.class;
-        break;
-      case CONFIG:
+    case IDEAL_STATE:
+      listenerClass = IdealStateChangeListener.class;
+      break;
+    case INSTANCE_CONFIG:
+      if (_listener instanceof ConfigChangeListener) {
         listenerClass = ConfigChangeListener.class;
-        break;
-      case LIVE_INSTANCE:
-        listenerClass = LiveInstanceChangeListener.class;
-        break;
-      case CURRENT_STATE:
-        listenerClass = CurrentStateChangeListener.class;        ;
-        break;
-      case MESSAGE:
-      case MESSAGES_CONTROLLER:
-        listenerClass = MessageListener.class;
-        break;
-      case EXTERNAL_VIEW:
-      case TARGET_EXTERNAL_VIEW:
-        listenerClass = ExternalViewChangeListener.class;
-        break;
-      case CONTROLLER:
-        listenerClass = ControllerChangeListener.class;
+      } else if (_listener instanceof InstanceConfigChangeListener) {
+        listenerClass = InstanceConfigChangeListener.class;
+      }
+      break;
+    case CLUSTER_CONFIG:
+      listenerClass = ClusterConfigChangeListener.class;
+      break;
+    case RESOURCE_CONFIG:
+      listenerClass = ResourceConfigChangeListener.class;
+      break;
+    case CONFIG:
+      listenerClass = ConfigChangeListener.class;
+      break;
+    case LIVE_INSTANCE:
+      listenerClass = LiveInstanceChangeListener.class;
+      break;
+    case CURRENT_STATE:
+      listenerClass = CurrentStateChangeListener.class;
+      ;
+      break;
+    case MESSAGE:
+    case MESSAGES_CONTROLLER:
+      listenerClass = MessageListener.class;
+      break;
+    case EXTERNAL_VIEW:
+    case TARGET_EXTERNAL_VIEW:
+      listenerClass = ExternalViewChangeListener.class;
+      break;
+    case CONTROLLER:
+      listenerClass = ControllerChangeListener.class;
     }
 
     Method callbackMethod = listenerClass.getMethods()[0];
     try {
-      Method method = _listener.getClass()
-          .getMethod(callbackMethod.getName(), callbackMethod.getParameterTypes());
+      Method method = _listener.getClass().getMethod(callbackMethod.getName(),
+          callbackMethod.getParameterTypes());
       BatchMode batchModeInMethod = method.getAnnotation(BatchMode.class);
       PreFetch preFetchInMethod = method.getAnnotation(PreFetch.class);
       if (batchModeInMethod != null) {
@@ -297,13 +295,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         _preFetchEnabled = preFetchInMethod.enabled();
       }
     } catch (NoSuchMethodException e) {
-      logger.warn(
-          "No method " + callbackMethod.getName() + " defined in listener " + _listener.getClass()
-              .getCanonicalName());
+      logger.warn("No method " + callbackMethod.getName() + " defined in listener "
+          + _listener.getClass().getCanonicalName());
     }
   }
 
-
   public Object getListener() {
     return _listener;
   }
@@ -312,21 +308,21 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     return _path;
   }
 
-  public void enqueueTask(NotificationContext changeContext)
-      throws Exception {
-    //async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the callback's immediately.
+  public void enqueueTask(NotificationContext changeContext) throws Exception {
+    // async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the
+    // callback's immediately.
     if (_batchModeEnabled && changeContext.getType() == NotificationContext.Type.CALLBACK) {
       logger.debug("Enqueuing callback");
       if (!isReady()) {
-        logger.info(
-            "CallbackHandler is not ready, ignore change callback from path: "
-                + _path + ", for listener: " + _listener);
+        logger.info("CallbackHandler is not ready, ignore change callback from path: " + _path
+            + ", for listener: " + _listener);
       } else {
         synchronized (this) {
           if (_batchCallbackProcessor != null) {
             _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext);
           } else {
-            throw new HelixException("Failed to process callback in batch mode. Batch Callback Processor does not exist.");
+            throw new HelixException(
+                "Failed to process callback in batch mode. Batch Callback Processor does not exist.");
           }
         }
       }
@@ -346,15 +342,13 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     // This allows the listener to work with one change at a time
     synchronized (_manager) {
       if (logger.isInfoEnabled()) {
-        logger.info(
-            Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:" + _listener
-                + " type: " + type);
+        logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
+            + _listener + " type: " + type);
       }
 
       if (!_expectTypes.contains(type)) {
-        logger.warn(
-            "Callback handler received event in wrong order. Listener: " + _listener + ", path: "
-                + _path + ", expected types: " + _expectTypes + " but was " + type);
+        logger.warn("Callback handler received event in wrong order. Listener: " + _listener
+            + ", path: " + _path + ", expected types: " + _expectTypes + " but was " + type);
         return;
 
       }
@@ -400,12 +394,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         listener.onConfigChange(configs, changeContext);
 
       } else if (_changeType == LIVE_INSTANCE) {
-        LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener) _listener;
+        LiveInstanceChangeListener liveInstanceChangeListener =
+            (LiveInstanceChangeListener) _listener;
         List<LiveInstance> liveInstances = preFetch(_propertyKey);
         liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
 
       } else if (_changeType == CURRENT_STATE) {
-        CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener;
+        CurrentStateChangeListener currentStateChangeListener =
+            (CurrentStateChangeListener) _listener;
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
         List<CurrentState> currentStates = preFetch(_propertyKey);
         currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
@@ -435,9 +431,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
       long end = System.currentTimeMillis();
       if (logger.isInfoEnabled()) {
-        logger.info(
-            Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:" + _listener
-                + " type: " + type + " Took: " + (end - start) + "ms");
+        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
+            + _listener + " type: " + type + " Took: " + (end - start) + "ms");
       }
       if (_monitor != null) {
         _monitor.increaseCallbackCounters(end - start);
@@ -457,13 +452,13 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     if (callbackType == NotificationContext.Type.INIT
         || callbackType == NotificationContext.Type.CALLBACK) {
       if (logger.isDebugEnabled()) {
-        logger.debug(
-            _manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener: " + _listener);
+        logger.debug(_manager.getInstanceName() + " subscribes child-change. path: " + path
+            + ", listener: " + _listener);
       }
       _zkClient.subscribeChildChanges(path, this);
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
-      logger.info(
-          _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener: " + _listener);
+      logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " + path
+          + ", listener: " + _listener);
 
       _zkClient.unsubscribeChildChanges(path, this);
     }
@@ -473,22 +468,21 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     if (callbackType == NotificationContext.Type.INIT
         || callbackType == NotificationContext.Type.CALLBACK) {
       if (logger.isDebugEnabled()) {
-        logger.debug(
-            _manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener: "
-                + _listener);
+        logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " + path
+            + ", listener: " + _listener);
       }
       _zkClient.subscribeDataChanges(path, this);
     } else if (callbackType == NotificationContext.Type.FINALIZE) {
-      logger.info(
-          _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener: "
-              + _listener);
+      logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " + path
+          + ", listener: " + _listener);
 
       _zkClient.unsubscribeDataChanges(path, this);
     }
   }
 
   /** Subscribe Changes in asynchronously */
-  private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path, boolean watchChild) {
+  private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path,
+      boolean watchChild) {
     SubscribeChangeEvent subscribeEvent =
         new SubscribeChangeEvent(this, callbackType, path, watchChild, _listener);
     SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, subscribeEvent);
@@ -496,13 +490,13 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
   private void subscribeForChanges(NotificationContext.Type callbackType, String path,
       boolean watchChild) {
-    logger.info(
-        "Subscribing changes listener to path: " + path + ", type: " + callbackType + ", listener: "
-            + _listener);
+    logger.info("Subscribing changes listener to path: " + path + ", type: " + callbackType
+        + ", listener: " + _listener);
 
     long start = System.currentTimeMillis();
-    if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes
-        .contains(EventType.NodeCreated) || _eventTypes.contains(EventType.NodeDeleted)) {
+    if (_eventTypes.contains(EventType.NodeDataChanged)
+        || _eventTypes.contains(EventType.NodeCreated)
+        || _eventTypes.contains(EventType.NodeDeleted)) {
       logger.info("Subscribing data change listener to path: " + path);
       subscribeDataChange(path, callbackType);
     }
@@ -620,7 +614,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         enqueueTask(changeContext);
       }
     } catch (Exception e) {
-      String msg = "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+      String msg =
+          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
       ZKExceptionHandler.getInstance().handle(msg, e);
     }
   }
@@ -732,15 +727,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   }
 
   public String getContent() {
-    return "CallbackHandler{" +
-        "_watchChild=" + _watchChild +
-        ", _preFetchEnabled=" + _preFetchEnabled +
-        ", _batchModeEnabled=" + _batchModeEnabled +
-        ", _path='" + _path + '\'' +
-        ", _listener=" + _listener +
-        ", _changeType=" + _changeType +
-        ", _manager=" + _manager +
-        ", _zkClient=" + _zkClient +
-        '}';
+    return "CallbackHandler{" + "_watchChild=" + _watchChild + ", _preFetchEnabled="
+        + _preFetchEnabled + ", _batchModeEnabled=" + _batchModeEnabled + ", _path='" + _path + '\''
+        + ", _listener=" + _listener + ", _changeType=" + _changeType + ", _manager=" + _manager
+        + ", _zkClient=" + _zkClient + '}';
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 8d932c8..a4dfe21 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -337,10 +337,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     return get(paths, stats, needRead, false);
   }
 
-
   @Override
-  public List<T> get(List<String> paths, List<Stat> stats, int options,
-      boolean throwException) throws HelixException {
+  public List<T> get(List<String> paths, List<Stat> stats, int options, boolean throwException)
+      throws HelixException {
     boolean[] needRead = new boolean[paths.size()];
     Arrays.fill(needRead, true);
 
@@ -401,7 +400,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             stats.set(i, cb._stat);
           }
         } else if (Code.get(cb.getRc()) != Code.NONODE && throwException) {
-          throw new HelixMetaDataAccessException(String.format("Failed to read node %s", paths.get(i)));
+          throw new HelixMetaDataAccessException(
+              String.format("Failed to read node %s", paths.get(i)));
         } else {
           pathFailToRead.put(paths.get(i), cb.getRc());
         }
@@ -442,12 +442,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         return records;
       } catch (HelixMetaDataAccessException e) {
         if (readCount == 0) {
-          throw new HelixMetaDataAccessException(String.format("Failed to get full list of %s", parentPath), e);
+          throw new HelixMetaDataAccessException(
+              String.format("Failed to get full list of %s", parentPath), e);
         }
         try {
           Thread.sleep(retryInterval);
         } catch (InterruptedException interruptedException) {
-          throw new HelixMetaDataAccessException("Fail to interrupt the sleep", interruptedException);
+          throw new HelixMetaDataAccessException("Fail to interrupt the sleep",
+              interruptedException);
         }
       }
     }
@@ -535,7 +537,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   @Override
   public boolean remove(String path, int options) {
     try {
-      // operation will not throw exception  when path successfully deleted or does not exist
+      // operation will not throw exception when path successfully deleted or does not exist
       // despite real error, operation will throw exception when path not empty, and in this
       // case, we try to delete recursively
       _zkClient.delete(path);
@@ -585,8 +587,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         _zkClient.asyncCreate(path, record, mode, cbList[i]);
       }
 
-      List<String> parentPaths =
-          new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
+      List<String> parentPaths = new ArrayList<>(Collections.<String> nCopies(paths.size(), null));
       boolean failOnNoNode = false;
 
       for (int i = 0; i < paths.size(); i++) {
@@ -657,7 +658,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     boolean[] needCreate = new boolean[paths.size()];
     Arrays.fill(needCreate, true);
     List<List<String>> pathsCreated =
-        new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null));
+        new ArrayList<>(Collections.<List<String>> nCopies(paths.size(), null));
 
     long startT = System.nanoTime();
     try {
@@ -674,8 +675,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
+            + (endT - startT) + " ns");
       }
     }
   }
@@ -711,7 +712,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       return success;
     }
 
-    List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null));
+    List<Stat> setStats = new ArrayList<>(Collections.<Stat> nCopies(paths.size(), null));
     SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
     CreateCallbackHandler[] createCbList = null;
     boolean[] needSet = new boolean[paths.size()];
@@ -836,8 +837,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * async update
    * return: updatedData on success or null on fail
    */
-  List<T> update(List<String> paths, List<DataUpdater<T>> updaters,
-      List<List<String>> pathsCreated, List<Stat> stats, int options) {
+  List<T> update(List<String> paths, List<DataUpdater<T>> updaters, List<List<String>> pathsCreated,
+      List<Stat> stats, int options) {
     if (paths == null || paths.size() == 0) {
       LOG.error("paths is null or empty");
       return Collections.emptyList();
@@ -845,7 +846,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
     if (updaters.size() != paths.size()
         || (pathsCreated != null && pathsCreated.size() != paths.size())) {
-      throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
+      throw new IllegalArgumentException(
+          "paths, updaters, and pathsCreated should be of same size");
     }
 
     List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
@@ -1029,8 +1031,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
+            + (endT - startT) + " ns");
       }
     }
   }
@@ -1067,8 +1069,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: "
+            + (endT - startT) + " ns");
       }
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 418140b..5dd000d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -65,7 +65,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper.
+ * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on
+ * nodes in ZooKeeper.
  * WARN: Do not use this class directly, use {@link org.apache.helix.manager.zk.ZkClient} instead.
  */
 public class ZkClient implements Watcher {
@@ -74,8 +75,7 @@ public class ZkClient implements Watcher {
 
   private final IZkConnection _connection;
   private final long _operationRetryTimeoutInMillis;
-  private final Map<String, Set<IZkChildListener>> _childListener =
-      new ConcurrentHashMap<>();
+  private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener =
       new ConcurrentHashMap<>();
   private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<>();
@@ -159,12 +159,12 @@ public class ZkClient implements Watcher {
         long updateTime = Math.max(stat.getCtime(), stat.getMtime());
         if (notificationTime.getAsLong() > updateTime) {
           _monitor.recordDataPropagationLatency(_path, notificationTime.getAsLong() - updateTime);
-        } // else, the node was updated again after the notification. Propagation latency is unavailable.
+        } // else, the node was updated again after the notification. Propagation latency is
+          // unavailable.
       }
     }
   }
 
-
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
@@ -179,11 +179,10 @@ public class ZkClient implements Watcher {
 
     // initiate monitor
     try {
-      if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null && !monitorType
-          .isEmpty()) {
-        _monitor =
-            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly,
-                _eventThread);
+      if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null
+          && !monitorType.isEmpty()) {
+        _monitor = new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName,
+            monitorRootPathOnly, _eventThread);
         _monitor.register();
       } else {
         LOG.info("ZkClient monitor key or type is not provided. Skip monitoring.");
@@ -228,9 +227,8 @@ public class ZkClient implements Watcher {
       listenerEntries.add(listenerEntry);
       if (prefetchEnabled) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Subscribed data changes for " + path + ", listener: " + listener + ", prefetch data: "
-                  + prefetchEnabled);
+          LOG.debug("Subscribed data changes for " + path + ", listener: " + listener
+              + ", prefetch data: " + prefetchEnabled);
         }
       }
     }
@@ -248,16 +246,15 @@ public class ZkClient implements Watcher {
 
     Method callbackMethod = IZkDataListener.class.getMethods()[0];
     try {
-      Method method = dataListener.getClass()
-          .getMethod(callbackMethod.getName(), callbackMethod.getParameterTypes());
+      Method method = dataListener.getClass().getMethod(callbackMethod.getName(),
+          callbackMethod.getParameterTypes());
       PreFetch preFetchInMethod = method.getAnnotation(PreFetch.class);
       if (preFetchInMethod != null) {
         return preFetchInMethod.enabled();
       }
     } catch (NoSuchMethodException e) {
-      LOG.warn(
-          "No method " + callbackMethod.getName() + " defined in listener " + dataListener.getClass()
-              .getCanonicalName());
+      LOG.warn("No method " + callbackMethod.getName() + " defined in listener "
+          + dataListener.getClass().getCanonicalName());
     }
 
     return true;
@@ -304,16 +301,15 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent node.
-   *
    * @param path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createPersistent(String path)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -322,19 +318,19 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent node and set its ACLs.
-   *
    * @param path
    * @param createParents
-   *            if true all parent dirs are created as well and no {@link ZkNodeExistsException} is thrown in case the
-   *            path already exists
+   *          if true all parent dirs are created as well and no {@link ZkNodeExistsException} is
+   *          thrown in case the
+   *          path already exists
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createPersistent(String path, boolean createParents)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -343,21 +339,21 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent node and set its ACLs.
-   *
    * @param path
    * @param acl
-   *            List of ACL permissions to assign to the node
+   *          List of ACL permissions to assign to the node
    * @param createParents
-   *            if true all parent dirs are created as well and no {@link ZkNodeExistsException} is thrown in case the
-   *            path already exists
+   *          if true all parent dirs are created as well and no {@link ZkNodeExistsException} is
+   *          thrown in case the
+   *          path already exists
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createPersistent(String path, boolean createParents, List<ACL> acl)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -379,17 +375,16 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent node.
-   *
    * @param path
    * @param data
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createPersistent(String path, Object data)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -398,18 +393,17 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent node.
-   *
    * @param path
    * @param data
    * @param acl
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createPersistent(String path, Object data, List<ACL> acl) {
     create(path, data, acl, CreateMode.PERSISTENT);
@@ -417,18 +411,17 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent, sequental node.
-   *
    * @param path
    * @param data
    * @return create node's path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public String createPersistentSequential(String path, Object data)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -437,19 +430,18 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a persistent, sequential node and set its ACL.
-   *
    * @param path
    * @param acl
    * @param data
    * @return create node's path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public String createPersistentSequential(String path, Object data, List<ACL> acl)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -458,16 +450,15 @@ public class ZkClient implements Watcher {
 
   /**
    * Create an ephemeral node.
-   *
    * @param path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createEphemeral(final String path)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -476,17 +467,16 @@ public class ZkClient implements Watcher {
 
   /**
    * Create an ephemeral node and set its ACL.
-   *
    * @param path
    * @param acl
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createEphemeral(final String path, final List<ACL> acl)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -495,19 +485,18 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a node.
-   *
    * @param path
    * @param data
    * @param mode
    * @return create node's path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public String create(final String path, Object data, final CreateMode mode)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -516,20 +505,19 @@ public class ZkClient implements Watcher {
 
   /**
    * Create a node with ACL.
-   *
    * @param path
    * @param datat
    * @param acl
    * @param mode
    * @return create node's path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode)
       throws IllegalArgumentException, ZkException {
@@ -564,17 +552,16 @@ public class ZkClient implements Watcher {
 
   /**
    * Create an ephemeral node.
-   *
    * @param path
    * @param data
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createEphemeral(final String path, final Object data)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -583,18 +570,17 @@ public class ZkClient implements Watcher {
 
   /**
    * Create an ephemeral node.
-   *
    * @param path
    * @param data
    * @param acl
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public void createEphemeral(final String path, final Object data, final List<ACL> acl)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -603,18 +589,17 @@ public class ZkClient implements Watcher {
 
   /**
    * Create an ephemeral, sequential node.
-   *
    * @param path
    * @param data
    * @return created path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public String createEphemeralSequential(final String path, final Object data)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -623,19 +608,18 @@ public class ZkClient implements Watcher {
 
   /**
    * Create an ephemeral, sequential node with ACL.
-   *
    * @param path
    * @param data
    * @param acl
    * @return created path
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs
+   *           if any other exception occurs
    */
   public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
@@ -657,7 +641,6 @@ public class ZkClient implements Watcher {
         || event.getType() == Event.EventType.NodeCreated
         || event.getType() == Event.EventType.NodeChildrenChanged;
 
-
     if (event.getType() == Event.EventType.NodeDeleted) {
       if (LOG.isDebugEnabled()) {
         String path = event.getPath();
@@ -685,7 +668,8 @@ public class ZkClient implements Watcher {
       if (stateChanged) {
         getEventLock().getStateChangedCondition().signalAll();
 
-        // If the session expired we have to signal all conditions, because watches might have been removed and
+        // If the session expired we have to signal all conditions, because watches might have been
+        // removed and
         // there is no guarantee that those
         // conditions will be signaled at all after an Expired event
         // TODO PVo write a test for this
@@ -748,10 +732,8 @@ public class ZkClient implements Watcher {
     }
   }
 
-
   /**
    * Counts number of children for the given path.
-   *
    * @param path
    * @return number of children or 0 if path does not exist.
    */
@@ -767,7 +749,6 @@ public class ZkClient implements Watcher {
     return exists(path, hasListeners(path));
   }
 
-
   protected boolean exists(final String path, final boolean watch) {
     long startT = System.currentTimeMillis();
     try {
@@ -872,7 +853,8 @@ public class ZkClient implements Watcher {
     for (final IZkStateListener stateListener : _stateListener) {
       _eventThread.send(new ZkEvent("New session event sent to " + stateListener) {
 
-        @Override public void run() throws Exception {
+        @Override
+        public void run() throws Exception {
           stateListener.handleNewSession();
         }
       });
@@ -883,7 +865,8 @@ public class ZkClient implements Watcher {
     for (final IZkStateListener stateListener : _stateListener) {
       _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) {
 
-        @Override public void run() throws Exception {
+        @Override
+        public void run() throws Exception {
           stateListener.handleStateChanged(state);
         }
       });
@@ -895,7 +878,8 @@ public class ZkClient implements Watcher {
       _eventThread
           .send(new ZkEvent("Session establishment error(" + error + ") sent to " + stateListener) {
 
-            @Override public void run() throws Exception {
+            @Override
+            public void run() throws Exception {
               stateListener.handleSessionEstablishmentError(error);
             }
           });
@@ -967,7 +951,8 @@ public class ZkClient implements Watcher {
         || event.getType() == EventType.NodeDeleted) {
       Set<IZkChildListener> childListeners = _childListener.get(path);
       if (childListeners != null && !childListeners.isEmpty()) {
-        // TODO recording child changed event propagation latency as well. Note this change will introduce additional ZK access.
+        // TODO recording child changed event propagation latency as well. Note this change will
+        // introduce additional ZK access.
         fireChildChangedEvents(path, childListeners);
       }
     }
@@ -987,9 +972,8 @@ public class ZkClient implements Watcher {
       final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       // Trigger listener callbacks
       for (final IZkDataListenerEntry listener : listeners) {
-        _eventThread.send(new ZkEvent(
-            "Data of " + path + " changed sent to " + listener.getDataListener()
-                + " prefetch data: " + listener.isPrefetchData()) {
+        _eventThread.send(new ZkEvent("Data of " + path + " changed sent to "
+            + listener.getDataListener() + " prefetch data: " + listener.isPrefetchData()) {
           @Override
           public void run() throws Exception {
             // Reinstall watch before listener callbacks to check the znode status
@@ -1032,7 +1016,8 @@ public class ZkClient implements Watcher {
           public void run() throws Exception {
             // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)), OptionalLong.empty());
+              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)),
+                  OptionalLong.empty());
             }
             List<String> children = null;
             if (pathStatRecord.pathExists()) {
@@ -1120,18 +1105,17 @@ public class ZkClient implements Watcher {
   }
 
   /**
-   *
    * @param <T>
    * @param callable
    * @return result of Callable
    * @throws ZkInterruptedException
-   *             if operation was interrupted, or a required reconnection got interrupted
+   *           if operation was interrupted, or a required reconnection got interrupted
    * @throws IllegalArgumentException
-   *             if called from anything except the ZooKeeper event thread
+   *           if called from anything except the ZooKeeper event thread
    * @throws ZkException
-   *             if any ZooKeeper exception occurred
+   *           if any ZooKeeper exception occurred
    * @throws RuntimeException
-   *             if any other exception occurs from invoking the Callable
+   *           if any other exception occurs from invoking the Callable
    */
   public <T> T retryUntilConnected(final Callable<T> callable)
       throws IllegalArgumentException, ZkException {
@@ -1172,8 +1156,8 @@ public class ZkClient implements Watcher {
         }
         // before attempting a retry, check whether retry timeout has elapsed
         if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) {
-          throw new ZkTimeoutException("Operation cannot be retried because of retry timeout (" + _operationRetryTimeoutInMillis
-              + " milli seconds)");
+          throw new ZkTimeoutException("Operation cannot be retried because of retry timeout ("
+              + _operationRetryTimeoutInMillis + " milli seconds)");
         }
       }
     } finally {
@@ -1197,10 +1181,11 @@ public class ZkClient implements Watcher {
   }
 
   /**
-   * Returns a mutex all zookeeper events are synchronized aginst. So in case you need to do something without getting
-   * any zookeeper event interruption synchronize against this mutex. Also all threads waiting on this mutex object
+   * Returns a mutex all zookeeper events are synchronized aginst. So in case you need to do
+   * something without getting
+   * any zookeeper event interruption synchronize against this mutex. Also all threads waiting on
+   * this mutex object
    * will be notified on an event.
-   *
    * @return the mutex.
    */
   public ZkLock getEventLock() {
@@ -1271,12 +1256,13 @@ public class ZkClient implements Watcher {
     return (T) _pathBasedZkSerializer.deserialize(data, path);
   }
 
-  @SuppressWarnings("unchecked") public <T extends Object> T readData(String path) {
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T readData(String path) {
     return (T) readData(path, false);
   }
 
-  @SuppressWarnings("unchecked") public <T extends Object> T readData(String path,
-      boolean returnNullIfPathNotExists) {
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) {
     T data = null;
     try {
       data = (T) readData(path, null);
@@ -1300,7 +1286,8 @@ public class ZkClient implements Watcher {
     try {
       data = retryUntilConnected(new Callable<byte[]>() {
 
-        @Override public byte[] call() throws Exception {
+        @Override
+        public byte[] call() throws Exception {
           return getConnection().readData(path, stat, watch);
         }
       });
@@ -1336,19 +1323,22 @@ public class ZkClient implements Watcher {
   }
 
   /**
-   * Updates data of an existing znode. The current content of the znode is passed to the {@link DataUpdater} that is
-   * passed into this method, which returns the new content. The new content is only written back to ZooKeeper if
-   * nobody has modified the given znode in between. If a concurrent change has been detected the new data of the
-   * znode is passed to the updater once again until the new contents can be successfully written back to ZooKeeper.
-   *
+   * Updates data of an existing znode. The current content of the znode is passed to the
+   * {@link DataUpdater} that is
+   * passed into this method, which returns the new content. The new content is only written back to
+   * ZooKeeper if
+   * nobody has modified the given znode in between. If a concurrent change has been detected the
+   * new data of the
+   * znode is passed to the updater once again until the new contents can be successfully written
+   * back to ZooKeeper.
    * @param <T>
    * @param path
-   *            The path of the znode.
+   *          The path of the znode.
    * @param updater
-   *            Updater that creates the new contents.
+   *          Updater that creates the new contents.
    */
-  @SuppressWarnings("unchecked") public <T extends Object> void updateDataSerialized(String path,
-      DataUpdater<T> updater) {
+  @SuppressWarnings("unchecked")
+  public <T extends Object> void updateDataSerialized(String path, DataUpdater<T> updater) {
     Stat stat = new Stat();
     boolean retry;
     do {
@@ -1373,7 +1363,8 @@ public class ZkClient implements Watcher {
       final byte[] data = serialize(datat, path);
       checkDataSizeLimit(data);
       final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
-        @Override public Object call() throws Exception {
+        @Override
+        public Object call() throws Exception {
           return getConnection().writeDataReturnStat(path, data, expectedVersion);
         }
       });
@@ -1394,14 +1385,15 @@ public class ZkClient implements Watcher {
     return writeDataReturnStat(path, datat, expectedVersion);
   }
 
-
   public void asyncCreate(final String path, Object datat, final CreateMode mode,
       final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
     final byte[] data = (datat == null ? null : serialize(datat, path));
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+      @Override
+      public Object call() throws Exception {
+        ((ZkConnection) getConnection()).getZookeeper().create(path, data,
+            ZooDefs.Ids.OPEN_ACL_UNSAFE,
             // Arrays.asList(DEFAULT_ACL),
             mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
                 data == null ? 0 : data.length, false));
@@ -1416,7 +1408,8 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     final byte[] data = serialize(datat, path);
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
+      @Override
+      public Object call() throws Exception {
         ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
                 data == null ? 0 : data.length, false));
@@ -1428,7 +1421,8 @@ public class ZkClient implements Watcher {
   public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
+      @Override
+      public Object call() throws Exception {
         ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
         return null;
@@ -1439,7 +1433,8 @@ public class ZkClient implements Watcher {
   public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
+      @Override
+      public Object call() throws Exception {
         ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
         return null;
@@ -1450,7 +1445,8 @@ public class ZkClient implements Watcher {
   public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
+      @Override
+      public Object call() throws Exception {
         ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
         return null;
@@ -1468,7 +1464,8 @@ public class ZkClient implements Watcher {
 
   public void watchForData(final String path) {
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
+      @Override
+      public Object call() throws Exception {
         getConnection().exists(path, true);
         return null;
       }
@@ -1477,16 +1474,17 @@ public class ZkClient implements Watcher {
 
   /**
    * Installs a child watch for the given path.
-   *
    * @param path
-   * @return the current children of the path or null if the zk node with the given path doesn't exist.
+   * @return the current children of the path or null if the zk node with the given path doesn't
+   *         exist.
    */
   public List<String> watchForChilds(final String path) {
     if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
       throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
     }
     return retryUntilConnected(new Callable<List<String>>() {
-      @Override public List<String> call() throws Exception {
+      @Override
+      public List<String> call() throws Exception {
         exists(path, true);
         try {
           return getChildren(path, true);
@@ -1499,15 +1497,16 @@ public class ZkClient implements Watcher {
   }
 
   /**
-   * Add authentication information to the connection. This will be used to identify the user and check access to
+   * Add authentication information to the connection. This will be used to identify the user and
+   * check access to
    * nodes protected by ACLs
-   *
    * @param scheme
    * @param auth
    */
   public void addAuthInfo(final String scheme, final byte[] auth) {
     retryUntilConnected(new Callable<Object>() {
-      @Override public Object call() throws Exception {
+      @Override
+      public Object call() throws Exception {
         getConnection().addAuthInfo(scheme, auth);
         return null;
       }
@@ -1516,15 +1515,14 @@ public class ZkClient implements Watcher {
 
   /**
    * Connect to ZooKeeper.
-   *
    * @param maxMsToWaitUntilConnected
    * @param watcher
    * @throws ZkInterruptedException
-   *             if the connection timed out due to thread interruption
+   *           if the connection timed out due to thread interruption
    * @throws ZkTimeoutException
-   *             if the connection timed out
+   *           if the connection timed out
    * @throws IllegalStateException
-   *             if the connection timed out due to thread interruption
+   *           if the connection timed out due to thread interruption
    */
   public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
@@ -1591,7 +1589,6 @@ public class ZkClient implements Watcher {
 
   /**
    * Close the client.
-   *
    * @throws ZkInterruptedException
    */
   public void close() throws ZkInterruptedException {
@@ -1662,8 +1659,8 @@ public class ZkClient implements Watcher {
 
   public boolean isConnectionClosed() {
     IZkConnection connection = getConnection();
-    return (connection == null || connection.getZookeeperState() == null ||
-        !connection.getZookeeperState().isAlive());
+    return (connection == null || connection.getZookeeperState() == null
+        || !connection.getZookeeperState().isAlive());
   }
 
   public void setShutdownTrigger(boolean triggerState) {
@@ -1694,7 +1691,8 @@ public class ZkClient implements Watcher {
 
     return retryUntilConnected(new Callable<List<OpResult>>() {
 
-      @Override public List<OpResult> call() throws Exception {
+      @Override
+      public List<OpResult> call() throws Exception {
         return getConnection().multi(ops);
       }
     });
@@ -1719,7 +1717,8 @@ public class ZkClient implements Watcher {
   }
 
   // operations to update monitor's counters
-  private void record(String path, byte[] data, long startTimeMilliSec, ZkClientMonitor.AccessType accessType) {
+  private void record(String path, byte[] data, long startTimeMilliSec,
+      ZkClientMonitor.AccessType accessType) {
     if (_monitor != null) {
       int dataSize = (data != null) ? data.length : 0;
       _monitor.record(path, dataSize, startTimeMilliSec, accessType);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
index 6f1e880..07397ed 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
@@ -103,12 +103,14 @@ public class ZkConnection implements IZkConnection {
   }
 
   @Override
-  public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException {
+  public String create(String path, byte[] data, CreateMode mode)
+      throws KeeperException, InterruptedException {
     return _zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
   }
 
   @Override
-  public String create(String path, byte[] data, List<ACL> acl, CreateMode mode) throws KeeperException, InterruptedException {
+  public String create(String path, byte[] data, List<ACL> acl, CreateMode mode)
+      throws KeeperException, InterruptedException {
     return _zk.create(path, data, acl, mode);
   }
 
@@ -123,12 +125,14 @@ public class ZkConnection implements IZkConnection {
   }
 
   @Override
-  public List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException {
+  public List<String> getChildren(final String path, final boolean watch)
+      throws KeeperException, InterruptedException {
     return _zk.getChildren(path, watch);
   }
 
   @Override
-  public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException {
+  public byte[] readData(String path, Stat stat, boolean watch)
+      throws KeeperException, InterruptedException {
     return _zk.getData(path, watch, stat);
   }
 
@@ -137,12 +141,14 @@ public class ZkConnection implements IZkConnection {
   }
 
   @Override
-  public void writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
+  public void writeData(String path, byte[] data, int version)
+      throws KeeperException, InterruptedException {
     _zk.setData(path, data, version);
   }
 
   @Override
-  public Stat writeDataReturnStat(String path, byte[] data, int version) throws KeeperException, InterruptedException {
+  public Stat writeDataReturnStat(String path, byte[] data, int version)
+      throws KeeperException, InterruptedException {
     return _zk.setData(path, data, version);
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
index a2c3139..32d6894 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
@@ -56,9 +56,8 @@ public class CriteriaEvaluator {
 
   /**
    * Examine persisted data to match wildcards in {@link Criteria}
-   *
    * @param recipientCriteria Criteria specifying the message destinations
-   * @param accessor          connection to the persisted data
+   * @param accessor connection to the persisted data
    * @return map of evaluated criteria
    */
   public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,
@@ -98,9 +97,10 @@ public class CriteriaEvaluator {
     Set<String> liveParticipants = accessor.getChildValuesMap(keyBuilder.liveInstances()).keySet();
     List<ZNRecordRow> result = Lists.newArrayList();
     for (ZNRecordRow row : allRows) {
-      // The participant instance name is stored in the return value of either getRecordId() or getMapSubKey()
-      if (rowMatches(recipientCriteria, row) &&
-          (liveParticipants.contains(row.getRecordId()) || liveParticipants.contains(row.getMapSubKey()))) {
+      // The participant instance name is stored in the return value of either getRecordId() or
+      // getMapSubKey()
+      if (rowMatches(recipientCriteria, row) && (liveParticipants.contains(row.getRecordId())
+          || liveParticipants.contains(row.getMapSubKey()))) {
         result.add(row);
       }
     }
@@ -110,13 +110,13 @@ public class CriteriaEvaluator {
     // deduplicate and convert the matches into the required format
     for (ZNRecordRow row : result) {
       Map<String, String> resultRow = new HashMap<String, String>();
-      resultRow.put("instanceName", !recipientCriteria.getInstanceName().equals("") ?
-          (!Strings.isNullOrEmpty(row.getMapSubKey()) ? row.getMapSubKey() : row.getRecordId()) :
-          "");
-      resultRow.put("resourceName", !recipientCriteria.getResource().equals("") ? row.getRecordId()
-          : "");
-      resultRow.put("partitionName", !recipientCriteria.getPartition().equals("") ? row.getMapKey()
+      resultRow.put("instanceName", !recipientCriteria.getInstanceName().equals("")
+          ? (!Strings.isNullOrEmpty(row.getMapSubKey()) ? row.getMapSubKey() : row.getRecordId())
           : "");
+      resultRow.put("resourceName",
+          !recipientCriteria.getResource().equals("") ? row.getRecordId() : "");
+      resultRow.put("partitionName",
+          !recipientCriteria.getPartition().equals("") ? row.getMapKey() : "");
       resultRow.put("partitionState",
           !recipientCriteria.getPartitionState().equals("") ? row.getMapValue() : "");
       selected.add(resultRow);
@@ -136,8 +136,8 @@ public class CriteriaEvaluator {
     String resourceName = normalizePattern(criteria.getResource());
     String partitionName = normalizePattern(criteria.getPartition());
     String partitionState = normalizePattern(criteria.getPartitionState());
-    return (stringMatches(instanceName, Strings.nullToEmpty(row.getMapSubKey())) ||
-            stringMatches(instanceName, Strings.nullToEmpty(row.getRecordId())))
+    return (stringMatches(instanceName, Strings.nullToEmpty(row.getMapSubKey()))
+        || stringMatches(instanceName, Strings.nullToEmpty(row.getRecordId())))
         && stringMatches(resourceName, Strings.nullToEmpty(row.getRecordId()))
         && stringMatches(partitionName, Strings.nullToEmpty(row.getMapKey()))
         && stringMatches(partitionState, Strings.nullToEmpty(row.getMapValue()));
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 3b4fa01..769969a 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -92,9 +92,8 @@ public class HelixStateTransitionHandler extends MessageHandler {
 
   void preHandleMessage() throws Exception {
     if (!_message.isValid()) {
-      String errorMessage =
-          "Invalid Message, ensure that message: " + _message + " has all the required fields: "
-              + Arrays.toString(Message.Attributes.values());
+      String errorMessage = "Invalid Message, ensure that message: " + _message
+          + " has all the required fields: " + Arrays.toString(Message.Attributes.values());
 
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
           _manager);
@@ -102,11 +101,10 @@ public class HelixStateTransitionHandler extends MessageHandler {
       throw new HelixException(errorMessage);
     }
 
-    logger.info(
-        "handling message: " + _message.getMsgId() + " transit " + _message.getResourceName()
-            + "." + _message.getPartitionName() + "|" + _message.getPartitionNames() + " from:"
-            + _message.getFromState() + " to:" + _message.getToState() + ", relayedFrom: "
-            + _message.getRelaySrcHost());
+    logger.info("handling message: " + _message.getMsgId() + " transit "
+        + _message.getResourceName() + "." + _message.getPartitionName() + "|"
+        + _message.getPartitionNames() + " from:" + _message.getFromState() + " to:"
+        + _message.getToState() + ", relayedFrom: " + _message.getRelaySrcHost());
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 
@@ -118,8 +116,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
     // getting current state from state model will provide most up-to-date
     // current state. In case current state is null, partition is in initial
     // state and we are setting it in current state
-    String state = _stateModel.getCurrentState() != null
-        ? _stateModel.getCurrentState()
+    String state = _stateModel.getCurrentState() != null ? _stateModel.getCurrentState()
         : _currentStateDelta.getState(partitionName);
 
     // Set start time right before invoke client logic
@@ -130,34 +127,29 @@ public class HelixStateTransitionHandler extends MessageHandler {
       // To state equals current state, we can just ignore the message
       err = new HelixDuplicatedStateTransitionException(
           String.format("Partition %s current state is same as toState (%s->%s) from message.",
-              partitionName, fromState, toState)
-      );
+              partitionName, fromState, toState));
     } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
       // If current state is neither toState nor fromState in message, there is a problem
-      err = new HelixStateMismatchException(
-          String.format(
-              "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
-              state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName())
-      );
+      err = new HelixStateMismatchException(String.format(
+          "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
+          state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
     }
 
     if (err != null) {
-      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(), _manager);
+      _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(),
+          _manager);
       logger.error(err.getMessage());
       throw err;
     }
 
     // Reset the REQUESTED_STATE property if it exists.
-    try
-    {
+    try {
       String instance = _manager.getInstanceName();
       String sessionId = _message.getTgtSessionId();
       String resource = _message.getResourceName();
       ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
-      PropertyKey key = accessor.keyBuilder().currentState(instance,
-                                                           sessionId,
-                                                           resource,
-                                                           bucketizer.getBucketName(partitionName));
+      PropertyKey key = accessor.keyBuilder().currentState(instance, sessionId, resource,
+          bucketizer.getBucketName(partitionName));
       ZNRecord rec = new ZNRecord(resource);
       Map<String, String> map = new TreeMap<String, String>();
       map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
@@ -170,22 +162,19 @@ public class HelixStateTransitionHandler extends MessageHandler {
 
       // Update the ZK current state of the node
       if (!accessor.updateProperty(key, currStateUpdate)) {
-        logger.error(
-            "Fails to persist current state back to ZK for resource " + resource + " partition: "
-                + partitionName);
+        logger.error("Fails to persist current state back to ZK for resource " + resource
+            + " partition: " + partitionName);
       }
-    }
-    catch (Exception e)
-    {
-      logger.error("Error when removing " +
-                       CurrentState.CurrentStateProperty.REQUESTED_STATE.name() +  " from current state.", e);
-      StateTransitionError error = new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
+    } catch (Exception e) {
+      logger.error("Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name()
+          + " from current state.", e);
+      StateTransitionError error =
+          new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
       _stateModel.rollbackOnError(_message, _notificationContext, error);
-      _statusUpdateUtil.logError(_message,
-                                 HelixStateTransitionHandler.class,
-                                 e,
-                                 "Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() +  " from current state.",
-                                 _manager);
+      _statusUpdateUtil.logError(
+          _message, HelixStateTransitionHandler.class, e, "Error when removing "
+              + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() + " from current state.",
+          _manager);
     }
   }
 
@@ -267,8 +256,8 @@ public class HelixStateTransitionHandler extends MessageHandler {
           } else {
             // State transition interrupted but not caused by timeout. Keep the current
             // state in this case
-            logger
-                .error("State transition interrupted but not timeout. Not updating state. Partition : "
+            logger.error(
+                "State transition interrupted but not timeout. Not updating state. Partition : "
                     + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
             return;
           }
@@ -286,15 +275,13 @@ public class HelixStateTransitionHandler extends MessageHandler {
 
     try {
       // Update the ZK current state of the node
-      PropertyKey key =
-          keyBuilder.currentState(instanceName, sessionId, resource,
-              bucketizer.getBucketName(partitionKey));
+      PropertyKey key = keyBuilder.currentState(instanceName, sessionId, resource,
+          bucketizer.getBucketName(partitionKey));
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
         // normal message
         if (!accessor.updateProperty(key, _currentStateDelta)) {
-          throw new HelixException(
-              "Fails to persist current state back to ZK for resource " + resource + " partition: "
-                  + _message.getPartitionName());
+          throw new HelixException("Fails to persist current state back to ZK for resource "
+              + resource + " partition: " + _message.getPartitionName());
         }
       } else {
         // sub-message of a batch message
@@ -361,17 +348,16 @@ public class HelixStateTransitionHandler extends MessageHandler {
           e = (InterruptedException) e.getCause();
         }
 
-        if (e instanceof HelixRollbackException || (e.getCause() != null
-            && e.getCause() instanceof HelixRollbackException)) {
+        if (e instanceof HelixRollbackException
+            || (e.getCause() != null && e.getCause() instanceof HelixRollbackException)) {
           // TODO : Support cancel to any state
-          logger.info(
-              "Rollback happened of state transition on resource \"" + _message.getResourceName()
-                  + "\" partition \"" + _message.getPartitionName() + "\" from \"" + _message
-                  .getFromState() + "\" to \"" + _message.getToState() + "\"");
+          logger.info("Rollback happened of state transition on resource \""
+              + _message.getResourceName() + "\" partition \"" + _message.getPartitionName()
+              + "\" from \"" + _message.getFromState() + "\" to \"" + _message.getToState() + "\"");
           taskResult.setCancelled(true);
         } else {
-          _statusUpdateUtil
-              .logError(message, HelixStateTransitionHandler.class, e, errorMessage, manager);
+          _statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e, errorMessage,
+              manager);
           taskResult.setSuccess(false);
           taskResult.setMessage(e.toString());
           taskResult.setException(e);
@@ -388,9 +374,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
     }
   }
 
-  private void invoke(HelixManager manager, NotificationContext context,
-      HelixTaskResult taskResult, Message message) throws IllegalAccessException,
-      InvocationTargetException, InterruptedException, HelixRollbackException {
+  private void invoke(HelixManager manager, NotificationContext context, HelixTaskResult taskResult,
+      Message message) throws IllegalAccessException, InvocationTargetException,
+      InterruptedException, HelixRollbackException {
     _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
         "Message handling invoking", manager);
 
@@ -398,20 +384,15 @@ public class HelixStateTransitionHandler extends MessageHandler {
     Method methodToInvoke = null;
     String fromState = message.getFromState();
     String toState = message.getToState();
-    methodToInvoke =
-        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
-                                                       fromState,
-                                                       toState,
-                                                       new Class[] { Message.class,
-                                                           NotificationContext.class });
+    methodToInvoke = _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
+        fromState, toState, new Class[] {
+            Message.class, NotificationContext.class
+        });
     if (methodToInvoke != null) {
-      logger.info(String.format("Instance %s, partition %s received state transition from %s to %s on session %s, message id: %s",
-                                message.getTgtName(),
-                                message.getPartitionName(),
-                                message.getFromState(),
-                                message.getToState(),
-                                message.getTgtSessionId(),
-                                message.getMsgId()));
+      logger.info(String.format(
+          "Instance %s, partition %s received state transition from %s to %s on session %s, message id: %s",
+          message.getTgtName(), message.getPartitionName(), message.getFromState(),
+          message.getToState(), message.getTgtSessionId(), message.getMsgId()));
 
       if (_cancelled) {
         throw new HelixRollbackException(String.format(
@@ -420,14 +401,9 @@ public class HelixStateTransitionHandler extends MessageHandler {
             message.getToState(), message.getTgtSessionId(), message.getMsgId()));
       }
 
-      if (_cancelled) {
-        throw new HelixRollbackException(String.format(
-            "Instance %s, partition %s state transition from %s to %s on session %s has been cancelled",
-            message.getTgtName(), message.getPartitionName(), message.getFromState(),
-            message.getToState(), message.getTgtSessionId()));
-      }
-
-      Object result = methodToInvoke.invoke(_stateModel, new Object[] { message, context });
+      Object result = methodToInvoke.invoke(_stateModel, new Object[] {
+          message, context
+      });
       taskResult.setSuccess(true);
       String resultStr;
       if (result == null || result instanceof Void) {
@@ -437,14 +413,12 @@ public class HelixStateTransitionHandler extends MessageHandler {
       }
       taskResult.setInfo(resultStr);
     } else {
-      String errorMessage =
-          "Unable to find method for transition from " + fromState + " to " + toState + " in "
-              + _stateModel.getClass();
+      String errorMessage = "Unable to find method for transition from " + fromState + " to "
+          + toState + " in " + _stateModel.getClass();
       logger.error(errorMessage);
       taskResult.setSuccess(false);
       taskResult.setInfo(errorMessage);
-      _statusUpdateUtil
-          .logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
+      _statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
     }
   }
 
@@ -479,7 +453,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
             keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName),
             currentStateDelta)) {
           logger.error("Fails to persist ERROR current state to ZK for resource " + resourceName
-                  + " partition: " + partition);
+              + " partition: " + partition);
         }
       }
     } finally {
@@ -493,4 +467,4 @@ public class HelixStateTransitionHandler extends MessageHandler {
   public void onTimeout() {
     _isTimeout = true;
   }
-};
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 713b18a..5efecc9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -69,8 +69,8 @@ public class ClusterConfig extends HelixProperty {
 
     TARGET_EXTERNALVIEW_ENABLED,
     @Deprecated // ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE will take
-        // precedence if it is set
-        ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state
+    // precedence if it is set
+    ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state
     // transition if the number of partitons that need
     // recovery exceeds this limitation
     ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance
@@ -78,10 +78,6 @@ public class ClusterConfig extends HelixProperty {
     // partitons that need recovery or in
     // error exceeds this limitation
     DISABLED_INSTANCES,
-    VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
-    VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is
-    // ViewClusterSourceConfig JSON string
-    VIEW_CLUSTER_REFRESH_PERIOD, // In second
 
     // Specifies job types and used for quota allocation
     QUOTA_TYPES
@@ -109,26 +105,12 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Instantiate with a pre-populated record
-   *
    * @param record a ZNRecord corresponding to a cluster configuration
    */
   public ClusterConfig(ZNRecord record) {
     super(record);
   }
 
-  public void setViewCluster() {
-    _record.setBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), true);
-  }
-
-  /**
-   * Whether this cluster is a ViewCluster
-   * @return
-   */
-  public boolean isViewCluster() {
-    return _record
-        .getBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), false);
-  }
-
   /**
    * Set task quota type with the ratio of this quota.
    * @param quotaType String
@@ -138,8 +120,8 @@ public class ClusterConfig extends HelixProperty {
     if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
       _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>());
     }
-    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name())
-        .put(quotaType, Integer.toString(quotaRatio));
+    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType,
+        Integer.toString(quotaRatio));
   }
 
   /**
@@ -152,8 +134,7 @@ public class ClusterConfig extends HelixProperty {
     if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
       _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>());
     }
-    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name())
-        .put(quotaType, quotaRatio);
+    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType, quotaRatio);
   }
 
   /**
@@ -182,7 +163,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get all task quota and their ratios
-   *
    * @return a task quota -> quota ratio mapping
    */
   public Map<String, String> getTaskQuotaRatioMap() {
@@ -199,22 +179,7 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
-   * Set view cluster max refresh period
-   * @param refreshPeriod refresh period in second
-   */
-  public void setViewClusterRefreshPeriod(int refreshPeriod) {
-    _record.setIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
-        refreshPeriod);
-  }
-
-  public int getViewClusterRefershPeriod() {
-    return _record.getIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
-        DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD);
-  }
-
-  /**
    * Whether to persist best possible assignment in a resource's idealstate.
-   *
    * @return
    */
   public Boolean isPersistBestPossibleAssignment() {
@@ -224,54 +189,58 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Enable/Disable persist best possible assignment in a resource's idealstate.
-   * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and {@link #setPersistIntermediateAssignment(Boolean)}
+   * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and
+   * {@link #setPersistIntermediateAssignment(Boolean)}
    * are set to true, the IntermediateAssignment will be persisted into IdealState's map field.
    * By default, it is DISABLED if not set.
    * @return
    */
   public void setPersistBestPossibleAssignment(Boolean enable) {
     if (enable == null) {
-      _record.getSimpleFields().remove(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString());
+      _record.getSimpleFields()
+          .remove(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString());
     } else {
-      _record.setBooleanField(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString(), enable);
+      _record.setBooleanField(ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.toString(),
+          enable);
     }
   }
 
   /**
    * Whether to persist IntermediateAssignment in a resource's idealstate.
-   *
    * @return
    */
   public Boolean isPersistIntermediateAssignment() {
-    return _record
-        .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false);
+    return _record.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(),
+        false);
   }
 
   /**
    * Enable/Disable persist IntermediateAssignment in a resource's idealstate.
-   * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and {@link #setPersistIntermediateAssignment(Boolean)}
+   * CAUTION: if both {@link #setPersistBestPossibleAssignment(Boolean)} and
+   * {@link #setPersistIntermediateAssignment(Boolean)}
    * are set to true, the IntermediateAssignment will be persisted into IdealState's map field.
    * By default, it is DISABLED if not set.
    * @return
    */
   public void setPersistIntermediateAssignment(Boolean enable) {
     if (enable == null) {
-      _record.getSimpleFields().remove(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString());
+      _record.getSimpleFields()
+          .remove(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString());
     } else {
-      _record.setBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), enable);
+      _record.setBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(),
+          enable);
     }
   }
 
   public Boolean isPipelineTriggersDisabled() {
-    return _record
-        .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
+    return _record.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(),
+        false);
   }
 
   /**
    * Enable/disable topology aware rebalacning. If enabled, both {@link #setTopology(String)} and
    * {@link #setFaultZoneType(String)} should be set.
    * By default, this is DISABLED if not set.
-   *
    * @param enabled
    */
   public void setTopologyAwareEnabled(boolean enabled) {
@@ -297,7 +266,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get cluster topology.
-   *
    * @return
    */
   public String getTopology() {
@@ -314,7 +282,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get cluster fault zone type.
-   *
    * @return
    */
   public String getFaultZoneType() {
@@ -324,7 +291,6 @@ public class ClusterConfig extends HelixProperty {
   /**
    * Set the delayed rebalance time, this applies only when {@link #isDelayRebalaceEnabled()} is
    * true.
-   *
    * @param milliseconds
    */
   public void setRebalanceDelayTime(long milliseconds) {
@@ -338,7 +304,6 @@ public class ClusterConfig extends HelixProperty {
   /**
    * Disable/enable delay rebalance.
    * By default, this is ENABLED if not set.
-   *
    * @param enabled
    */
   public void setDelayRebalaceEnabled(boolean enabled) {
@@ -347,7 +312,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Whether Delay rebalance is enabled for this cluster.
-   *
    * @return
    */
   public boolean isDelayRebalaceEnabled() {
@@ -377,7 +341,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Set the maximum number of partitions that an instance can serve in this cluster.
-   *
    * @param maxPartitionsPerInstance the maximum number of partitions supported
    */
   public void setMaxPartitionsPerInstance(int maxPartitionsPerInstance) {
@@ -387,7 +350,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get the maximum number of partitions an instance can serve in this cluster.
-   *
    * @return the partition capacity of an instance for this resource, or Integer.MAX_VALUE
    */
   public int getMaxPartitionsPerInstance() {
@@ -395,9 +357,9 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
-   * Set the max offline instances allowed for the cluster. If number of pff-line or disabled instances
-   *  in the cluster reach this limit, Helix will pause the cluster.
-   *
+   * Set the max offline instances allowed for the cluster. If number of pff-line or disabled
+   * instances
+   * in the cluster reach this limit, Helix will pause the cluster.
    * @param maxOfflineInstancesAllowed
    */
   public void setMaxOfflineInstancesAllowed(int maxOfflineInstancesAllowed) {
@@ -407,7 +369,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get the max offline instances allowed for the cluster.
-   *
    * @return
    */
   public int getMaxOfflineInstancesAllowed() {
@@ -492,7 +453,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get a list StateTransitionThrottleConfig set for this cluster.
-   *
    * @return
    */
   public List<StateTransitionThrottleConfig> getStateTransitionThrottleConfigs() {
@@ -516,7 +476,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Set StateTransitionThrottleConfig for this cluster.
-   *
    * @param throttleConfigs
    */
   public void setStateTransitionThrottleConfigs(
@@ -531,19 +490,17 @@ public class ClusterConfig extends HelixProperty {
     }
 
     if (!configStrs.isEmpty()) {
-      _record
-          .setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs);
+      _record.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(),
+          configStrs);
     }
   }
 
   /**
    * Set the missing top state duration threshold
-   *
    * If top-state hand off duration is greater than this threshold, Helix will count that handoff
    * as failed and report it with missingtopstate metrics. If this thresold is not set,
    * Long.MAX_VALUE will be used as the default value, which means no top-state hand-off will be
    * treated as failure.
-   *
    * @param durationThreshold
    */
   public void setMissTopStateDurationThreshold(long durationThreshold) {
@@ -637,7 +594,8 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
-   * Get the threshold for the number of partitions needing recovery or in error. Default value is set at
+   * Get the threshold for the number of partitions needing recovery or in error. Default value is
+   * set at
    * Integer.MAX_VALUE to allow recovery rebalance and load rebalance to happen in the same pipeline
    * cycle. If the number of partitions needing recovery is greater than this threshold, recovery
    * balance will take precedence and load balance will not happen during this cycle.
@@ -650,14 +608,16 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
-   * Set the threshold for the number of partitions needing recovery or in error. Default value is set at
+   * Set the threshold for the number of partitions needing recovery or in error. Default value is
+   * set at
    * Integer.MAX_VALUE to allow recovery rebalance and load rebalance to happen in the same pipeline
    * cycle. If the number of partitions needing recovery is greater than this threshold, recovery
    * balance will take precedence and load balance will not happen during this cycle.
    * @param recoveryPartitionThreshold
    */
   public void setErrorOrRecoveryPartitionThresholdForLoadBalance(int recoveryPartitionThreshold) {
-    _record.setIntField(ClusterConfigProperty.ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
+    _record.setIntField(
+        ClusterConfigProperty.ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
         recoveryPartitionThreshold);
   }
 
@@ -680,7 +640,6 @@ public class ClusterConfig extends HelixProperty {
   /**
    * Whether the P2P state transition message is enabled for all resources in this cluster. By
    * default it is disabled if not set.
-   *
    * @return
    */
   public boolean isP2PMessageEnabled() {
@@ -692,7 +651,6 @@ public class ClusterConfig extends HelixProperty {
    * message can reduce the top-state replica unavailable time during top-state handoff period. This
    * only applies for those resources with state models that only have a single top-state replica,
    * such as MasterSlave or LeaderStandy models. By default P2P message is disabled if not set.
-   *
    * @param enabled
    */
   public void enableP2PMessage(boolean enabled) {
@@ -701,7 +659,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get IdealState rules defined in the cluster config.
-   *
    * @return
    */
   public Map<String, Map<String, String>> getIdealStateRules() {
@@ -731,10 +688,9 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get the name of this resource
-   *
    * @return the instance name
    */
   public String getClusterName() {
     return _record.getId();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 607e967..348aefd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -110,9 +110,10 @@ public class Message extends HelixProperty {
   }
 
   // default expiry time period for a relay message.
-  public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000;  //5 second
+  public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000; // 5 second
 
-  // This field is not persisted in zk/znode, i.e, the value will only be changed in local cached copy of the message.
+  // This field is not persisted in zk/znode, i.e, the value will only be changed in local cached
+  // copy of the message.
   // Currently, the field is only used for invalidating messages in controller's message cache.
   private boolean _expired = false;
 
@@ -318,7 +319,7 @@ public class Message extends HelixProperty {
    * @param msgState {@link MessageState}
    */
   public void setMsgState(MessageState msgState) { // HACK: The "tolowerCase()" call is to make the
-                                                   // change backward compatible
+    // change backward compatible
     _record.setSimpleField(Attributes.MSG_STATE.toString(), msgState.toString().toLowerCase());
   }
 
@@ -433,7 +434,6 @@ public class Message extends HelixProperty {
 
   /**
    * Set the resource group associated with this message
-   *
    * @param resourceGroupName resource group name to set
    */
   public void setResourceGroupName(String resourceGroupName) {
@@ -442,7 +442,6 @@ public class Message extends HelixProperty {
 
   /**
    * Get the resource group name associated with this message
-   *
    * @return resource group name
    */
   public String getResourceGroupName() {
@@ -451,7 +450,6 @@ public class Message extends HelixProperty {
 
   /**
    * Set the resource tag associated with this message
-   *
    * @param resourceTag resource tag to set
    */
   public void setResourceTag(String resourceTag) {
@@ -460,7 +458,6 @@ public class Message extends HelixProperty {
 
   /**
    * Get the resource tag associated with this message
-   *
    * @return resource tag
    */
   public String getResourceTag() {
@@ -652,8 +649,8 @@ public class Message extends HelixProperty {
   public static Message createReplyMessage(Message srcMessage, String instanceName,
       Map<String, String> taskResultMap) {
     if (srcMessage.getCorrelationId() == null) {
-      throw new HelixException("Message " + srcMessage.getMsgId()
-          + " does not contain correlation id");
+      throw new HelixException(
+          "Message " + srcMessage.getMsgId() + " does not contain correlation id");
     }
     Message replyMessage = new Message(MessageType.TASK_REPLY, UUID.randomUUID().toString());
     replyMessage.setCorrelationId(srcMessage.getCorrelationId());
@@ -700,8 +697,8 @@ public class Message extends HelixProperty {
   /**
    * Get the completion time of previous task associated with this message.
    * This applies only when this is a relay message,
-   * which specified the completion time of the task running on the participant that sent this relay message.
-   *
+   * which specified the completion time of the task running on the participant that sent this relay
+   * message.
    * @return
    */
   public long getRelayTime() {
@@ -711,8 +708,8 @@ public class Message extends HelixProperty {
   /**
    * Set the completion time of previous task associated with this message.
    * This applies only when this is a relay message,
-   * which specified the completion time of the task running on the participant that sent this relay message.
-   *
+   * which specified the completion time of the task running on the participant that sent this relay
+   * message.
    * @param completionTime
    */
   public void setRelayTime(long completionTime) {
@@ -721,10 +718,8 @@ public class Message extends HelixProperty {
 
   /**
    * Attach a relayed message and its destination participant to this message.
-   *
    * WARNNING: only content in SimpleFields of relayed message will be carried over and sent,
    * all contents in either ListFields or MapFields will be ignored.
-   *
    * @param instance destination participant name
    * @param message relayed message.
    */
@@ -739,8 +734,7 @@ public class Message extends HelixProperty {
     messageInfo.put(Attributes.RELAY_MSG_ID.name(), message.getId());
     messageInfo.put(Attributes.MSG_SUBTYPE.name(), MessageType.RELAYED_MESSAGE.name());
     messageInfo.put(Attributes.RELAY_FROM.name(), getTgtName());
-    messageInfo
-        .put(Attributes.EXPIRY_PERIOD.name(), String.valueOf(RELAY_MESSAGE_DEFAULT_EXPIRY));
+    messageInfo.put(Attributes.EXPIRY_PERIOD.name(), String.valueOf(RELAY_MESSAGE_DEFAULT_EXPIRY));
     _record.setMapField(instance, messageInfo);
     _record.setListField(Attributes.RELAY_PARTICIPANTS.name(),
         Lists.newArrayList(relayParticipants));
@@ -748,7 +742,6 @@ public class Message extends HelixProperty {
 
   /**
    * Get relay message attached for the given instance.
-   *
    * @param instance
    * @return null if no message for the instance
    */
@@ -776,7 +769,6 @@ public class Message extends HelixProperty {
 
   /**
    * Get all relay messages attached to this message as a map (instance->message).
-   *
    * @return map of instanceName->message, empty map if none.
    */
   public Map<String, Message> getRelayMessages() {
@@ -796,7 +788,6 @@ public class Message extends HelixProperty {
 
   /**
    * Whether there are any relay message attached to this message.
-   *
    * @return
    */
   public boolean hasRelayMessages() {
@@ -816,12 +807,10 @@ public class Message extends HelixProperty {
 
   /**
    * Whether a message is expired.
-   *
    * A message is expired if:
-   *   1) creationTime + expiryPeriod > current time
-   *   or
-   *   2) relayTime + expiryPeriod > current time iff it is relay message.
-   *
+   * 1) creationTime + expiryPeriod > current time
+   * or
+   * 2) relayTime + expiryPeriod > current time iff it is relay message.
    * @return
    */
   public boolean isExpired() {
@@ -846,7 +835,6 @@ public class Message extends HelixProperty {
 
   /**
    * Set a message to expired.
-   *
    * !! CAUTION: The expired field is not persisted into ZNODE,
    * i.e, set this field will only change its value in its local cache version,
    * not the one on ZK, even ZkClient.Set(Message) is called to persist it into ZK.
@@ -859,7 +847,6 @@ public class Message extends HelixProperty {
 
   /**
    * Get the expiry period (in milliseconds)
-   *
    * @return
    */
   public long getExpiryPeriod() {
@@ -871,7 +858,6 @@ public class Message extends HelixProperty {
    * A message will be expired after this period of time from either its 1) creationTime or 2)
    * relayTime if it is relay message.
    * Default is -1 if it is not set.
-   *
    * @param expiry
    */
   public void setExpiryPeriod(long expiry) {
@@ -880,7 +866,8 @@ public class Message extends HelixProperty {
 
   /**
    * Get the source cluster name
-   * @return the source cluster from where the message was sent or null if the message was sent locally
+   * @return the source cluster from where the message was sent or null if the message was sent
+   *         locally
    */
   public String getSrcClusterName() {
     return _record.getStringField(Attributes.SRC_CLUSTER.name(), null);
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 1d778ee..5e8c17a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -55,7 +55,6 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusMonitor.class);
 
@@ -99,8 +98,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   /**
    * PerInstanceResource monitor map: beanName->monitor
    */
-  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>
-      _perInstanceResourceMonitorMap = new ConcurrentHashMap<>();
+  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMonitorMap =
+      new ConcurrentHashMap<>();
 
   private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap = new ConcurrentHashMap<>();
 
@@ -250,7 +249,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       for (String instanceName : toRegister) {
         InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
         bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName));
+            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
+            !disabledInstanceSet.contains(instanceName));
         monitorsToRegister.add(bean);
       }
       try {
@@ -273,7 +273,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           InstanceMonitor bean = _instanceMonitorMap.get(instanceName);
           String oldSensorName = bean.getSensorName();
           bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-              oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName));
+              oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
+              !disabledInstanceSet.contains(instanceName));
 
           // If the sensor name changed, re-register the bean so that listeners won't miss it
           String newSensorName = bean.getSensorName();
@@ -292,7 +293,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   /**
    * Update the duration of handling a cluster event in a certain phase.
-   *
    * @param phase
    * @param duration
    */
@@ -376,8 +376,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       Map<String, StateModelDefinition> stateModelDefMap) {
 
     // Convert to perInstanceResource beanName->partition->state
-    Map<PerInstanceResourceMonitor.BeanName, Map<Partition, String>> beanMap =
-        new HashMap<>();
+    Map<PerInstanceResourceMonitor.BeanName, Map<Partition, String>> beanMap = new HashMap<>();
     Set<String> resourceSet = new HashSet<>(bestPossibleStates.resourceSet());
     for (String resource : resourceSet) {
       Map<Partition, Map<String, String>> partitionStateMap =
@@ -386,7 +385,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         Map<String, String> instanceStateMap = partitionStateMap.get(partition);
         for (String instance : instanceStateMap.keySet()) {
           String state = instanceStateMap.get(instance);
-          PerInstanceResourceMonitor.BeanName beanName = new PerInstanceResourceMonitor.BeanName(instance, resource);
+          PerInstanceResourceMonitor.BeanName beanName =
+              new PerInstanceResourceMonitor.BeanName(instance, resource);
           if (!beanMap.containsKey(beanName)) {
             beanMap.put(beanName, new HashMap<Partition, String>());
           }
@@ -396,8 +396,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
     synchronized (_perInstanceResourceMonitorMap) {
       // Unregister beans for per-instance resources that no longer exist
-      Set<PerInstanceResourceMonitor.BeanName> toUnregister = Sets.newHashSet(
-          _perInstanceResourceMonitorMap.keySet());
+      Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+          Sets.newHashSet(_perInstanceResourceMonitorMap.keySet());
       toUnregister.removeAll(beanMap.keySet());
       try {
         unregisterPerInstanceResources(toUnregister);
@@ -409,11 +409,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       toRegister.removeAll(_perInstanceResourceMonitorMap.keySet());
       Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
       for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
-        PerInstanceResourceMonitor bean =
-            new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(), beanName.resourceName());
+        PerInstanceResourceMonitor bean = new PerInstanceResourceMonitor(_clusterName,
+            beanName.instanceName(), beanName.resourceName());
         String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
         InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
-        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()), stateModelDefMap.get(stateModelDefName));
+        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+            stateModelDefMap.get(stateModelDefName));
         monitorsToRegister.add(bean);
       }
       try {
@@ -426,7 +427,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         PerInstanceResourceMonitor bean = _perInstanceResourceMonitorMap.get(beanName);
         String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
         InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
-        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()), stateModelDefMap.get(stateModelDefName));
+        bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+            stateModelDefMap.get(stateModelDefName));
       }
     }
   }
@@ -472,8 +474,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void updateMissingTopStateDurationStats(String resourceName,
-      long totalDuration, long helixLatency, boolean isGraceful, boolean succeeded) {
+  public void updateMissingTopStateDurationStats(String resourceName, long totalDuration,
+      long helixLatency, boolean isGraceful, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
@@ -567,6 +569,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
     }
   }
+
   public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to) {
     updateWorkflowCounters(workflowConfig, to, -1L);
   }
@@ -708,7 +711,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  private void unregisterInstances(Collection<String> instances) throws MalformedObjectNameException {
+  private void unregisterInstances(Collection<String> instances)
+      throws MalformedObjectNameException {
     synchronized (_instanceMonitorMap) {
       for (String instanceName : instances) {
         String beanName = getInstanceBeanName(instanceName);
@@ -770,30 +774,31 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  private void unregisterAllPerInstanceResources()
-      throws MalformedObjectNameException {
+  private void unregisterAllPerInstanceResources() throws MalformedObjectNameException {
     synchronized (_perInstanceResourceMonitorMap) {
       unregisterPerInstanceResources(_perInstanceResourceMonitorMap.keySet());
     }
   }
 
-  private void unregisterPerInstanceResources(Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+  private void unregisterPerInstanceResources(
+      Collection<PerInstanceResourceMonitor.BeanName> beanNames)
       throws MalformedObjectNameException {
     synchronized (_perInstanceResourceMonitorMap) {
       for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
-        unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
+        unregister(getObjectName(
+            getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
       }
       _perInstanceResourceMonitorMap.keySet().removeAll(beanNames);
     }
   }
 
-  private void registerWorkflow(WorkflowMonitor workflowMonitor) throws MalformedObjectNameException {
+  private void registerWorkflow(WorkflowMonitor workflowMonitor)
+      throws MalformedObjectNameException {
     String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType());
     register(workflowMonitor, getObjectName(workflowBeanName));
   }
 
-  private void unregisterAllWorkflowsMonitor()
-      throws MalformedObjectNameException {
+  private void unregisterAllWorkflowsMonitor() throws MalformedObjectNameException {
     synchronized (_perTypeWorkflowMonitorMap) {
       Iterator<Map.Entry<String, WorkflowMonitor>> workflowIter =
           _perTypeWorkflowMonitorMap.entrySet().iterator();
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 59adf67..6277c49 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -62,11 +62,10 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
 
   /**
    * Instantiates a new Dynamic MBean provider.
-   *
    * @param dynamicMetrics Dynamic Metrics that are exposed by this provider
-   * @param description    the MBean description
-   * @param domain         the MBean domain name
-   * @param keyValuePairs  the MBean object name components
+   * @param description the MBean description
+   * @param domain the MBean domain name
+   * @param keyValuePairs the MBean object name components
    */
   protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description, String domain, String... keyValuePairs) throws JMException {
@@ -76,10 +75,9 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
 
   /**
    * Instantiates a new Dynamic MBean provider.
-   *
    * @param dynamicMetrics Dynamic Metrics that are exposed by this provider
-   * @param description    the MBean description
-   * @param objectName     the proposed MBean ObjectName
+   * @param description the MBean description
+   * @param objectName the proposed MBean ObjectName
    */
   protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description, ObjectName objectName) throws JMException {
@@ -100,8 +98,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
 
   /**
    * Update the Dynamic MBean provider with new metric list.
-   *
-   * @param description    description of the MBean
+   * @param description description of the MBean
    * @param dynamicMetrics the DynamicMetrics
    */
   private void updateAttributtInfos(Collection<DynamicMetric<?, ?>> dynamicMetrics,
@@ -139,8 +136,9 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
     }
 
     _mBeanInfo = new MBeanInfo(getClass().getName(), description, attributeInfos,
-        new MBeanConstructorInfo[] { constructorInfo }, new MBeanOperationInfo[0],
-        new MBeanNotificationInfo[0]);
+        new MBeanConstructorInfo[] {
+            constructorInfo
+        }, new MBeanOperationInfo[0], new MBeanNotificationInfo[0]);
   }
 
   /**
@@ -191,9 +189,8 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   }
 
   @Override
-  public void setAttribute(Attribute attribute)
-      throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException,
-             ReflectionException {
+  public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
+      InvalidAttributeValueException, MBeanException, ReflectionException {
     // All MBeans are readonly
     return;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index 48c1fc3..c6a28f5 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -53,14 +53,15 @@ class RoutingTable {
   private final Collection<ExternalView> _externalViews;
 
   public RoutingTable() {
-    this(Collections.<ExternalView>emptyList(), Collections.<InstanceConfig>emptyList(),
-        Collections.<LiveInstance>emptyList());
+    this(Collections.<ExternalView> emptyList(), Collections.<InstanceConfig> emptyList(),
+        Collections.<LiveInstance> emptyList());
   }
 
   public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
-    // TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to refresh according to the currentStateMap. - jjwang
-    this(Collections.<ExternalView>emptyList(), instanceConfigs, liveInstances);
+    // TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to
+    // refresh according to the currentStateMap. - jjwang
+    this(Collections.<ExternalView> emptyList(), instanceConfigs, liveInstances);
     refresh(currentStateMap);
   }
 
@@ -125,8 +126,8 @@ class RoutingTable {
         }
 
         Map<String, CurrentState> currentStates = Collections.emptyMap();
-        if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName)
-            .containsKey(sessionId)) {
+        if (currentStateMap.containsKey(instanceName)
+            && currentStateMap.get(instanceName).containsKey(sessionId)) {
           currentStates = currentStateMap.get(instanceName).get(sessionId);
         }
 
@@ -195,10 +196,8 @@ class RoutingTable {
 
   /**
    * returns all instances for all resources in {resource group} that are in a specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
@@ -216,10 +215,8 @@ class RoutingTable {
   /**
    * returns all instances for resources contains any given tags in {resource group} that are in a
    * specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
@@ -268,14 +265,11 @@ class RoutingTable {
   /**
    * returns the instances for {resource group,partition} pair in all resources belongs to the given
    * resource group that are in a specific {state}.
-   *
    * The return results aggregate all partition states from all the resources in the given resource
    * group.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
@@ -320,19 +314,16 @@ class RoutingTable {
   /**
    * returns the instances for {resource group,partition} pair contains any of the given tags
    * that are in a specific {state}.
-   *
    * Find all resources belongs to the given resource group that have any of the given resource tags
    * and return the aggregated partition states from all these resources.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
    * @param resourceTags
-   *
    * @return empty list if there is no instance in a given state
    */
-  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName,
-      String state, List<String> resourceTags) {
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+      String partitionName, String state, List<String> resourceTags) {
     ResourceGroupInfo resourceGroupInfo = getResourceGroup(resourceGroupName);
     List<InstanceConfig> instanceList = null;
     if (resourceGroupInfo != null) {
@@ -413,7 +404,8 @@ class RoutingTable {
       tagToResourceMap = new HashMap<>();
     }
 
-    public void addEntry(String resourceTag, String stateUnitKey, String state, InstanceConfig config) {
+    public void addEntry(String resourceTag, String stateUnitKey, String state,
+        InstanceConfig config) {
       // add the new entry to the aggregated resource info
       aggregatedResourceInfo.addEntry(stateUnitKey, state, config);
 
@@ -494,4 +486,4 @@ class RoutingTable {
           return config1.getId().compareTo(config2.getId());
         }
       };
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index ee0e638..cf666f8 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -63,7 +63,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class RoutingTableProvider
     implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
-               LiveInstanceChangeListener, CurrentStateChangeListener {
+    LiveInstanceChangeListener, CurrentStateChangeListener {
   private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
   private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes
   private final AtomicReference<RoutingTable> _routingTableRef;
@@ -82,7 +82,6 @@ public class RoutingTableProvider
   private ExecutorService _reportExecutor;
   private Future _reportingTask = null;
 
-
   public RoutingTableProvider() {
     this(null);
   }
@@ -98,7 +97,6 @@ public class RoutingTableProvider
 
   /**
    * Initialize an instance of RoutingTableProvider
-   *
    * @param helixManager
    * @param sourceDataType
    * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise
@@ -126,40 +124,40 @@ public class RoutingTableProvider
 
     if (_helixManager != null) {
       switch (_sourceDataType) {
-        case EXTERNALVIEW:
-          try {
-            _helixManager.addExternalViewChangeListener(this);
-          } catch (Exception e) {
-            shutdown();
-            logger.error("Failed to attach ExternalView Listener to HelixManager!");
-            throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
-          }
-          break;
-
-        case TARGETEXTERNALVIEW:
-          // Check whether target external has been enabled or not
-          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
-              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
-            shutdown();
-            throw new HelixException("Target External View is not enabled!");
-          }
+      case EXTERNALVIEW:
+        try {
+          _helixManager.addExternalViewChangeListener(this);
+        } catch (Exception e) {
+          shutdown();
+          logger.error("Failed to attach ExternalView Listener to HelixManager!");
+          throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+        }
+        break;
+
+      case TARGETEXTERNALVIEW:
+        // Check whether target external has been enabled or not
+        if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+            _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
+          shutdown();
+          throw new HelixException("Target External View is not enabled!");
+        }
 
-          try {
-            _helixManager.addTargetExternalViewChangeListener(this);
-          } catch (Exception e) {
-            shutdown();
-            logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
-            throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
-                e);
-          }
-          break;
+        try {
+          _helixManager.addTargetExternalViewChangeListener(this);
+        } catch (Exception e) {
+          shutdown();
+          logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
+          throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
+              e);
+        }
+        break;
 
-        case CURRENTSTATES:
-          // CurrentState change listeners will be added later in LiveInstanceChange call.
-          break;
+      case CURRENTSTATES:
+        // CurrentState change listeners will be added later in LiveInstanceChange call.
+        break;
 
-        default:
-          throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
+      default:
+        throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
       }
 
       try {
@@ -215,19 +213,19 @@ public class RoutingTableProvider
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
       switch (_sourceDataType) {
-        case EXTERNALVIEW:
-          _helixManager.removeListener(keyBuilder.externalViews(), this);
-          break;
-        case TARGETEXTERNALVIEW:
-          _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
-          break;
-        case CURRENTSTATES:
-          NotificationContext context = new NotificationContext(_helixManager);
-          context.setType(NotificationContext.Type.FINALIZE);
-          updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
-          break;
-        default:
-          break;
+      case EXTERNALVIEW:
+        _helixManager.removeListener(keyBuilder.externalViews(), this);
+        break;
+      case TARGETEXTERNALVIEW:
+        _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
+        break;
+      case CURRENTSTATES:
+        NotificationContext context = new NotificationContext(_helixManager);
+        context.setType(NotificationContext.Type.FINALIZE);
+        updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
+        break;
+      default:
+        break;
       }
     }
   }
@@ -503,11 +501,12 @@ public class RoutingTableProvider
           try {
             // add current-state listeners for new sessions
             manager.addCurrentStateChangeListener(this, instanceName, session);
-            logger.info("{} added current-state listener for instance: {}, session: {}, listener: {}",
+            logger.info(
+                "{} added current-state listener for instance: {}, session: {}, listener: {}",
                 manager.getInstanceName(), instanceName, session, this);
           } catch (Exception e) {
-            logger.error("Fail to add current state listener for instance: {} with session: {}", instanceName, session,
-                e);
+            logger.error("Fail to add current state listener for instance: {} with session: {}",
+                instanceName, session, e);
           }
         }
       }
@@ -600,7 +599,8 @@ public class RoutingTableProvider
           throw new HelixException("HelixManager is null for router update event.");
         }
         if (!manager.isConnected()) {
-          logger.error(String.format("HelixManager is not connected for router update event: %s", event));
+          logger.error(
+              String.format("HelixManager is not connected for router update event: %s", event));
           throw new HelixException("HelixManager is not connected for router update event.");
         }
 
@@ -620,7 +620,8 @@ public class RoutingTableProvider
           refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
               _dataCache.getLiveInstances().values());
 
-          recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
+          recordPropagationLatency(System.currentTimeMillis(),
+              _dataCache.getCurrentStateSnapshot());
           break;
         default:
           logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
@@ -635,13 +636,17 @@ public class RoutingTableProvider
      * Report current state to routing table propagation latency
      * This method is not threadsafe. Take care of _reportingTask atomicity if use in multi-threads.
      */
-    private void recordPropagationLatency(final long currentTime, final CurrentStateSnapshot currentStateSnapshot) {
-      // Note that due to the extra mem footprint introduced by currentStateSnapshot ref, we restrict running report task count to be 1.
+    private void recordPropagationLatency(final long currentTime,
+        final CurrentStateSnapshot currentStateSnapshot) {
+      // Note that due to the extra mem footprint introduced by currentStateSnapshot ref, we
+      // restrict running report task count to be 1.
       // Any parallel tasks will be skipped. So the reporting metric data is sampled.
       if (_reportingTask == null || _reportingTask.isDone()) {
         _reportingTask = _reportExecutor.submit(new Callable<Object>() {
-          @Override public Object call() {
-            // getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to avoid performance impact.
+          @Override
+          public Object call() {
+            // getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to
+            // avoid performance impact.
             Map<PropertyKey, Map<String, Long>> currentStateEndTimeMap =
                 currentStateSnapshot.getNewCurrentStateEndTimes();
             for (PropertyKey key : currentStateEndTimeMap.keySet()) {
@@ -654,7 +659,8 @@ public class RoutingTableProvider
                       "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
                       key.toString(), partition, endTime, currentTime - endTime);
                 } else {
-                  // Verbose log in case currentTime < endTime. This could be the case that Router clock is slower than the participant clock.
+                  // Verbose log in case currentTime < endTime. This could be the case that Router
+                  // clock is slower than the participant clock.
                   logger.trace(
                       "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
                       key.toString(), partition, endTime, currentTime - endTime);
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
index 1995c8d..2439906 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
@@ -27,7 +27,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 
 /**
- * The snapshot of RoutingTable information.  It is immutable, it reflects the routing table
+ * The snapshot of RoutingTable information. It is immutable, it reflects the routing table
  * information at the time it is generated.
  */
 public class RoutingTableSnapshot {
@@ -39,10 +39,8 @@ public class RoutingTableSnapshot {
 
   /**
    * returns all instances for {resource} that are in a specific {state}.
-   *
    * @param resourceName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
@@ -51,11 +49,9 @@ public class RoutingTableSnapshot {
 
   /**
    * returns the instances for {resource,partition} pair that are in a specific {state}
-   *
    * @param resourceName
    * @param partitionName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
@@ -66,10 +62,8 @@ public class RoutingTableSnapshot {
   /**
    * returns all instances for resources contains any given tags in {resource group} that are in a
    * specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
@@ -79,10 +73,8 @@ public class RoutingTableSnapshot {
 
   /**
    * returns all instances for all resources in {resource group} that are in a specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty set if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
@@ -94,11 +86,9 @@ public class RoutingTableSnapshot {
    * resource group that are in a specific {state}.
    * The return results aggregate all partition states from all the resources in the given resource
    * group.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
@@ -111,23 +101,20 @@ public class RoutingTableSnapshot {
    * are in a specific {state}.
    * Find all resources belongs to the given resource group that have any of the given resource tags
    * and return the aggregated partition states from all these resources.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
    * @param resourceTags
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
       String partitionName, String state, List<String> resourceTags) {
-    return _routingTable
-        .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, partitionName, state,
+        resourceTags);
   }
 
   /**
    * Return all liveInstances in the cluster now.
-   *
    * @return
    */
   public Collection<LiveInstance> getLiveInstances() {
@@ -136,7 +123,6 @@ public class RoutingTableSnapshot {
 
   /**
    * Return all instance's config in this cluster.
-   *
    * @return
    */
   public Collection<InstanceConfig> getInstanceConfigs() {
@@ -158,4 +144,4 @@ public class RoutingTableSnapshot {
   public Collection<ExternalView> getExternalViews() {
     return _routingTable.getExternalViews();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index b4fa131..ab352ec 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -97,7 +97,7 @@ public class JobConfig extends ResourceConfig {
     /**
      * The maximum number of times Helix will intentionally move a failing task
      */
-        MaxForcedReassignmentsPerTask,
+    MaxForcedReassignmentsPerTask,
     /**
      * The number of concurrent tasks that are allowed to run on an instance.
      */
@@ -799,4 +799,4 @@ public class JobConfig extends ResourceConfig {
       return Arrays.asList(vals);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index 62ed935..0ce0c3d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -10,11 +10,9 @@ import java.util.Set;
 import java.util.SortedSet;
 import org.apache.helix.task.assigner.AssignableInstance;
 
-
 public abstract class TaskAssignmentCalculator {
   /**
    * Get all the partitions/tasks that belong to this job.
-   *
    * @param jobCfg the task configuration
    * @param jobCtx the task context
    * @param workflowCfg the workflow configuration
@@ -28,7 +26,6 @@ public abstract class TaskAssignmentCalculator {
 
   /**
    * Compute an assignment of tasks to instances
-   *
    * @param currStateOutput the current state of the instances
    * @param prevAssignment the previous task partition assignment
    * @param instances the instances
@@ -66,4 +63,4 @@ public abstract class TaskAssignmentCalculator {
     }
     return workflowType;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index f080f6d..55d1a2f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -230,4 +230,4 @@ public class TaskConfig {
       return new TaskConfig(command, rawConfigMap, taskId, targetPartition);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index abfe988..14e0b43 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -50,7 +50,9 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
     this(manager, taskFactoryRegistry,
         Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
           private AtomicInteger threadId = new AtomicInteger(0);
-          @Override public Thread newThread(Runnable r) {
+
+          @Override
+          public Thread newThread(Runnable r) {
             return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
           }
         }));
@@ -62,7 +64,8 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
     _taskFactoryRegistry = taskFactoryRegistry;
     _taskExecutor = taskExecutor;
     _timerTaskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-      @Override public Thread newThread(Runnable r) {
+      @Override
+      public Thread newThread(Runnable r) {
         return new Thread(r, "TaskStateModelFactory-timeTask_thread");
       }
     });
@@ -87,7 +90,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
     }
     _taskExecutor.shutdown();
     _timerTaskExecutor.shutdown();
-    if (_monitor != null ) {
+    if (_monitor != null) {
       _monitor.unregister();
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
index ba80e88..674d1dd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
+++ b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java
@@ -75,17 +75,17 @@ public abstract class UserContentStore {
    */
   public void putUserContent(String key, String value, Scope scope) {
     switch (scope) {
-      case WORKFLOW:
-        TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
-        break;
-      case JOB:
-        TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
-        break;
-      case TASK:
-        TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
-        break;
-      default:
-        throw new HelixException("Invalid scope : " + scope.name());
+    case WORKFLOW:
+      TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value);
+      break;
+    case JOB:
+      TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value);
+      break;
+    case TASK:
+      TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value);
+      break;
+    default:
+      throw new HelixException("Invalid scope : " + scope.name());
     }
   }
 
@@ -102,4 +102,4 @@ public abstract class UserContentStore {
     return TaskUtil.getUserContent(_manager.getHelixPropertyStore(), key, scope, _workflowName,
         _jobName, _taskName);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index aae0c01..bff0a65 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -641,4 +641,4 @@ public class WorkflowConfig extends ResourceConfig {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
index 05622e8..fd95292 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
@@ -144,4 +144,4 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
     sb.append("}");
     return sb.toString();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
index 1e8bbc3..f4d0473 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
@@ -46,7 +46,7 @@ public class TestListenerCallback extends ZkUnitTestBase {
       _configSize = configs.size();
     }
 
-    public void reset () {
+    public void reset() {
       _configChanged = false;
       _configSize = 0;
     }
@@ -69,8 +69,7 @@ public class TestListenerCallback extends ZkUnitTestBase {
     }
 
     @Override
-    public void onClusterConfigChange(ClusterConfig clusterConfig,
-        NotificationContext context) {
+    public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
       _clusterConfigChanged = true;
       _clusterConfig = clusterConfig;
     }
@@ -82,7 +81,7 @@ public class TestListenerCallback extends ZkUnitTestBase {
       _resourceConfigs = resourceConfigs;
     }
 
-    public void reset () {
+    public void reset() {
       _instanceConfigChanged = false;
       _resourceConfigChanged = false;
       _clusterConfigChanged = false;
@@ -92,7 +91,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
     }
   }
 
-
   int _numNodes = 3;
   int _numResources = 4;
   HelixManager _manager;
@@ -112,8 +110,8 @@ public class TestListenerCallback extends ZkUnitTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    _manager = HelixManagerFactory
-        .getZKHelixManager(_clusterName, "localhost", InstanceType.SPECTATOR, ZK_ADDR);
+    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "localhost",
+        InstanceType.SPECTATOR, ZK_ADDR);
 
     _manager.connect();
   }
@@ -144,8 +142,7 @@ public class TestListenerCallback extends ZkUnitTestBase {
     _manager.addResourceConfigChangeListener(listener);
     Assert.assertTrue(listener._resourceConfigChanged,
         "Should get initial resourceConfig callback invoked");
-    Assert.assertEquals(listener._resourceConfigs.size(), 0,
-        "Instance Config size does not match");
+    Assert.assertEquals(listener._resourceConfigs.size(), 0, "Instance Config size does not match");
 
     // test change content
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -170,7 +167,6 @@ public class TestListenerCallback extends ZkUnitTestBase {
         "Should get clusterConfig callback invoked since we change clusterConfig");
     Assert.assertNotNull(listener._clusterConfig, "Cluster Config size should not be null");
 
-
     String resourceName = "TestDB_0";
     value = new HelixProperty(resourceName);
     value._record.setSimpleField("" + System.currentTimeMillis(), "newValue");
@@ -200,14 +196,14 @@ public class TestListenerCallback extends ZkUnitTestBase {
 
     listener.reset();
     _manager.addConfigChangeListener(listener, ConfigScopeProperty.RESOURCE);
-    Assert
-        .assertTrue(listener._configChanged, "Should get initial resourceConfig callback invoked");
+    Assert.assertTrue(listener._configChanged,
+        "Should get initial resourceConfig callback invoked");
     Assert.assertEquals(listener._configSize, 0, "Resource Config size does not match");
 
     listener.reset();
     _manager.addConfigChangeListener(listener, ConfigScopeProperty.PARTICIPANT);
-    Assert
-        .assertTrue(listener._configChanged, "Should get initial instanceConfig callback invoked");
+    Assert.assertTrue(listener._configChanged,
+        "Should get initial instanceConfig callback invoked");
     Assert.assertEquals(listener._configSize, _numNodes, "Instance Config size does not match");
 
     // test change content
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
index 6a5a304..ec837f8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
@@ -1,5 +1,24 @@
 package org.apache.helix.integration;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.Arrays;
 import java.util.Map;
 import org.apache.helix.ConfigAccessor;
@@ -23,10 +42,10 @@ public class TestBatchEnableInstances extends TaskTestBase {
     _accessor = new ConfigAccessor(_gZkClient);
   }
 
-  @Test (enabled = false)
+  @Test(enabled = false)
   public void testOldEnableDisable() throws InterruptedException {
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        _participants[0].getInstanceName(), false);
     Assert.assertTrue(_clusterVerifier.verify());
 
     ExternalView externalView = _gSetupTool.getClusterManagementTool()
@@ -35,11 +54,11 @@ public class TestBatchEnableInstances extends TaskTestBase {
     for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) {
       Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
     }
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), true);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        _participants[0].getInstanceName(), true);
   }
 
-  @Test (enabled = false)
+  @Test(enabled = false)
   public void testBatchEnableDisable() throws InterruptedException {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
         Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()),
@@ -58,10 +77,10 @@ public class TestBatchEnableInstances extends TaskTestBase {
         true);
   }
 
-  @Test (enabled = false)
+  @Test(enabled = false)
   public void testOldDisableBatchEnable() throws InterruptedException {
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        _participants[0].getInstanceName(), false);
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
         Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()),
         true);
@@ -77,17 +96,17 @@ public class TestBatchEnableInstances extends TaskTestBase {
       }
     }
     Assert.assertTrue(numOfFirstHost > 0);
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), true);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        _participants[0].getInstanceName(), true);
   }
 
-  @Test (enabled = false)
+  @Test(enabled = false)
   public void testBatchDisableOldEnable() throws InterruptedException {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
         Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()),
         false);
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), true);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        _participants[0].getInstanceName(), true);
     Thread.sleep(2000);
 
     ExternalView externalView = _gSetupTool.getClusterManagementTool()
@@ -105,4 +124,4 @@ public class TestBatchEnableInstances extends TaskTestBase {
         Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()),
         true);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
index dbc8f90..5d1c8ea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -423,4 +423,4 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
index 8e669ac..1f9ec5f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionCancellation.java
@@ -91,9 +91,8 @@ public class TestStateTransitionCancellation extends TaskTestBase {
     Thread.sleep(2000);
 
     // Disable the resource
-    _gSetupTool.getClusterManagementTool()
-        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false);
-
+    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
+        WorkflowGenerator.DEFAULT_TGT_DB, false);
 
     // Wait for pipeline reaching final stage
     Assert.assertTrue(_verifier.verifyByPolling());
@@ -115,15 +114,15 @@ public class TestStateTransitionCancellation extends TaskTestBase {
 
     // Reenable resource
     stateCleanUp();
-    _gSetupTool.getClusterManagementTool()
-        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, true);
+    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
+        WorkflowGenerator.DEFAULT_TGT_DB, true);
 
     // Wait for assignment done
     Thread.sleep(2000);
 
     // Disable the resource
-    _gSetupTool.getClusterManagementTool()
-        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, false);
+    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
+        WorkflowGenerator.DEFAULT_TGT_DB, false);
 
     // Wait for pipeline reaching final stage
     Thread.sleep(2000L);
@@ -144,8 +143,8 @@ public class TestStateTransitionCancellation extends TaskTestBase {
 
     // Reenable resource
     stateCleanUp();
-    _gSetupTool.getClusterManagementTool()
-        .enableResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, true);
+    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
+        WorkflowGenerator.DEFAULT_TGT_DB, true);
 
     // Wait for assignment done
     Thread.sleep(2000);
@@ -182,7 +181,8 @@ public class TestStateTransitionCancellation extends TaskTestBase {
     InternalMockDelayMSStateModel._cancelledStatic = false;
   }
 
-  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR"
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "ERROR"
   })
   public static class InternalMockDelayMSStateModel extends StateModel {
     private static Logger LOG = LoggerFactory.getLogger(MockDelayMSStateModel.class);
@@ -196,8 +196,8 @@ public class TestStateTransitionCancellation extends TaskTestBase {
       _cancelledFirstTime = true;
     }
 
-    @Transition(to = "SLAVE", from = "OFFLINE") public void onBecomeSlaveFromOffline(
-        Message message, NotificationContext context) {
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
       if (_delay > 0) {
         try {
           Thread.sleep(_delay);
@@ -208,8 +208,9 @@ public class TestStateTransitionCancellation extends TaskTestBase {
       LOG.info("Become SLAVE from OFFLINE");
     }
 
-    @Transition(to = "MASTER", from = "SLAVE") public void onBecomeMasterFromSlave(Message message,
-        NotificationContext context) throws InterruptedException, HelixRollbackException {
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+        throws InterruptedException, HelixRollbackException {
       if (_cancelledFirstTime && _delay < 0) {
         while (!_cancelledStatic) {
           Thread.sleep(Math.abs(1000L));
@@ -220,18 +221,18 @@ public class TestStateTransitionCancellation extends TaskTestBase {
       LOG.error("Become MASTER from SLAVE");
     }
 
-    @Transition(to = "SLAVE", from = "MASTER") public void onBecomeSlaveFromMaster(Message message,
-        NotificationContext context) {
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
       LOG.info("Become Slave from Master");
     }
 
-    @Transition(to = "OFFLINE", from = "SLAVE") public void onBecomeOfflineFromSlave(
-        Message message, NotificationContext context) {
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
       LOG.info("Become OFFLINE from SLAVE");
     }
 
-    @Transition(to = "DROPPED", from = "OFFLINE") public void onBecomeDroppedFromOffline(
-        Message message, NotificationContext context) {
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
       LOG.info("Become DROPPED FROM OFFLINE");
     }
 
@@ -292,4 +293,4 @@ public class TestStateTransitionCancellation extends TaskTestBase {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionThrottle.java
index f1628f6..6162879 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionThrottle.java
@@ -225,4 +225,4 @@ public class TestStateTransitionThrottle extends ZkTestBase {
       return currentState != null && !currentState.getPartitionStateMap().isEmpty();
     }, timeout);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
index deca2ea..e9d6d8a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java
@@ -36,6 +36,7 @@ public class TestTargetExternalView extends TaskTestBase {
 
   private ConfigAccessor _configAccessor;
   private HelixDataAccessor _accessor;
+
   @BeforeClass
   public void beforeClass() throws Exception {
     _numDbs = 3;
@@ -50,8 +51,7 @@ public class TestTargetExternalView extends TaskTestBase {
   @Test
   public void testTargetExternalViewEnable() throws InterruptedException {
     // Before enable target external view
-    Assert
-        .assertFalse(_gZkClient.exists(_accessor.keyBuilder().targetExternalViews().getPath()));
+    Assert.assertFalse(_gZkClient.exists(_accessor.keyBuilder().targetExternalViews().getPath()));
 
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.enableTargetExternalView(true);
@@ -59,12 +59,12 @@ public class TestTargetExternalView extends TaskTestBase {
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
     _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, _testDbs.get(0), 3);
 
-    ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    ZkHelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(verifier.verifyByPolling());
 
-    Assert
-        .assertEquals(_accessor.getChildNames(_accessor.keyBuilder().targetExternalViews()).size(),
-            3);
+    Assert.assertEquals(
+        _accessor.getChildNames(_accessor.keyBuilder().targetExternalViews()).size(), 3);
 
     List<ExternalView> targetExternalViews =
         _accessor.getChildValues(_accessor.keyBuilder().externalViews());
@@ -78,7 +78,8 @@ public class TestTargetExternalView extends TaskTestBase {
     }
 
     // Disable one instance to see whether the target external views changes.
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        _participants[0].getInstanceName(), false);
 
     Assert.assertTrue(verifier.verifyByPolling());
     Thread.sleep(1000);
@@ -92,4 +93,4 @@ public class TestTargetExternalView extends TaskTestBase {
           idealStates.get(i).getRecord().getListFields());
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
index 13248a8..5479ae6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -205,4 +205,4 @@ public class TestDeleteWorkflow extends TaskTestBase {
     Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME,
         TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7f9a654..788f704 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -341,4 +341,4 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
     public void cancel() {
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
index e01eaef..f9c5106 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
@@ -52,10 +52,9 @@ public class TestJobAndWorkflowType extends TaskTestBase {
     _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
     String fetchedJobType =
         _driver.getJobConfig(String.format("%s_%s", jobName, jobName)).getJobType();
-    String fetchedWorkflowType =
-        _driver.getWorkflowConfig(jobName).getWorkflowType();
+    String fetchedWorkflowType = _driver.getWorkflowConfig(jobName).getWorkflowType();
 
     Assert.assertEquals(fetchedJobType, DEFAULT_TYPE);
     Assert.assertEquals(fetchedWorkflowType, DEFAULT_TYPE);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
index 3447e4b..f9ec15a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
@@ -148,4 +148,4 @@ public final class TestJobFailure extends TaskSynchronizedTestBase {
     }
     return targetPartitionConfigs;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureHighThreshold.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureHighThreshold.java
index 07f9182..43b536e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureHighThreshold.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureHighThreshold.java
@@ -38,7 +38,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 public class TestJobFailureHighThreshold extends TaskSynchronizedTestBase {
 
   private static final String DB_NAME = WorkflowGenerator.DEFAULT_TGT_DB;
@@ -65,30 +64,33 @@ public class TestJobFailureHighThreshold extends TaskSynchronizedTestBase {
   }
 
   /**
-   * Number of instance is equal to number of failure threshold, thus job failure mechanism needs to consider given up
-   * tasks that no longer exist on the instance, not only given up tasks currently reported on CurrentState.
+   * Number of instance is equal to number of failure threshold, thus job failure mechanism needs to
+   * consider given up
+   * tasks that no longer exist on the instance, not only given up tasks currently reported on
+   * CurrentState.
    */
   @Test
   public void testHighThreshold() throws InterruptedException {
     final String WORKFLOW_NAME = "testWorkflow";
     final String JOB_NAME = "testJob";
 
-    JobConfig.Builder jobBuilder = new JobConfig.Builder()
-        .setWorkflow(WORKFLOW_NAME)
-        .setTargetResource(DB_NAME)
-        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-        .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name()))
-        .setFailureThreshold(1);
-    Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_NAME)
-        .addJob(JOB_NAME, jobBuilder);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setWorkflow(WORKFLOW_NAME).setTargetResource(DB_NAME)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND)
+            .setJobCommandConfigMap(
+                ImmutableMap.of(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name()))
+            .setFailureThreshold(1);
+    Workflow.Builder workflowBuilder =
+        new Workflow.Builder(WORKFLOW_NAME).addJob(JOB_NAME, jobBuilder);
     _driver.start(workflowBuilder.build());
 
     _driver.pollForJobState(WORKFLOW_NAME, TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME),
         TaskState.FAILED);
     _driver.pollForWorkflowState(WORKFLOW_NAME, TaskState.FAILED);
 
-    JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME));
+    JobContext jobContext =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(WORKFLOW_NAME, JOB_NAME));
     int countAborted = 0;
     int countNoState = 0;
     for (int pId : jobContext.getPartitionSet()) {
@@ -104,4 +106,4 @@ public class TestJobFailureHighThreshold extends TaskSynchronizedTestBase {
     Assert.assertEquals(countAborted, 2); // Failure threshold is 1, so 2 tasks aborted.
     Assert.assertEquals(countNoState, 3); // Other 3 tasks are not scheduled at all.
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureTaskNotStarted.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureTaskNotStarted.java
index 79e892a..658d52a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureTaskNotStarted.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailureTaskNotStarted.java
@@ -52,7 +52,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 public class TestJobFailureTaskNotStarted extends TaskSynchronizedTestBase {
   private static final String DB_NAME = WorkflowGenerator.DEFAULT_TGT_DB;
   private static final String UNBALANCED_DB_NAME = "UnbalancedDB";
@@ -61,7 +60,7 @@ public class TestJobFailureTaskNotStarted extends TaskSynchronizedTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _participants =  new MockParticipantManager[_numNodes];
+    _participants = new MockParticipantManager[_numNodes];
     _numDbs = 1;
     _numNodes = 2;
     _numPartitions = 2;
@@ -81,19 +80,22 @@ public class TestJobFailureTaskNotStarted extends TaskSynchronizedTestBase {
     clusterConfig.stateTransitionCancelEnabled(true);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
-    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
   protected void startParticipantsWithStuckTaskStateModelFactory() {
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override public Task createNewTask(TaskCallbackContext context) {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
         return new MockTask(context);
       }
     });
 
-    List<String> instances = _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME);
+    List<String> instances =
+        _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME);
 
     _participants[0] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instances.get(0));
     StateMachineEngine stateMachine = _participants[0].getStateMachineEngine();
@@ -120,42 +122,42 @@ public class TestJobFailureTaskNotStarted extends TaskSynchronizedTestBase {
     final String FAIL_JOB_NAME = "failJob";
 
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
-    final int numTask = configAccessor.getClusterConfig(CLUSTER_NAME).getMaxConcurrentTaskPerInstance();
+    final int numTask =
+        configAccessor.getClusterConfig(CLUSTER_NAME).getMaxConcurrentTaskPerInstance();
 
-    // Tasks targeting the unbalanced DB, the instance is setup to stuck on INIT->RUNNING, so it takes all threads
+    // Tasks targeting the unbalanced DB, the instance is setup to stuck on INIT->RUNNING, so it
+    // takes all threads
     // on that instance.
-    JobConfig.Builder blockJobBuilder = new JobConfig.Builder()
-        .setWorkflow(BLOCK_WORKFLOW_NAME)
+    JobConfig.Builder blockJobBuilder = new JobConfig.Builder().setWorkflow(BLOCK_WORKFLOW_NAME)
         .setTargetResource(UNBALANCED_DB_NAME)
         .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-        .setCommand(MockTask.TASK_COMMAND)
-        .setNumConcurrentTasksPerInstance(numTask);
+        .setCommand(MockTask.TASK_COMMAND).setNumConcurrentTasksPerInstance(numTask);
 
-    Workflow.Builder blockWorkflowBuilder = new Workflow.Builder(BLOCK_WORKFLOW_NAME)
-        .addJob("blockJob", blockJobBuilder);
+    Workflow.Builder blockWorkflowBuilder =
+        new Workflow.Builder(BLOCK_WORKFLOW_NAME).addJob("blockJob", blockJobBuilder);
     _driver.start(blockWorkflowBuilder.build());
 
     Assert.assertTrue(TaskTestUtil.pollForAllTasksBlock(_manager.getHelixDataAccessor(),
         _blockedParticipant.getInstanceName(), numTask, 10000));
 
-    // Now, all HelixTask threads are stuck at INIT->RUNNING for task state transition(user task can't be submitted)
+    // Now, all HelixTask threads are stuck at INIT->RUNNING for task state transition(user task
+    // can't be submitted)
     // New tasks assigned to the instance won't start INIT->RUNNING transition at all.
 
     // A to-be-failed job, 2 tasks, 1 stuck and 1 fail, making the job fail.
-    JobConfig.Builder failJobBuilder = new JobConfig.Builder()
-        .setWorkflow(FAIL_WORKFLOW_NAME)
-        .setTargetResource(DB_NAME)
-        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-        .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name()));
+    JobConfig.Builder failJobBuilder =
+        new JobConfig.Builder().setWorkflow(FAIL_WORKFLOW_NAME).setTargetResource(DB_NAME)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(
+                ImmutableMap.of(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name()));
 
-    Workflow.Builder failWorkflowBuilder = new Workflow.Builder(FAIL_WORKFLOW_NAME)
-        .addJob(FAIL_JOB_NAME, failJobBuilder);
+    Workflow.Builder failWorkflowBuilder =
+        new Workflow.Builder(FAIL_WORKFLOW_NAME).addJob(FAIL_JOB_NAME, failJobBuilder);
 
     _driver.start(failWorkflowBuilder.build());
 
-    _driver.pollForJobState(FAIL_WORKFLOW_NAME, TaskUtil.getNamespacedJobName(FAIL_WORKFLOW_NAME, FAIL_JOB_NAME),
-        TaskState.FAILED);
+    _driver.pollForJobState(FAIL_WORKFLOW_NAME,
+        TaskUtil.getNamespacedJobName(FAIL_WORKFLOW_NAME, FAIL_JOB_NAME), TaskState.FAILED);
     _driver.pollForWorkflowState(FAIL_WORKFLOW_NAME, TaskState.FAILED);
 
     JobContext jobContext =
@@ -182,16 +184,18 @@ public class TestJobFailureTaskNotStarted extends TaskSynchronizedTestBase {
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, UNBALANCED_DB_NAME, 1);
 
     // Set preference list to put all partitions to one instance.
-    IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
-        UNBALANCED_DB_NAME);
+    IdealState idealState = _gSetupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, UNBALANCED_DB_NAME);
     Set<String> partitions = idealState.getPartitionSet();
     for (String partition : partitions) {
-      idealState.setPreferenceList(partition, Lists.newArrayList(_blockedParticipant.getInstanceName()));
+      idealState.setPreferenceList(partition,
+          Lists.newArrayList(_blockedParticipant.getInstanceName()));
     }
     idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
 
-    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, UNBALANCED_DB_NAME, idealState);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, UNBALANCED_DB_NAME,
+        idealState);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling(10000, 100));
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
index 9b171ff..15fb21f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
@@ -173,4 +173,4 @@ public final class TestJobTimeout extends TaskSynchronizedTestBase {
       Assert.assertNull(jobContext.getPartitionState(pId));
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
index c309b18..0b2bcf3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeoutTaskNotStarted.java
@@ -45,7 +45,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
 
   @BeforeClass
@@ -54,7 +53,7 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
     _numNodes = 1;
     _numPartitions = 50;
     _numReplicas = 1;
-    _participants =  new MockParticipantManager[_numNodes];
+    _participants = new MockParticipantManager[_numNodes];
     _gSetupTool.addCluster(CLUSTER_NAME, true);
     setupParticipants();
     setupDBs();
@@ -79,7 +78,8 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
   protected void startParticipantsWithStuckTaskStateModelFactory() {
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
     taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override public Task createNewTask(TaskCallbackContext context) {
+      @Override
+      public Task createNewTask(TaskCallbackContext context) {
         return new MockTask(context);
       }
     });
@@ -106,15 +106,13 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
     final String TIMEOUT_JOB_2 = "timeoutJob2";
 
     // 50 blocking tasks
-    JobConfig.Builder blockJobBuilder = new JobConfig.Builder()
-        .setWorkflow(BLOCK_WORKFLOW_NAME)
-        .setTargetResource(DB_NAME)
-        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-        .setCommand(MockTask.TASK_COMMAND)
-        .setNumConcurrentTasksPerInstance(_numPartitions);
-
-    Workflow.Builder blockWorkflowBuilder = new Workflow.Builder(BLOCK_WORKFLOW_NAME)
-        .addJob("blockJob", blockJobBuilder);
+    JobConfig.Builder blockJobBuilder =
+        new JobConfig.Builder().setWorkflow(BLOCK_WORKFLOW_NAME).setTargetResource(DB_NAME)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setNumConcurrentTasksPerInstance(_numPartitions);
+
+    Workflow.Builder blockWorkflowBuilder =
+        new Workflow.Builder(BLOCK_WORKFLOW_NAME).addJob("blockJob", blockJobBuilder);
     _driver.start(blockWorkflowBuilder.build());
 
     int numOfParticipantThreads = 40;
@@ -123,18 +121,22 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
     // Now, the HelixTask threadpool is full and blocked by blockJob.
     // New tasks assigned to the instance won't be assigned at all.
 
-    // 2 timeout jobs, first one timeout, but won't block the second one to run, the second one also timeout.
-    JobConfig.Builder timeoutJobBuilder = new JobConfig.Builder()
-        .setWorkflow(TIMEOUT_WORKFLOW_NAME)
-        .setTargetResource(DB_NAME)
-        .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-        .setCommand(MockTask.TASK_COMMAND)
-        .setNumConcurrentTasksPerInstance(_numPartitions)
-        .setTimeout(3000); // Wait a bit so that tasks are already assigned to the job (and will be cancelled)
+    // 2 timeout jobs, first one timeout, but won't block the second one to run, the second one also
+    // timeout.
+    JobConfig.Builder timeoutJobBuilder =
+        new JobConfig.Builder().setWorkflow(TIMEOUT_WORKFLOW_NAME).setTargetResource(DB_NAME)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setNumConcurrentTasksPerInstance(_numPartitions)
+            .setTimeout(3000); // Wait a bit so that tasks are already assigned to the job (and will
+                               // be cancelled)
 
     WorkflowConfig.Builder timeoutWorkflowConfigBuilder =
-        new WorkflowConfig.Builder(TIMEOUT_WORKFLOW_NAME).setFailureThreshold(
-            1); // workflow ignores first job's timeout and schedule second job and succeed.
+        new WorkflowConfig.Builder(TIMEOUT_WORKFLOW_NAME).setFailureThreshold(1); // workflow
+                                                                                  // ignores first
+                                                                                  // job's timeout
+                                                                                  // and schedule
+                                                                                  // second job and
+                                                                                  // succeed.
 
     Workflow.Builder timeoutWorkflowBuilder = new Workflow.Builder(TIMEOUT_WORKFLOW_NAME)
         .setWorkflowConfig(timeoutWorkflowConfigBuilder.build())
@@ -146,13 +148,14 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
         .addParentChildDependency(TIMEOUT_JOB_1, TIMEOUT_JOB_2);
 
     _driver.start(timeoutWorkflowBuilder.build());
-    _driver.pollForJobState(TIMEOUT_WORKFLOW_NAME, TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_1),
-        TaskState.TIMED_OUT);
-    _driver.pollForJobState(TIMEOUT_WORKFLOW_NAME, TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_2),
-        TaskState.TIMED_OUT);
+    _driver.pollForJobState(TIMEOUT_WORKFLOW_NAME,
+        TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_1), TaskState.TIMED_OUT);
+    _driver.pollForJobState(TIMEOUT_WORKFLOW_NAME,
+        TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_2), TaskState.TIMED_OUT);
     _driver.pollForWorkflowState(TIMEOUT_WORKFLOW_NAME, TaskState.FAILED);
 
-    JobContext jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_1));
+    JobContext jobContext =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_1));
     for (int pId : jobContext.getPartitionSet()) {
       if (jobContext.getAssignedParticipant(pId) != null) {
         // All tasks stuck at INIT->RUNNING, and state transition cancelled and marked TASK_ABORTED
@@ -160,7 +163,8 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
       }
     }
 
-    jobContext = _driver.getJobContext(TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_2));
+    jobContext =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(TIMEOUT_WORKFLOW_NAME, TIMEOUT_JOB_2));
     for (int pId : jobContext.getPartitionSet()) {
       if (jobContext.getAssignedParticipant(pId) != null) {
         // All tasks stuck at INIT->RUNNING, and state transition cancelled and marked TASK_ABORTED
@@ -168,4 +172,4 @@ public class TestJobTimeoutTaskNotStarted extends TaskSynchronizedTestBase {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index b5e1461..7f39331 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -710,4 +710,4 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
   private String generateInfoMessageForDebugging(String instanceName, String quotaType) {
     return String.format("Instance: %s, quotaType: %s", instanceName, quotaType);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
index 3b5970e..018821e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
@@ -128,7 +128,8 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase {
     startParticipant(_initialNumNodes);
 
     JobConfig.Builder jobBuilder =
-        new JobConfig.Builder().setWorkflow(WORKFLOW).setNumberOfTasks(10) // should be enough for
+        new JobConfig.Builder().setWorkflow(WORKFLOW).setNumberOfTasks(10)
+            // should be enough for
             // consistent hashing to
             // place tasks on
             // different instances
@@ -295,4 +296,4 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase {
     // Running tasks are also rebalanced
     Assert.assertTrue(checkTasksOnDifferentInstances());
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignmentCalculator.java
index baa8910..49c9d2b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignmentCalculator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignmentCalculator.java
@@ -228,4 +228,4 @@ public class TestTaskAssignmentCalculator extends TaskTestBase {
       return new TaskResult(TaskResult.Status.COMPLETED, "");
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 411c0e0..73d9a4f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -255,4 +255,4 @@ public class TestTaskRebalancer extends TaskTestBase {
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index 01b86ec..b338954 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -52,9 +52,8 @@ public class TestTaskRebalancerFailover extends TaskTestBase {
 
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
-    JobConfig.Builder job =
-        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job: " + job1Name);
     _driver.enqueueJob(queueName, job1Name, job);
@@ -124,4 +123,4 @@ public class TestTaskRebalancerFailover extends TaskTestBase {
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index b2908df..d8632f9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -65,4 +65,4 @@ public class TestTaskRebalancerRetryLimit extends TaskTestBase {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
index a4aa904..2e94f81 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRetryDelay.java
@@ -36,7 +36,8 @@ public class TestTaskRetryDelay extends TaskTestBase {
     super.beforeClass();
   }
 
-  @Test public void testTaskRetryWithDelay() throws Exception {
+  @Test
+  public void testTaskRetryWithDelay() throws Exception {
     String jobResource = TestHelper.getTestMethodName();
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
@@ -57,7 +58,8 @@ public class TestTaskRetryDelay extends TaskTestBase {
     Assert.assertTrue(finishedTime - startTime >= 2000L);
   }
 
-  @Test public void testTaskRetryWithoutDelay() throws Exception {
+  @Test
+  public void testTaskRetryWithoutDelay() throws Exception {
     String jobResource = TestHelper.getTestMethodName();
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
@@ -77,4 +79,4 @@ public class TestTaskRetryDelay extends TaskTestBase {
     // It should finished at less than 2 sec
     Assert.assertTrue(finishedTime - startTime <= 2000L);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
index 5b1b92b..cb4f897 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
@@ -192,4 +192,4 @@ public class TestTaskThrottling extends TaskTestBase {
           PARTICIPANT_PREFIX + "_" + (_startPort + i), instanceConfig);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
index d3e7eef..f9d0ead 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowJobDependency.java
@@ -56,7 +56,8 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     for (int i = 0; i < _numDbs; i++) {
       // Let each job delay for 2 secs.
       JobConfig.Builder jobConfig = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-          .setTargetResource(_testDbs.get(i)).setTargetPartitionStates(Sets.newHashSet("SLAVE","MASTER"))
+          .setTargetResource(_testDbs.get(i))
+          .setTargetPartitionStates(Sets.newHashSet("SLAVE", "MASTER"))
           .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
       String jobName = "job" + _testDbs.get(i);
       builder.addJob(jobName, jobConfig);
@@ -75,9 +76,8 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     // Update the start time range.
     for (String jobName : workflow.getJobConfigs().keySet()) {
       JobContext context = _driver.getJobContext(jobName);
-      LOG.info(String
-          .format("JOB: %s starts from %s finishes at %s.", jobName, context.getStartTime(),
-              context.getFinishTime()));
+      LOG.info(String.format("JOB: %s starts from %s finishes at %s.", jobName,
+          context.getStartTime(), context.getFinishTime()));
 
       // Find job start time range.
       startTime = Math.max(context.getStartTime(), startTime);
@@ -101,7 +101,8 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     builder.addParentChildDependency("job" + _testDbs.get(0), "job" + _testDbs.get(1));
     for (int i = 0; i < 2; i++) {
       JobConfig.Builder jobConfig = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-          .setTargetResource(_testDbs.get(i)).setTargetPartitionStates(Sets.newHashSet("SLAVE","MASTER"))
+          .setTargetResource(_testDbs.get(i))
+          .setTargetPartitionStates(Sets.newHashSet("SLAVE", "MASTER"))
           .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
       String jobName = "job" + _testDbs.get(i);
       builder.addJob(jobName, jobConfig);
@@ -114,11 +115,10 @@ public class TestWorkflowJobDependency extends TaskTestBase {
     // Wait until the workflow completes
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
 
-
-    JobContext context1 = _driver
-        .getJobContext(TaskUtil.getNamespacedJobName(workflowName, "job" + _testDbs.get(0)));
-    JobContext context2 = _driver
-        .getJobContext(TaskUtil.getNamespacedJobName(workflowName, "job" + _testDbs.get(1)));
+    JobContext context1 =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "job" + _testDbs.get(0)));
+    JobContext context2 =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "job" + _testDbs.get(1)));
     Assert.assertTrue(context2.getStartTime() - context1.getFinishTime() >= 0L);
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index 0ce908f..f31f2df 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -1,5 +1,24 @@
 package org.apache.helix.integration.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.lang.management.ManagementFactory;
@@ -37,19 +56,14 @@ public class TestWorkflowTermination extends TaskTestBase {
     super.beforeClass();
   }
 
-  private JobConfig.Builder createJobConfigBuilder(String workflow, boolean shouldJobFail, long timeoutMs) {
+  private JobConfig.Builder createJobConfigBuilder(String workflow, boolean shouldJobFail,
+      long timeoutMs) {
     String taskState = shouldJobFail ? TaskState.FAILED.name() : TaskState.COMPLETED.name();
-    return new JobConfig.Builder()
-        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+    return new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
-        .setWorkflow(workflow)
-        .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(
-            ImmutableMap.of(
-                MockTask.JOB_DELAY, Long.toString(timeoutMs),
-                MockTask.TASK_RESULT_STATUS, taskState
-            )
-        );
+        .setWorkflow(workflow).setCommand(MockTask.TASK_COMMAND)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, Long.toString(timeoutMs),
+            MockTask.TASK_RESULT_STATUS, taskState));
   }
 
   @Test
@@ -60,14 +74,9 @@ public class TestWorkflowTermination extends TaskTestBase {
     JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 50);
     jobBuilder.setWorkflow(workflowName);
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
-        .setWorkflowConfig(
-            new WorkflowConfig.Builder(workflowName)
-                .setTimeout(timeout)
-                .setWorkFlowType(WORKFLOW_TYPE)
-                .build()
-        )
-        .addJob(JOB_NAME, jobBuilder)
-        .setExpiry(workflowExpiry);
+        .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(timeout)
+            .setWorkFlowType(WORKFLOW_TYPE).build())
+        .addJob(JOB_NAME, jobBuilder).setExpiry(workflowExpiry);
     _driver.start(workflowBuilder.build());
 
     // Timeout is longer than job finish so workflow status should be COMPLETED
@@ -81,7 +90,8 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     ObjectName objectName = getWorkflowMBeanObjectName(workflowName);
     Assert.assertEquals((long) beanServer.getAttribute(objectName, "SuccessfulWorkflowCount"), 1);
-    Assert.assertTrue((long) beanServer.getAttribute(objectName, "MaximumWorkflowLatencyGauge") > 0);
+    Assert
+        .assertTrue((long) beanServer.getAttribute(objectName, "MaximumWorkflowLatencyGauge") > 0);
     Assert.assertTrue((long) beanServer.getAttribute(objectName, "WorkflowLatencyCount") > 0);
 
   }
@@ -97,16 +107,10 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     // Create a workflow where job2 depends on job1. Workflow would timeout before job1 finishes
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
-        .setWorkflowConfig(
-            new WorkflowConfig.Builder(workflowName)
-                .setTimeout(timeout)
-                .setWorkFlowType(WORKFLOW_TYPE)
-                .build()
-        )
-        .addJob(JOB_NAME, jobBuilder)
-        .addJob(notStartedJobName, jobBuilder)
-        .addParentChildDependency(JOB_NAME, notStartedJobName)
-        .setExpiry(workflowExpiry);
+        .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(timeout)
+            .setWorkFlowType(WORKFLOW_TYPE).build())
+        .addJob(JOB_NAME, jobBuilder).addJob(notStartedJobName, jobBuilder)
+        .addParentChildDependency(JOB_NAME, notStartedJobName).setExpiry(workflowExpiry);
 
     _driver.start(workflowBuilder.build());
 
@@ -114,7 +118,8 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     // Running job should be marked as timeout
     // and job not started should not appear in workflow context
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, JOB_NAME), 10000L, TaskState.TIMED_OUT);
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, JOB_NAME), 10000L,
+        TaskState.TIMED_OUT);
 
     WorkflowContext context = _driver.getWorkflowContext(workflowName);
     Assert.assertNull(context.getJobState(notStartedJobName));
@@ -136,16 +141,10 @@ public class TestWorkflowTermination extends TaskTestBase {
     JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 5000);
     jobBuilder.setWorkflow(workflowName);
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
-        .setWorkflowConfig(
-            new WorkflowConfig.Builder(workflowName)
-                .setTimeout(timeout)
-                .setWorkFlowType(WORKFLOW_TYPE)
-                .build()
-        )
-        .addJob(JOB_NAME, jobBuilder)
-        .addJob(notStartedJobName, jobBuilder)
-        .addParentChildDependency(JOB_NAME, notStartedJobName)
-        .setExpiry(workflowExpiry);
+        .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(timeout)
+            .setWorkFlowType(WORKFLOW_TYPE).build())
+        .addJob(JOB_NAME, jobBuilder).addJob(notStartedJobName, jobBuilder)
+        .addParentChildDependency(JOB_NAME, notStartedJobName).setExpiry(workflowExpiry);
 
     _driver.start(workflowBuilder.build());
 
@@ -179,14 +178,10 @@ public class TestWorkflowTermination extends TaskTestBase {
     // Make jobs run success
     JobConfig.Builder jobBuilder = createJobConfigBuilder(queueName, false, 10);
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(queueName);
-    jobQueue.setWorkflowConfig(
-        new WorkflowConfig.Builder(queueName)
-            .setTimeout(timeout)
-            .setWorkFlowType(WORKFLOW_TYPE)
-            .build()
-    )
-        .enqueueJob(JOB_NAME, jobBuilder)
-        .enqueueJob(JOB_NAME + 1, jobBuilder);
+    jobQueue
+        .setWorkflowConfig(new WorkflowConfig.Builder(queueName).setTimeout(timeout)
+            .setWorkFlowType(WORKFLOW_TYPE).build())
+        .enqueueJob(JOB_NAME, jobBuilder).enqueueJob(JOB_NAME + 1, jobBuilder);
 
     _driver.start(jobQueue.build());
 
@@ -215,22 +210,12 @@ public class TestWorkflowTermination extends TaskTestBase {
     JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, true, 1);
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
-        .setWorkflowConfig(
-            new WorkflowConfig.Builder(workflowName)
-                .setWorkFlowType(WORKFLOW_TYPE)
-                .setTimeout(timeout).setParallelJobs(4)
-                .setFailureThreshold(1)
-                .build()
-        )
-        .addJob(job1, jobBuilder)
-        .addJob(job2, jobBuilder)
-        .addJob(job3, failedJobBuilder)
-        .addJob(job4, jobBuilder)
-        .addParentChildDependency(job1, job2)
-        .addParentChildDependency(job1, job3)
-        .addParentChildDependency(job2, job4)
-        .addParentChildDependency(job3, job4)
-        .setExpiry(workflowExpiry);
+        .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setWorkFlowType(WORKFLOW_TYPE)
+            .setTimeout(timeout).setParallelJobs(4).setFailureThreshold(1).build())
+        .addJob(job1, jobBuilder).addJob(job2, jobBuilder).addJob(job3, failedJobBuilder)
+        .addJob(job4, jobBuilder).addParentChildDependency(job1, job2)
+        .addParentChildDependency(job1, job3).addParentChildDependency(job2, job4)
+        .addParentChildDependency(job3, job4).setExpiry(workflowExpiry);
 
     _driver.start(workflowBuilder.build());
 
@@ -262,13 +247,9 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     // For a failed workflow, after timing out, it will be purged
     Thread.sleep(workflowExpiry + 200);
-    verifyWorkflowCleanup(
-        workflowName,
-        getJobNameToPoll(workflowName, job1),
-        getJobNameToPoll(workflowName, job2),
-        getJobNameToPoll(workflowName, job3),
-        getJobNameToPoll(workflowName, job4)
-    );
+    verifyWorkflowCleanup(workflowName, getJobNameToPoll(workflowName, job1),
+        getJobNameToPoll(workflowName, job2), getJobNameToPoll(workflowName, job3),
+        getJobNameToPoll(workflowName, job4));
   }
 
   private void verifyWorkflowCleanup(String workflowName, String... jobNames) {
@@ -286,8 +267,7 @@ public class TestWorkflowTermination extends TaskTestBase {
 
   private ObjectName getWorkflowMBeanObjectName(String workflowName)
       throws MalformedObjectNameException {
-    return new ObjectName(String
-        .format("%s:%s=%s, %s=%s", MonitorDomainNames.ClusterStatus.name(), "cluster",
-            CLUSTER_NAME, "workflowType", WORKFLOW_TYPE));
+    return new ObjectName(String.format("%s:%s=%s, %s=%s", MonitorDomainNames.ClusterStatus.name(),
+        "cluster", CLUSTER_NAME, "workflowType", WORKFLOW_TYPE));
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
index fb8cdd3..3025d69 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
@@ -1,5 +1,24 @@
 package org.apache.helix.integration.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.util.Collections;
@@ -7,7 +26,6 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
@@ -29,8 +47,7 @@ public class TestWorkflowTimeout extends TaskTestBase {
     super.beforeClass();
 
     // Create a non-stop job
-    _jobBuilder = new JobConfig.Builder()
-        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+    _jobBuilder = new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
         .setCommand(MockTask.TASK_COMMAND)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
@@ -87,7 +104,7 @@ public class TestWorkflowTimeout extends TaskTestBase {
   public void testWorkflowTimeoutWhenWorkflowCompleted() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
     _jobBuilder.setWorkflow(workflowName);
-    _jobBuilder.setJobCommandConfigMap(Collections.<String, String>emptyMap());
+    _jobBuilder.setJobCommandConfigMap(Collections.<String, String> emptyMap());
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
         .setWorkflowConfig(new WorkflowConfig.Builder(workflowName).setTimeout(0).build())
         .addJob(JOB_NAME, _jobBuilder).setExpiry(2000L);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index 25f043a..39fe9f7 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -65,7 +65,7 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     super.beforeClass();
-    _participants =  new MockParticipantManager[_numNodes];
+    _participants = new MockParticipantManager[_numNodes];
     _gSetupTool.addCluster(CLUSTER_NAME, true);
     setupParticipants();
     setupDBs();
@@ -97,24 +97,26 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
       for (int i = 0; i < _numDbs; i++) {
         int varyNum = _partitionVary ? 10 * i : 0;
         String db = WorkflowGenerator.DEFAULT_TGT_DB + i;
-        clusterSetup
-            .addResourceToCluster(CLUSTER_NAME, db, _numPartitions + varyNum, MASTER_SLAVE_STATE_MODEL,
-                IdealState.RebalanceMode.FULL_AUTO.toString());
+        clusterSetup.addResourceToCluster(CLUSTER_NAME, db, _numPartitions + varyNum,
+            MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.toString());
         clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, db, _numReplicas);
         _testDbs.add(db);
       }
     } else {
       if (_instanceGroupTag) {
-        clusterSetup
-            .addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numPartitions,
-                "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name());
-        IdealState idealState = clusterSetup.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
+        clusterSetup.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
+            _numPartitions, "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name());
+        IdealState idealState = clusterSetup.getClusterManagementTool()
+            .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
         idealState.setInstanceGroupTag("TESTTAG0");
-        clusterSetup.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
+        clusterSetup.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
+            WorkflowGenerator.DEFAULT_TGT_DB, idealState);
       } else {
-        clusterSetup.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numPartitions, MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
+        clusterSetup.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
+            _numPartitions, MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
       }
-      clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numReplicas);
+      clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
+          _numReplicas);
     }
   }
 
@@ -192,17 +194,16 @@ public class TaskSynchronizedTestBase extends ZkTestBase {
   }
 
   protected void createManagers(String zkAddr, String clusterName) throws Exception {
-    _manager = HelixManagerFactory
-        .getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR, zkAddr);
+    _manager = HelixManagerFactory.getZKHelixManager(clusterName, "Admin",
+        InstanceType.ADMINISTRATOR, zkAddr);
     _manager.connect();
     _driver = new TaskDriver(_manager);
   }
 
-
   public void setSingleTestEnvironment() {
     _numDbs = 1;
     _numNodes = 1;
     _numPartitions = 1;
     _numReplicas = 1;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
index b5d5e44..b13b0fb 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
@@ -248,4 +248,4 @@ public class TestAssignableInstanceManager {
       return _partitionToTaskIDMap.get(p);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
index 590e4ad..1e8d17f 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
@@ -137,16 +137,16 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase
         taskConfigs.add(taskConfigBuilder.build());
       }
       String jobName = "JOB_" + i;
+      // Long-running job
       JobConfig.Builder jobBuilder =
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(10000)
               .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
               .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true)
               .setFailureThreshold(100000)
-              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "120000")); // Long-running
-      // job
+              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "120000"));
       builder.addJob(jobName, jobBuilder);
     }
     // Start the workflow
     _driver.start(builder.build());
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
index f75a60e..acbb7b8 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskExecInfo.java
@@ -125,4 +125,4 @@ public class TestGetLastScheduledTaskExecInfo extends TaskTestBase {
     Collections.sort(startTimes);
     return startTimes;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java b/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java
index 94e6c62..fffe1fd 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestSemiAutoStateTransition.java
@@ -99,4 +99,4 @@ public class TestSemiAutoStateTransition extends TaskTestBase {
     Assert.assertEquals("MASTER", stateMap.get(PARTICIPANT_PREFIX + "_" + (_startPort + 1)));
     Assert.assertEquals("SLAVE", stateMap.get(PARTICIPANT_PREFIX + "_" + (_startPort + 2)));
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
index 928b0f1..7571bf8 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
@@ -66,4 +66,4 @@ import org.apache.helix.model.LiveInstance;
     }
     return clusterConfig;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 5144bb6..ce2e1ad 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -274,10 +274,10 @@ public class TestAssignableInstance extends AssignerTestBase {
     AssignableInstance ai =
         new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true),
             new InstanceConfig(testInstanceName), createLiveInstance(new String[] {
-            LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
-        }, new String[] {
-            "40"
-        }));
+                LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
+            }, new String[] {
+                "40"
+            }));
 
     Map<String, TaskConfig> currentAssignments = new HashMap<>();
     TaskConfig supportedTask = new TaskConfig("", null, "supportedTask", "");
@@ -335,4 +335,4 @@ public class TestAssignableInstance extends AssignerTestBase {
     }
     return expectedQuotaPerType;
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
index d09028d..f5fa3f9 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
@@ -28,7 +28,6 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -102,9 +101,8 @@ public class TestClusterStateVerifier extends ZkUnitTestBase {
   public void testEntireCluster() {
     // Just ensure that the entire cluster passes
     // ensure that the external view coalesces
-    boolean result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+    boolean result = ClusterStateVerifier
+        .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
     Assert.assertTrue(result);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index 420beba..9c8269d 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -24,7 +24,6 @@ import java.io.PrintWriter;
 import java.lang.reflect.Method;
 import java.util.Date;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
@@ -34,14 +33,12 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.store.ZNRecordJsonSerializer;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
 import org.testng.Assert;
diff --git a/helix-front/client/app/chooser/helix-list/helix-list.component.html b/helix-front/client/app/chooser/helix-list/helix-list.component.html
index 5e494a4..5c01336 100644
--- a/helix-front/client/app/chooser/helix-list/helix-list.component.html
+++ b/helix-front/client/app/chooser/helix-list/helix-list.component.html
@@ -5,8 +5,8 @@
   <mat-card-content>
     <section *ngFor="let section of groups[group]" class="section">
       <a *ngFor="let helix of keys(section)"
-        mat-button
-        [routerLink]="['/', group + '.' + helix]">
+         mat-button
+         [routerLink]="['/', group + '.' + helix]">
         <mat-icon>group_work</mat-icon>
         {{ helix }}
       </a>
diff --git a/helix-front/pom.xml b/helix-front/pom.xml
index 797553c..91dd0bf 100644
--- a/helix-front/pom.xml
+++ b/helix-front/pom.xml
@@ -17,11 +17,12 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
-   <groupId>org.apache.helix</groupId>
-   <artifactId>helix</artifactId>
-   <version>0.8.5-SNAPSHOT</version>
+    <groupId>org.apache.helix</groupId>
+    <artifactId>helix</artifactId>
+    <version>0.8.5-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml
index baaf1f5..9b81166 100644
--- a/helix-rest/pom.xml
+++ b/helix-rest/pom.xml
@@ -17,7 +17,8 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <groupId>org.apache.helix</groupId>
     <artifactId>helix</artifactId>
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index ef9e096..cce88bf 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -54,10 +54,10 @@ import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 import org.codehaus.jackson.node.ObjectNode;
 
-
 @Path("/clusters/{clusterId}/resources")
 public class ResourceAccessor extends AbstractHelixResource {
   private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class);
+
   public enum ResourceProperties {
     idealState,
     idealStates,
@@ -100,7 +100,6 @@ public class ResourceAccessor extends AbstractHelixResource {
 
   /**
    * Returns health profile of all resources in the cluster
-   *
    * @param clusterId
    * @return JSON result
    */
@@ -110,20 +109,25 @@ public class ResourceAccessor extends AbstractHelixResource {
 
     HelixZkClient zkClient = getHelixZkClient();
 
-    List<String> resourcesInIdealState = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
-    List<String> resourcesInExternalView = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
+    List<String> resourcesInIdealState =
+        zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
+    List<String> resourcesInExternalView =
+        zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
 
     Map<String, String> resourceHealthResult = new HashMap<>();
 
     for (String resourceName : resourcesInIdealState) {
-      if(resourcesInExternalView.contains(resourceName)) {
+      if (resourcesInExternalView.contains(resourceName)) {
         Map<String, String> partitionHealth = computePartitionHealth(clusterId, resourceName);
 
-        if (partitionHealth.isEmpty() || partitionHealth.values().contains(HealthStatus.UNHEALTHY.name())) {
-          // No partitions for a resource or there exists one or more UNHEALTHY partitions in this resource, UNHEALTHY
+        if (partitionHealth.isEmpty()
+            || partitionHealth.values().contains(HealthStatus.UNHEALTHY.name())) {
+          // No partitions for a resource or there exists one or more UNHEALTHY partitions in this
+          // resource, UNHEALTHY
           resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
         } else if (partitionHealth.values().contains(HealthStatus.PARTIAL_HEALTHY.name())) {
-          // No UNHEALTHY partition, but one or more partially healthy partitions, resource is partially healthy
+          // No UNHEALTHY partition, but one or more partially healthy partitions, resource is
+          // partially healthy
           resourceHealthResult.put(resourceName, HealthStatus.PARTIAL_HEALTHY.name());
         } else {
           // No UNHEALTHY or partially healthy partitions and non-empty, resource is healthy
@@ -140,7 +144,6 @@ public class ResourceAccessor extends AbstractHelixResource {
 
   /**
    * Returns health profile of all partitions for the corresponding resource in the cluster
-   *
    * @param clusterId
    * @param resourceName
    * @return JSON result
@@ -231,7 +234,7 @@ public class ResourceAccessor extends AbstractHelixResource {
       @PathParam("resourceName") String resourceName, @QueryParam("command") String command,
       @DefaultValue("-1") @QueryParam("replicas") int replicas,
       @DefaultValue("") @QueryParam("keyPrefix") String keyPrefix,
-      @DefaultValue("") @QueryParam("group") String group){
+      @DefaultValue("") @QueryParam("group") String group) {
     Command cmd;
     try {
       cmd = Command.valueOf(command);
@@ -420,10 +423,16 @@ public class ResourceAccessor extends AbstractHelixResource {
     HelixAdmin admin = getHelixAdmin();
     IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
     ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
-    StateModelDefinition stateModelDef = admin.getStateModelDef(clusterId, idealState.getStateModelDefRef());
+    StateModelDefinition stateModelDef =
+        admin.getStateModelDef(clusterId, idealState.getStateModelDefRef());
     String initialState = stateModelDef.getInitialState();
     List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
-    statesPriorityList = statesPriorityList.subList(0, statesPriorityList.indexOf(initialState)); // Trim stateList to initialState and above
+    statesPriorityList = statesPriorityList.subList(0, statesPriorityList.indexOf(initialState)); // Trim
+                                                                                                  // stateList
+                                                                                                  // to
+                                                                                                  // initialState
+                                                                                                  // and
+                                                                                                  // above
     int minActiveReplicas = idealState.getMinActiveReplicas();
 
     // Start the logic that determines the health status of each partition
@@ -431,30 +440,37 @@ public class ResourceAccessor extends AbstractHelixResource {
     Set<String> allPartitionNames = idealState.getPartitionSet();
     if (!allPartitionNames.isEmpty()) {
       for (String partitionName : allPartitionNames) {
-        int replicaCount = idealState.getReplicaCount(idealState.getPreferenceList(partitionName).size());
-        // Simplify expectedStateCountMap by assuming that all instances are available to reduce computation load on this REST endpoint
+        int replicaCount =
+            idealState.getReplicaCount(idealState.getPreferenceList(partitionName).size());
+        // Simplify expectedStateCountMap by assuming that all instances are available to reduce
+        // computation load on this REST endpoint
         LinkedHashMap<String, Integer> expectedStateCountMap =
             stateModelDef.getStateCountMap(replicaCount, replicaCount);
         // Extract all states into Collections from ExternalView
         Map<String, String> stateMapInExternalView = externalView.getStateMap(partitionName);
         Collection<String> allReplicaStatesInExternalView =
-            (stateMapInExternalView != null && !stateMapInExternalView.isEmpty()) ?
-                stateMapInExternalView.values() : Collections.<String>emptyList();
+            (stateMapInExternalView != null && !stateMapInExternalView.isEmpty())
+                ? stateMapInExternalView.values()
+                : Collections.<String> emptyList();
         int numActiveReplicasInExternalView = 0;
         HealthStatus status = HealthStatus.HEALTHY;
 
         // Go through all states that are "active" states (higher priority than InitialState)
-        for (int statePriorityIndex = 0; statePriorityIndex < statesPriorityList.size(); statePriorityIndex++) {
+        for (int statePriorityIndex = 0; statePriorityIndex < statesPriorityList
+            .size(); statePriorityIndex++) {
           String currentState = statesPriorityList.get(statePriorityIndex);
           int currentStateCountInIdealState = expectedStateCountMap.get(currentState);
-          int currentStateCountInExternalView = Collections.frequency(allReplicaStatesInExternalView, currentState);
+          int currentStateCountInExternalView =
+              Collections.frequency(allReplicaStatesInExternalView, currentState);
           numActiveReplicasInExternalView += currentStateCountInExternalView;
           // Top state counts must match, if not, unhealthy
-          if (statePriorityIndex == 0 && currentStateCountInExternalView != currentStateCountInIdealState) {
+          if (statePriorityIndex == 0
+              && currentStateCountInExternalView != currentStateCountInIdealState) {
             status = HealthStatus.UNHEALTHY;
             break;
           } else if (currentStateCountInExternalView < currentStateCountInIdealState) {
-            // For non-top states, if count in ExternalView is less than count in IdealState, partially healthy
+            // For non-top states, if count in ExternalView is less than count in IdealState,
+            // partially healthy
             status = HealthStatus.PARTIAL_HEALTHY;
           }
         }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
index a90de7b..f136d9f 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
@@ -62,8 +62,8 @@ public class TestResourceAccessor extends AbstractTestClass {
   public void testGetResources() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
 
-    String body =
-        get("clusters/" + CLUSTER_NAME + "/resources", null, Response.Status.OK.getStatusCode(), true);
+    String body = get("clusters/" + CLUSTER_NAME + "/resources", null,
+        Response.Status.OK.getStatusCode(), true);
 
     JsonNode node = OBJECT_MAPPER.readTree(body);
     String idealStates =
@@ -72,9 +72,8 @@ public class TestResourceAccessor extends AbstractTestClass {
 
     Set<String> resources = OBJECT_MAPPER.readValue(idealStates,
         OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class));
-    Assert.assertEquals(resources, _resourcesMap.get("TestCluster_0"),
-        "Resources from response: " + resources + " vs clusters actually: " + _resourcesMap
-            .get("TestCluster_0"));
+    Assert.assertEquals(resources, _resourcesMap.get("TestCluster_0"), "Resources from response: "
+        + resources + " vs clusters actually: " + _resourcesMap.get("TestCluster_0"));
   }
 
   @Test(dependsOnMethods = "testGetResources")
@@ -97,9 +96,8 @@ public class TestResourceAccessor extends AbstractTestClass {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String newResourceName = "newResource";
     IdealState idealState = new IdealState(newResourceName);
-    idealState.getRecord().getSimpleFields().putAll(
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME)
-            .getRecord().getSimpleFields());
+    idealState.getRecord().getSimpleFields().putAll(_gSetupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME).getRecord().getSimpleFields());
 
     // Add resource by IdealState
     Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(idealState.getRecord()),
@@ -114,7 +112,7 @@ public class TestResourceAccessor extends AbstractTestClass {
     entity = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
 
     put("clusters/" + CLUSTER_NAME + "/resources/" + newResourceName + "0", ImmutableMap
-            .of("numPartitions", "4", "stateModelRef", "OnlineOffline", "rebalancerMode", "FULL_AUTO"),
+        .of("numPartitions", "4", "stateModelRef", "OnlineOffline", "rebalancerMode", "FULL_AUTO"),
         entity, Response.Status.OK.getStatusCode());
 
     IdealState queryIdealState = new FullAutoModeISBuilder(newResourceName + 0).setNumPartitions(4)
@@ -139,8 +137,8 @@ public class TestResourceAccessor extends AbstractTestClass {
   public void testIdealState() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
 
-    String body = get("clusters/" + CLUSTER_NAME + "/resources/" + RESOURCE_NAME + "/idealState", null,
-        Response.Status.OK.getStatusCode(), true);
+    String body = get("clusters/" + CLUSTER_NAME + "/resources/" + RESOURCE_NAME + "/idealState",
+        null, Response.Status.OK.getStatusCode(), true);
     IdealState idealState = new IdealState(toZNRecord(body));
     Assert.assertEquals(idealState,
         _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME));
@@ -150,8 +148,8 @@ public class TestResourceAccessor extends AbstractTestClass {
   public void testExternalView() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
 
-    String body = get("clusters/" + CLUSTER_NAME + "/resources/" + RESOURCE_NAME + "/externalView", null,
-        Response.Status.OK.getStatusCode(), true);
+    String body = get("clusters/" + CLUSTER_NAME + "/resources/" + RESOURCE_NAME + "/externalView",
+        null, Response.Status.OK.getStatusCode(), true);
     ExternalView externalView = new ExternalView(toZNRecord(body));
     Assert.assertEquals(externalView, _gSetupTool.getClusterManagementTool()
         .getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME));
@@ -174,9 +172,15 @@ public class TestResourceAccessor extends AbstractTestClass {
 
     // Create a mock state mapping for testing
     Map<String, List<String>> partitionReplicaStates = new LinkedHashMap<>();
-    String[] p0 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p1 = {"MASTER", "SLAVE", "ERROR"};
-    String[] p2 = {"ERROR", "SLAVE", "SLAVE"};
+    String[] p0 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p1 = {
+        "MASTER", "SLAVE", "ERROR"
+    };
+    String[] p2 = {
+        "ERROR", "SLAVE", "SLAVE"
+    };
     partitionReplicaStates.put("p0", Arrays.asList(p0));
     partitionReplicaStates.put("p1", Arrays.asList(p1));
     partitionReplicaStates.put("p2", Arrays.asList(p2));
@@ -188,7 +192,9 @@ public class TestResourceAccessor extends AbstractTestClass {
         Response.Status.OK.getStatusCode(), true);
 
     JsonNode node = OBJECT_MAPPER.readTree(body);
-    Map<String, String> healthStatus = OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>(){});
+    Map<String, String> healthStatus =
+        OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>() {
+        });
 
     Assert.assertEquals(healthStatus.get("p0"), "HEALTHY");
     Assert.assertEquals(healthStatus.get("p1"), "PARTIAL_HEALTHY");
@@ -210,9 +216,15 @@ public class TestResourceAccessor extends AbstractTestClass {
     // Create a healthy resource
     String resourceNameHealthy = clusterName + "_db_0";
     Map<String, List<String>> partitionReplicaStates = new LinkedHashMap<>();
-    String[] p0 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p1 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p2 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p0 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p1 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p2 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
     partitionReplicaStates.put("p0", Arrays.asList(p0));
     partitionReplicaStates.put("p1", Arrays.asList(p1));
     partitionReplicaStates.put("p2", Arrays.asList(p2));
@@ -222,33 +234,49 @@ public class TestResourceAccessor extends AbstractTestClass {
     // Create a partially healthy resource
     String resourceNamePartiallyHealthy = clusterName + "_db_1";
     Map<String, List<String>> partitionReplicaStates_1 = new LinkedHashMap<>();
-    String[] p0_1 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p1_1 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p2_1 = {"MASTER", "SLAVE", "ERROR"};
+    String[] p0_1 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p1_1 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p2_1 = {
+        "MASTER", "SLAVE", "ERROR"
+    };
     partitionReplicaStates_1.put("p0", Arrays.asList(p0_1));
     partitionReplicaStates_1.put("p1", Arrays.asList(p1_1));
     partitionReplicaStates_1.put("p2", Arrays.asList(p2_1));
 
-    createDummyMapping(clusterName, resourceNamePartiallyHealthy, idealStateParams, partitionReplicaStates_1);
+    createDummyMapping(clusterName, resourceNamePartiallyHealthy, idealStateParams,
+        partitionReplicaStates_1);
 
     // Create a partially healthy resource
     String resourceNameUnhealthy = clusterName + "_db_2";
     Map<String, List<String>> partitionReplicaStates_2 = new LinkedHashMap<>();
-    String[] p0_2 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p1_2 = {"MASTER", "SLAVE", "SLAVE"};
-    String[] p2_2 = {"ERROR", "SLAVE", "ERROR"};
+    String[] p0_2 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p1_2 = {
+        "MASTER", "SLAVE", "SLAVE"
+    };
+    String[] p2_2 = {
+        "ERROR", "SLAVE", "ERROR"
+    };
     partitionReplicaStates_2.put("p0", Arrays.asList(p0_2));
     partitionReplicaStates_2.put("p1", Arrays.asList(p1_2));
     partitionReplicaStates_2.put("p2", Arrays.asList(p2_2));
 
-    createDummyMapping(clusterName, resourceNameUnhealthy, idealStateParams, partitionReplicaStates_2);
+    createDummyMapping(clusterName, resourceNameUnhealthy, idealStateParams,
+        partitionReplicaStates_2);
 
     // Get the result of getResourceHealth
-    String body = get("clusters/" + clusterName + "/resources/health", null, Response.Status.OK.getStatusCode(),
-        true);
+    String body = get("clusters/" + clusterName + "/resources/health", null,
+        Response.Status.OK.getStatusCode(), true);
 
     JsonNode node = OBJECT_MAPPER.readTree(body);
-    Map<String, String> healthStatus = OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>(){});
+    Map<String, String> healthStatus =
+        OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>() {
+        });
 
     Assert.assertEquals(healthStatus.get(resourceNameHealthy), "HEALTHY");
     Assert.assertEquals(healthStatus.get(resourceNamePartiallyHealthy), "PARTIAL_HEALTHY");
@@ -348,7 +376,6 @@ public class TestResourceAccessor extends AbstractTestClass {
     }
   }
 
-
   /**
    * Test "update" command of updateResourceIdealState.
    * @throws Exception
@@ -357,7 +384,7 @@ public class TestResourceAccessor extends AbstractTestClass {
   public void updateResourceIdealState() throws Exception {
     // Get IdealState ZNode
     String zkPath = PropertyPathBuilder.idealState(CLUSTER_NAME, RESOURCE_NAME);
-    ZNRecord record = _baseAccessor.get(zkPath,  null, AccessOption.PERSISTENT);
+    ZNRecord record = _baseAccessor.get(zkPath, null, AccessOption.PERSISTENT);
 
     // 1. Add these fields by way of "update"
     Entity entity =
@@ -442,12 +469,13 @@ public class TestResourceAccessor extends AbstractTestClass {
    * @throws Exception
    */
   private void createDummyMapping(String clusterName, String resourceName,
-      Map<String, String> idealStateParams,
-      Map<String, List<String>> partitionReplicaStates) throws Exception {
+      Map<String, String> idealStateParams, Map<String, List<String>> partitionReplicaStates)
+      throws Exception {
     IdealState idealState = new IdealState(resourceName);
     idealState.setMinActiveReplicas(Integer.parseInt(idealStateParams.get("MinActiveReplicas"))); // 2
     idealState.setStateModelDefRef(idealStateParams.get("StateModelDefRef")); // MasterSlave
-    idealState.setMaxPartitionsPerInstance(Integer.parseInt(idealStateParams.get("MaxPartitionsPerInstance"))); // 3
+    idealState.setMaxPartitionsPerInstance(
+        Integer.parseInt(idealStateParams.get("MaxPartitionsPerInstance"))); // 3
     idealState.setReplicas(idealStateParams.get("Replicas")); // 3
     idealState.setNumPartitions(Integer.parseInt(idealStateParams.get("NumPartitions"))); // 3
     idealState.enable(false);
@@ -464,7 +492,8 @@ public class TestResourceAccessor extends AbstractTestClass {
     if (!_gSetupTool.getClusterManagementTool().getClusters().contains(clusterName)) {
       _gSetupTool.getClusterManagementTool().addCluster(clusterName);
     }
-    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, resourceName, idealState);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, resourceName,
+        idealState);
 
     // Set ExternalView's replica states for a given parameter map
     ExternalView externalView = new ExternalView(resourceName);
@@ -483,10 +512,11 @@ public class TestResourceAccessor extends AbstractTestClass {
 
     externalView.getRecord().getMapFields().putAll(mappingCurrent);
 
-    HelixManager helixManager = HelixManagerFactory
-        .getZKHelixManager(clusterName, "p1", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "p1",
+        InstanceType.ADMINISTRATOR, ZK_ADDR);
     helixManager.connect();
     HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
-    helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().externalView(resourceName), externalView);
+    helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().externalView(resourceName),
+        externalView);
   }
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
index 82c4f34..ca87aae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
@@ -34,11 +35,12 @@ under the License.
   <name>Apache Helix</name>
 
   <description>
-    Helix is a generic cluster management framework used for the automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
+    Helix is a generic cluster management framework used for the automatic management of partitioned, replicated and
+    distributed resources hosted on a cluster of nodes.
   </description>
 
   <url>http://helix.apache.org</url>
-<developers>
+  <developers>
     <developer>
       <id>kishoreg</id>
       <name>Kishore Gopalakrishna</name>
@@ -237,15 +239,6 @@ under the License.
       </roles>
       <timezone>-8</timezone>
     </developer>
-    <developer>
-      <id>hulee</id>
-      <name>Hunter Lee</name>
-      <email>hulee@apache.org</email>
-      <roles>
-        <role>Committer</role>
-      </roles>
-      <timezone>-8</timezone>
-    </developer>
   </developers>
   <modules>
     <module>helix-core</module>
@@ -357,7 +350,7 @@ under the License.
     <helix.scmPubCheckoutDirectory>${user.home}/helix-site/helix-site-content</helix.scmPubCheckoutDirectory>
     <scmSkipDeletedFiles>false</scmSkipDeletedFiles>
     <!-- fix issue with current apache parent pom -->
-    <arguments />
+    <arguments/>
 
     <!-- for svnpubsub site deployment -->
     <helix.release.preparationGoals>clean install</helix.release.preparationGoals>
@@ -368,14 +361,14 @@ under the License.
     <currentRelease>0.8.0</currentRelease>
 
     <!-- OSGi Properties -->
-    <osgi.import />
-    <osgi.dynamic.import />
-    <osgi.require.bundles />
-    <osgi.export />
-    <osgi.private />
-    <osgi.ignore />
-    <osgi.activator />
-    <osgi.export.service />
+    <osgi.import/>
+    <osgi.dynamic.import/>
+    <osgi.require.bundles/>
+    <osgi.export/>
+    <osgi.private/>
+    <osgi.ignore/>
+    <osgi.activator/>
+    <osgi.export.service/>
 
     <!--Skips test in default 'mvn clean install' build command-->
     <!-- <maven.test.skip.exec>true</maven.test.skip.exec> -->
diff --git a/recipes/distributed-lock-manager/pom.xml b/recipes/distributed-lock-manager/pom.xml
index 49f56ac..6e8b607 100644
--- a/recipes/distributed-lock-manager/pom.xml
+++ b/recipes/distributed-lock-manager/pom.xml
@@ -17,7 +17,8 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
 
   <parent>
@@ -115,10 +116,10 @@ under the License.
         <plugin>
           <groupId>org.apache.rat</groupId>
           <artifactId>apache-rat-plugin</artifactId>
-            <configuration>
-              <excludes combine.children="append">
-              </excludes>
-            </configuration>
+          <configuration>
+            <excludes combine.children="append">
+            </excludes>
+          </configuration>
         </plugin>
       </plugins>
     </pluginManagement>