You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/26 22:41:56 UTC

helix git commit: Fail the rebalance pipeline and retry it if loading data from ZooKeeper fails in any read or batch-read call.

Repository: helix
Updated Branches:
  refs/heads/master c78f84284 -> 149831c8f


Fail the rebalance pipeline and retry it if loading data from ZooKeeper fails in any read or batch-read call.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/149831c8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/149831c8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/149831c8

Branch: refs/heads/master
Commit: 149831c8f49ee893f4966c2d01c21c005cca9d0a
Parents: c78f842
Author: Lei Xia <lx...@linkedin.com>
Authored: Sat Mar 24 20:29:36 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Mar 26 14:50:31 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixProperty.java    |   2 +
 .../HelixMetaDataAccessException.java           |  41 +++++
 .../common/caches/BasicClusterDataCache.java    |  10 +-
 .../helix/common/caches/CurrentStateCache.java  |   4 +-
 .../common/caches/InstanceMessagesCache.java    |   4 +-
 .../controller/GenericHelixController.java      | 154 ++++++++++++++-----
 .../controller/rebalancer/CustomRebalancer.java |   2 +-
 .../controller/stages/ClusterDataCache.java     |  14 +-
 .../controller/stages/ClusterEventType.java     |   1 +
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   6 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    |  11 +-
 .../manager/zk/ZkCacheBaseDataAccessor.java     |   3 -
 .../monitoring/mbeans/ClusterStatusMonitor.java |  20 ++-
 .../mbeans/ClusterStatusMonitorMBean.java       |   6 +
 .../java/org/apache/helix/MockAccessor.java     |  11 +-
 .../TestClusterDataCacheSelectiveUpdate.java    |  16 ++
 .../manager/TestHelixDataAccessor.java          |  52 +++++--
 .../org/apache/helix/mock/MockZkClient.java     |  92 ++---------
 18 files changed, 284 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index f3805fe..2f3e68d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -31,6 +31,8 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
+
 /**
  * A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
  */

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java b/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java
new file mode 100644
index 0000000..e8acf0c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java
@@ -0,0 +1,41 @@
+package org.apache.helix.api.exceptions;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+
+/**
+ * Thrown when Helix fails to read or write metadata in ZooKeeper (e.g. a failed read/batch read).
+ */
+public class HelixMetaDataAccessException extends HelixException {
+  private static final long serialVersionUID = 6558251214364526258L;
+
+  public HelixMetaDataAccessException(String message) {
+    super(message);
+  }
+
+  public HelixMetaDataAccessException(Throwable cause) {
+    super(cause);
+  }
+
+  public HelixMetaDataAccessException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index e3acf14..117d18a 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -62,7 +62,7 @@ public class BasicClusterDataCache {
    *
    * @return
    */
-  public synchronized void refresh(HelixDataAccessor accessor) {
+  public void refresh(HelixDataAccessor accessor) {
     LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
     long startTime = System.currentTimeMillis();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -70,7 +70,7 @@ public class BasicClusterDataCache {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
-      _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+      _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews(), true);
       LOG.info("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
             System.currentTimeMillis() - start) + " ms");
     }
@@ -78,7 +78,7 @@ public class BasicClusterDataCache {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
-      _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+      _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
       LOG.info("Reload LiveInstances: " + _liveInstanceMap.keySet() + ". Takes " + (
           System.currentTimeMillis() - start) + " ms");
     }
@@ -87,7 +87,7 @@ public class BasicClusterDataCache {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap
           .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
-      _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+      _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
       LOG.info("Reload InstanceConfig: " + _instanceConfigMap.keySet() + ". Takes " + (
           System.currentTimeMillis() - start) + " ms");
     }
@@ -173,7 +173,7 @@ public class BasicClusterDataCache {
   /**
    * Indicate that a full read should be done on the next refresh
    */
-  public synchronized void requireFullRefresh() {
+  public void requireFullRefresh() {
     for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       _propertyDataChangedMap.put(type, Boolean.valueOf(true));
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 5e69c1d..711a6d6 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -59,7 +59,7 @@ public class CurrentStateCache {
    *
    * @return
    */
-  public synchronized boolean refresh(HelixDataAccessor accessor,
+  public boolean refresh(HelixDataAccessor accessor,
       Map<String, LiveInstance> liveInstanceMap) {
     LOG.info("START: CurrentStateCache.refresh()");
     long startTime = System.currentTimeMillis();
@@ -143,7 +143,7 @@ public class CurrentStateCache {
       }
     }
 
-    List<CurrentState> currentStates = accessor.getProperty(reloadKeys);
+    List<CurrentState> currentStates = accessor.getProperty(reloadKeys, true);
     Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
     for (CurrentState currentState : currentStates) {
       PropertyKey key = csKeyIter.next();

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 438e3a6..69094d3 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -64,7 +64,7 @@ public class InstanceMessagesCache {
    *
    * @return
    */
-  public synchronized boolean refresh(HelixDataAccessor accessor,
+  public boolean refresh(HelixDataAccessor accessor,
       Map<String, LiveInstance> liveInstanceMap) {
     LOG.info("START: InstanceMessagesCache.refresh()");
     long startTime = System.currentTimeMillis();
@@ -108,7 +108,7 @@ public class InstanceMessagesCache {
 
     // get the new messages
     if (newMessageKeys.size() > 0) {
-      List<Message> newMessages = accessor.getProperty(newMessageKeys);
+      List<Message> newMessages = accessor.getProperty(newMessageKeys, true);
       for (Message message : newMessages) {
         if (message != null) {
           Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index df5c86b..fd14e65 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,13 +19,15 @@ package org.apache.helix.controller;
  * under the License.
  */
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.api.listeners.*;
 import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -39,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +90,8 @@ public class GenericHelixController implements IdealStateChangeListener,
   private final ClusterEventBlockingQueue _taskEventQueue;
   private final ClusterEventProcessor _taskEventThread;
 
+  private long _continousRebalanceFailureCount = 0;
+
   /**
    * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
    * will be no-op. Other event handling logic keeps the same when the flag is set.
@@ -100,7 +103,13 @@ public class GenericHelixController implements IdealStateChangeListener,
    * The timer that can periodically run the rebalancing pipeline. The timer will start if there is
    * one resource group has the config to use the timer.
    */
-  Timer _rebalanceTimer = null;
+  Timer _periodicalRebalanceTimer = null;
+
+  /**
+   * The timer to schedule ad hoc forced rebalance or retry rebalance event.
+   */
+  Timer _forceRebalanceTimer = null;
+
   long _timerPeriod = Long.MAX_VALUE;
 
   /**
@@ -108,7 +117,7 @@ public class GenericHelixController implements IdealStateChangeListener,
    */
   private ClusterDataCache _cache;
   private ClusterDataCache _taskCache;
-  private ExecutorService _asyncTasksThreadPool;
+  private ScheduledExecutorService _asyncTasksThreadPool;
 
   private String _clusterName;
 
@@ -134,41 +143,53 @@ public class GenericHelixController implements IdealStateChangeListener,
 
   class RebalanceTask extends TimerTask {
     HelixManager _manager;
+    ClusterEventType _clusterEventType;
 
-    public RebalanceTask(HelixManager manager) {
+    public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
       _manager = manager;
+      _clusterEventType = clusterEventType;
     }
 
     @Override
     public void run() {
-      //TODO: this is the temporary workaround
-      _cache.requireFullRefresh();
-      _taskCache.requireFullRefresh();
-      _cache.refresh(_manager.getHelixDataAccessor());
-      _taskCache.refresh(_manager.getHelixDataAccessor());
-      if (_cache.getLiveInstances() != null) {
-        NotificationContext changeContext = new NotificationContext(_manager);
-        changeContext.setType(NotificationContext.Type.CALLBACK);
-        synchronized (_manager) {
-          checkLiveInstancesObservation(new ArrayList<>(_cache.getLiveInstances().values()),
-              changeContext);
+      try {
+        if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance)) {
+          _cache.requireFullRefresh();
+          _taskCache.requireFullRefresh();
+          _cache.refresh(_manager.getHelixDataAccessor());
+          if (_cache.getLiveInstances() != null) {
+            NotificationContext changeContext = new NotificationContext(_manager);
+            changeContext.setType(NotificationContext.Type.CALLBACK);
+            synchronized (_manager) {
+              checkLiveInstancesObservation(new ArrayList<>(_cache.getLiveInstances().values()),
+                  changeContext);
+            }
+          }
         }
-      }
 
-      NotificationContext changeContext = new NotificationContext(_manager);
-      changeContext.setType(NotificationContext.Type.CALLBACK);
-      ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.PeriodicalRebalance);
-      event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
-      event.addAttribute(AttributeName.changeContext.name(), changeContext);
-      List<ZNRecord> dummy = new ArrayList<>();
-      event.addAttribute(AttributeName.eventData.name(), dummy);
-      // Should be able to process
-      _eventQueue.put(event);
-      _taskEventQueue.put(event.clone());
-      logger.info("Controller periodicalRebalance event triggered!");
+        forceRebalance(_manager, _clusterEventType);
+      } catch (Throwable ex) {
+        logger.error("Time task failed. Rebalance task type: " + _clusterEventType + ", cluster: "
+            + _clusterName);
+      }
     }
   }
 
+  /* Trigger a rebalance pipeline: enqueue one event of the given type on the task and regular queues */
+  private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
+    NotificationContext changeContext = new NotificationContext(manager);
+    changeContext.setType(NotificationContext.Type.CALLBACK);
+    ClusterEvent event = new ClusterEvent(_clusterName, eventType);
+    event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
+    event.addAttribute(AttributeName.changeContext.name(), changeContext);
+    event.addAttribute(AttributeName.eventData.name(), new ArrayList<>()); // empty placeholder data
+
+    _taskEventQueue.put(event); // NOTE(review): removed code queued event.clone() here; confirm sharing is safe
+    _eventQueue.put(event);
+
+    logger.info("Controller rebalance event triggered with event type: " + eventType);
+  }
+
   // TODO who should stop this timer
   /**
    * Starts the rebalancing timer with the specified period. Start the timer if necessary; If the
@@ -177,13 +198,14 @@ public class GenericHelixController implements IdealStateChangeListener,
   void startRebalancingTimer(long period, HelixManager manager) {
     if (period != _timerPeriod) {
       logger.info("Controller starting timer at period " + period);
-      if (_rebalanceTimer != null) {
-        _rebalanceTimer.cancel();
+      if (_periodicalRebalanceTimer != null) {
+        _periodicalRebalanceTimer.cancel();
       }
-      _rebalanceTimer = new Timer(true);
+      _periodicalRebalanceTimer = new Timer(true);
       _timerPeriod = period;
-      _rebalanceTimer
-          .scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
+      _periodicalRebalanceTimer
+          .scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
+              _timerPeriod, _timerPeriod);
     } else {
       logger.info("Controller already has timer at period " + _timerPeriod);
     }
@@ -192,10 +214,10 @@ public class GenericHelixController implements IdealStateChangeListener,
   /**
    * Stops the rebalancing timer
    */
-  void stopRebalancingTimer() {
-    if (_rebalanceTimer != null) {
-      _rebalanceTimer.cancel();
-      _rebalanceTimer = null;
+  void stopRebalancingTimers() {
+    if (_periodicalRebalanceTimer != null) {
+      _periodicalRebalanceTimer.cancel();
+      _periodicalRebalanceTimer = null;
     }
     _timerPeriod = Integer.MAX_VALUE;
   }
@@ -260,7 +282,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _lastSeenSessions = new AtomicReference<>();
     _clusterName = clusterName;
     _asyncTasksThreadPool =
-        Executors.newFixedThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
+        Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
           @Override public Thread newThread(Runnable r) {
             return new Thread(r, "GerenricHelixController-async_task_thread");
           }
@@ -274,12 +296,22 @@ public class GenericHelixController implements IdealStateChangeListener,
     _eventThread = new ClusterEventProcessor(_cache, _eventQueue);
     _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
 
+    _forceRebalanceTimer = new Timer();
+
     initPipelines(_eventThread, _cache, false);
     initPipelines(_taskEventThread, _taskCache, true);
 
     _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
   }
 
+  private boolean isEventQueueEmpty(boolean taskQueue) { // true: check _taskEventQueue; false: _eventQueue
+    if (taskQueue) {
+      return _taskEventQueue.isEmpty();
+    } else {
+      return _eventQueue.isEmpty();
+    }
+  }
+
   /**
    * lock-always: caller always needs to obtain an external lock before call, calls to handleEvent()
    * should be serialized
@@ -314,7 +346,7 @@ public class GenericHelixController implements IdealStateChangeListener,
 
     if (context != null) {
       if (context.getType() == Type.FINALIZE) {
-        stopRebalancingTimer();
+        stopRebalancingTimers();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType());
         return;
       } else {
@@ -345,6 +377,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s",
         manager.getClusterName(), getPipelineType(cache.isTaskCache()), event.getEventType()));
     long startTime = System.currentTimeMillis();
+    boolean rebalanceFail = false;
     for (Pipeline pipeline : pipelines) {
       try {
         pipeline.handle(event);
@@ -352,10 +385,36 @@ public class GenericHelixController implements IdealStateChangeListener,
       } catch (Exception e) {
         logger.error(
             "Exception while executing " + getPipelineType(cache.isTaskCache()) + "pipeline: "
-                + pipeline + ". Will not continue to next pipeline", e);
+                + pipeline + "for cluster ." + _clusterName
+                + ". Will not continue to next pipeline", e);
+
+        if (e instanceof HelixMetaDataAccessException) {
+          rebalanceFail = true;
+          // If pipeline failed due to read/write fails to zookeeper, retry the pipeline.
+          cache.requireFullRefresh();
+          logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName);
+
+          // only push a retry event when there is no pending event in the corresponding event queue.
+          if (isEventQueueEmpty(cache.isTaskCache())) {
+            _continousRebalanceFailureCount ++;
+            long delay = getRetryDelay(_continousRebalanceFailureCount);
+            if (delay == 0) {
+              forceRebalance(manager, ClusterEventType.RetryRebalance);
+            } else {
+              _asyncTasksThreadPool
+                  .schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay,
+                      TimeUnit.MILLISECONDS);
+            }
+            logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName);
+          }
+        }
+        _clusterStatusMonitor.reportRebalanceFailure();
         break;
       }
     }
+    if (!rebalanceFail) {
+      _continousRebalanceFailureCount = 0;
+    }
     long endTime = System.currentTimeMillis();
     logger.info(
         "END: Invoking " + getPipelineType(cache.isTaskCache()) + " controller pipeline for event: "
@@ -409,6 +468,19 @@ public class GenericHelixController implements IdealStateChangeListener,
     resetClusterStatusMonitor();
   }
 
+  /**
+   * Get the delay (ms) before the next retry rebalance after a ZK read failure: 0 for the first 5
+   * consecutive failures, then exponential backoff of 10 * 2^(failCount - 5) ms, capped at 1000ms.
+   */
+  private long getRetryDelay(long failCount) {
+    int lowLimit = 5; // failures up to this count are retried immediately
+    if (failCount <= lowLimit) {
+      return 0;
+    }
+    long backoff = (long) (Math.pow(2, failCount - lowLimit) * 10);
+    return Math.min(backoff, 1000); // upper bound on the delay
+  }
+
   @Override
   @PreFetch(enabled = false)
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -692,7 +764,7 @@ public class GenericHelixController implements IdealStateChangeListener,
   }
 
   public void shutdown() throws InterruptedException {
-    stopRebalancingTimer();
+    stopRebalancingTimers();
 
     terminateEventThread(_eventThread);
     terminateEventThread(_taskEventThread);

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index dae9f57..1f4adca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -65,7 +65,7 @@ public class CustomRebalancer extends AbstractRebalancer {
       return partitionMapping;
     }
 
-    LOG.info("Computing BestPossibleMapping for " + resource);
+    LOG.info("Computing BestPossibleMapping for " + resource.getResourceName());
 
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2317197..3056531 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -141,7 +141,7 @@ public class ClusterDataCache {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false));
       clearCachedResourceAssignments();
-      _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+      _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates(), true);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + (
             System.currentTimeMillis() - start) + " ms");
@@ -151,7 +151,7 @@ public class ClusterDataCache {
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
       _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
       clearCachedResourceAssignments();
-      _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+      _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
       _updateInstanceOfflineTime = true;
       LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet());
     }
@@ -159,14 +159,14 @@ public class ClusterDataCache {
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
       clearCachedResourceAssignments();
-      _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+      _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
       LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet());
     }
 
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, Boolean.valueOf(false));
       clearCachedResourceAssignments();
-      _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs());
+      _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true);
       LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
     }
 
@@ -184,9 +184,9 @@ public class ClusterDataCache {
     }
 
     Map<String, StateModelDefinition> stateDefMap =
-        accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+        accessor.getChildValuesMap(keyBuilder.stateModelDefs(), true);
     _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap);
-    _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
+    _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints(), true);
     _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
 
 
@@ -793,7 +793,7 @@ public class ClusterDataCache {
   }
 
 
-  public void clearCachedResourceAssignments() {
+  protected void clearCachedResourceAssignments() {
     _resourceAssignmentCache.clear();
     _idealMappingCache.clear();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index d19c7e8..5312c35 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -32,6 +32,7 @@ public enum ClusterEventType {
   TargetExternalViewChange,
   Resume,
   PeriodicalRebalance,
+  RetryRebalance,
   StateVerifier,
   Unknown
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index a0c06f7..4c25741 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -31,7 +31,6 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.GroupCommit;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
@@ -42,6 +41,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordAssembler;
 import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
@@ -124,7 +124,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
   public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value) {
     PropertyType type = key.getType();
     if (!value.isValid()) {
-      throw new HelixException("The ZNRecord for " + type + " is not valid.");
+      throw new HelixMetaDataAccessException("The ZNRecord for " + type + " is not valid.");
     }
 
     String path = key.getPath();
@@ -201,7 +201,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
 
   @Override
   public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
-      boolean throwException) throws HelixException {
+      boolean throwException) throws HelixMetaDataAccessException {
     if (keys == null || keys.size() == 0) {
       return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index c83d9d5..6ee64a4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -37,6 +37,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
@@ -400,7 +401,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             stats.set(i, cb._stat);
           }
         } else if (Code.get(cb.getRc()) != Code.NONODE && throwException) {
-          throw new HelixException(String.format("Failed to read node %s", paths.get(i)));
+          throw new HelixMetaDataAccessException(String.format("Failed to read node %s", paths.get(i)));
         } else {
           pathFailToRead.put(paths.get(i), cb.getRc());
         }
@@ -410,7 +411,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       }
       return records;
     } catch (Exception e) {
-      throw new HelixException(String.format("Fail to read nodes for %s", paths));
+      throw new HelixMetaDataAccessException(String.format("Fail to read nodes for %s", paths), e);
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
@@ -440,14 +441,18 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         readCount--;
         List<T> records = getChildren(parentPath, stats, options, true);
         return records;
-      } catch (HelixException e) {
+      } catch (HelixMetaDataAccessException e) {
         if (readCount == 0) {
-          throw new HelixException(String.format("Failed to get full list of %s", parentPath), e);
+          throw new HelixMetaDataAccessException(String.format("Failed to get full list of %s", parentPath), e);
         }
         try {
           Thread.sleep(retryInterval);
         } catch (InterruptedException interruptedException) {
-          throw new HelixException("Fail to interrupt the sleep", interruptedException);
+          // Restore the interrupt status before propagating so callers can still observe it.
+          Thread.currentThread().interrupt();
+          throw new HelixMetaDataAccessException(
+              String.format("Interrupted while retrying to read children of %s", parentPath),
+              interruptedException);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 4c0fcf2..73cd2ae 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -32,13 +32,10 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
 import org.apache.helix.store.HelixPropertyListener;

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 40801b1..c7e0fdb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.*;
 import org.apache.helix.task.*;
@@ -66,6 +67,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private Map<String, List<String>> _oldDisabledPartitions = Collections.emptyMap();
   private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
   private boolean _rebalanceFailure = false;
+  private final AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>();
 
@@ -441,8 +443,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions, numPendingLoadRebalancePartitions,
-          numRecoveryRebalanceThrottledPartitions, numLoadRebalanceThrottledPartitions);
+      resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions,
+          numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions,
+          numLoadRebalanceThrottledPartitions);
     }
   }
 
@@ -655,7 +658,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       String resourceName = monitor.getResourceName();
       String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
       register(monitor, getObjectName(beanName));
-      _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+      _perInstanceResourceMap.put(
+          new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
     }
   }
 
@@ -785,4 +789,17 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void setEnabled(boolean enabled) {
     this._enabled = enabled;
   }
+
+  /**
+   * Increments the rebalance failure counter exposed through
+   * {@link #getRebalanceFailureCounter()}.
+   */
+  public void reportRebalanceFailure() {
+    _rebalanceFailureCount.incrementAndGet();
+  }
+
+  @Override
+  public long getRebalanceFailureCounter() {
+    return _rebalanceFailureCount.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 4cdd357..49d316e 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -61,4 +61,10 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
    * @return 1 if cluster is paused, otherwise 0
    */
   public long getPaused();
+
+  /**
+   * Returns the number of failures during the rebalance pipeline.
+   * @return the number of rebalance pipeline failures reported so far
+   */
+  public long getRebalanceFailureCounter();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/MockAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
index e3189f8..8bcfe19 100644
--- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
@@ -192,7 +192,7 @@ public class MockAccessor implements HelixDataAccessor {
 
   @Override public <T extends HelixProperty> List<T> getChildValues(PropertyKey key,
       boolean throwException) {
-    return null;
+    return getChildValues(key);
   }
 
   @Override
@@ -203,14 +203,13 @@ public class MockAccessor implements HelixDataAccessor {
 
   @Override public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key,
       boolean throwException) {
-    return null;
+    return getChildValuesMap(key);
   }
 
   @Override
   public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
       List<T> children) {
-    // TODO Auto-generated method stub
-    return null;
+    throw new HelixException("Method not implemented!");
   }
 
   @Override
@@ -236,7 +235,6 @@ public class MockAccessor implements HelixDataAccessor {
   public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
       List<DataUpdater<ZNRecord>> updaters, int options) {
-    // TODO Auto-generated method stub
-    return null;
+    throw new HelixException("Method not implemented!");
   }
 
   @Override
@@ -252,6 +250,6 @@ public class MockAccessor implements HelixDataAccessor {
 
   @Override public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
       boolean throwException) {
-    return null;
+    return getProperty(keys);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
index fee1ee3..de4e9d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
@@ -139,17 +139,35 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase
     }
 
     @Override
+    public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
+        boolean throwException) {
+      for (PropertyKey key : keys) {
+        addCount(key);
+      }
+      return super.getProperty(keys, throwException);
+    }
+
+    @Override
     public <T extends HelixProperty> T getProperty(PropertyKey key) {
       addCount(key);
       return super.getProperty(key);
     }
 
+    @Override
     public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) {
       Map<String, T> map = super.getChildValuesMap(key);
       addCount(key, map.keySet().size());
       return map;
     }
 
+    @Override
+    public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key,
+        boolean throwException) {
+      Map<String, T> map = super.getChildValuesMap(key, throwException);
+      addCount(key, map.keySet().size());
+      return map;
+    }
+
     private void addCount(PropertyKey key) {
       addCount(key, 1);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
index ec0275a..39fdb2a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
@@ -10,6 +10,8 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -18,21 +20,21 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestHelixDataAccessor extends ZkIntegrationTestBase{
+public class TestHelixDataAccessor extends ZkIntegrationTestBase {
   private MockZkClient _zkClient;
+  private BaseDataAccessor<ZNRecord> baseDataAccessor;
+  private HelixDataAccessor accessor;
+  private List<PropertyKey> propertyKeys;
 
   @BeforeClass
   public void beforeClass() {
     _zkClient = new MockZkClient(ZK_ADDR);
-  }
 
-  @Test(expectedExceptions = HelixException.class)
-  public void testHelixDataAccessorReadData() {
-    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
-    HelixDataAccessor accessor = new ZKHelixDataAccessor("HELIX", baseDataAccessor);
+    baseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
+    accessor = new ZKHelixDataAccessor("HELIX", baseDataAccessor);
 
     Map<String, HelixProperty> paths = new TreeMap<>();
-    List<PropertyKey> propertyKeys = new ArrayList<>();
+    propertyKeys = new ArrayList<>();
     for (int i = 0; i < 5; i++) {
       PropertyKey key = accessor.keyBuilder().idealStates("RESOURCE" + i);
       propertyKeys.add(key);
@@ -40,12 +42,45 @@ public class TestHelixDataAccessor extends ZkIntegrationTestBase{
       accessor.setProperty(key, paths.get(key.getPath()));
     }
 
-    List<HelixProperty> data = accessor.getProperty(new ArrayList<>(propertyKeys));
+    List<HelixProperty> data = accessor.getProperty(new ArrayList<>(propertyKeys), true);
     Assert.assertEquals(data.size(), 5);
 
     PropertyKey key = accessor.keyBuilder().idealStates("RESOURCE6");
     propertyKeys.add(key);
     _zkClient.putData(key.getPath(), null);
-    accessor.getProperty(new ArrayList<>(propertyKeys), true);
+  }
+
+  @Test
+  public void testHelixDataAccessorReadData() {
+    accessor.getProperty(new ArrayList<>(propertyKeys), false);
+    try {
+      accessor.getProperty(new ArrayList<>(propertyKeys), true);
+      Assert.fail("Expected HelixMetaDataAccessException from strict getProperty");
+    } catch (HelixMetaDataAccessException expected) {
+      // expected: throwException=true must surface the failed read
+    }
+
+    PropertyKey idealStates = accessor.keyBuilder().idealStates();
+    accessor.getChildValues(idealStates, false);
+    try {
+      accessor.getChildValues(idealStates, true);
+      Assert.fail("Expected HelixMetaDataAccessException from strict getChildValues");
+    } catch (HelixMetaDataAccessException expected) {
+      // expected: throwException=true must surface the failed read
+    }
+
+    accessor.getChildValuesMap(idealStates, false);
+    try {
+      accessor.getChildValuesMap(idealStates, true);
+      Assert.fail("Expected HelixMetaDataAccessException from strict getChildValuesMap");
+    } catch (HelixMetaDataAccessException expected) {
+      // expected: throwException=true must surface the failed read
+    }
+  }
+
+  @Test(expectedExceptions = HelixMetaDataAccessException.class)
+  public void testClusterDataCache() {
+    ClusterDataCache cache = new ClusterDataCache("MyCluster");
+    cache.refresh(accessor);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java b/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
index 05d7894..5026999 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
@@ -1,14 +1,11 @@
 package org.apache.helix.mock;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.zookeeper.KeeperException;
 
 public class MockZkClient extends ZkClient {
   Map<String, byte[]> _dataMap;
@@ -18,79 +15,6 @@ public class MockZkClient extends ZkClient {
     _dataMap = new HashMap<>();
     setZkSerializer(new ZNRecordSerializer());
   }
-  public MockZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
-      PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
-      String monitorInstanceName, boolean monitorRootPathOnly) {
-    super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
-        monitorKey, monitorInstanceName, monitorRootPathOnly);
-  }
-
-  public MockZkClient(IZkConnection connection, int connectionTimeout,
-      PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
-      long operationRetryTimeout) {
-    super(connection, connectionTimeout, zkSerializer, monitorType, monitorKey,
-        operationRetryTimeout);
-  }
-
-  public MockZkClient(IZkConnection connection, int connectionTimeout,
-      PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey) {
-    super(connection, connectionTimeout, zkSerializer, monitorType, monitorKey);
-  }
-
-  public MockZkClient(String zkServers, String monitorType, String monitorKey) {
-    super(zkServers, monitorType, monitorKey);
-  }
-
-  public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-      PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey) {
-    super(zkServers, sessionTimeout, connectionTimeout, zkSerializer, monitorType, monitorKey);
-  }
-
-  public MockZkClient(IZkConnection connection, int connectionTimeout,
-      PathBasedZkSerializer zkSerializer) {
-    super(connection, connectionTimeout, zkSerializer);
-  }
-
-  public MockZkClient(IZkConnection connection, int connectionTimeout, ZkSerializer zkSerializer) {
-    super(connection, connectionTimeout, zkSerializer);
-  }
-
-  public MockZkClient(IZkConnection connection, int connectionTimeout) {
-    super(connection, connectionTimeout);
-  }
-
-  public MockZkClient(IZkConnection connection) {
-    super(connection);
-  }
-
-  public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-      ZkSerializer zkSerializer) {
-    super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
-  }
-
-  public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-      PathBasedZkSerializer zkSerializer) {
-    super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
-  }
-
-  public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
-    super(zkServers, sessionTimeout, connectionTimeout);
-  }
-
-  public MockZkClient(String zkServers, int connectionTimeout) {
-    super(zkServers, connectionTimeout);
-  }
-
-  public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
-      ZkSerializer zkSerializer, long operationRetryTimeout) {
-    super(zkServers, sessionTimeout, connectionTimeout, zkSerializer, operationRetryTimeout);
-  }
-
-  public MockZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer,
-      long operationRetryTimeout) {
-    super(zkConnection, connectionTimeout, zkSerializer, operationRetryTimeout);
-  }
-
 
   @Override
   public void asyncGetData(final String path,
@@ -102,8 +26,23 @@ public class MockZkClient extends ZkClient {
         cb.processResult(0, path, null, _dataMap.get(path), null);
       }
     } else {
-      cb.processResult(KeeperException.Code.NONODE.intValue(), path, null, null, null);
+      super.asyncGetData(path, cb);
+    }
+  }
+
+  @Override
+  public List<String> getChildren(final String path) {
+    List<String> children = super.getChildren(path);
+    // Match only direct children: a plain substring test would also pick up deeper
+    // descendants, sibling paths sharing a name prefix, unrelated paths, and path itself.
+    String prefix = path + "/";
+    for (String p : _dataMap.keySet()) {
+      if (p.startsWith(prefix) && p.indexOf('/', prefix.length()) < 0) {
+        children.add(p.substring(prefix.length()));
+      }
     }
+
+    return children;
   }
 
   public void putData(String path, byte[] data) {