You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/03/23 19:29:25 UTC

[4/6] helix git commit: Change RoutingTableProvider to support direct aggregating routing information from CurrentStates in each liveinstance. When sourceDataType is set as CurrentState, RoutingTableProvider will listen on CurrentStateChanges and refresh

Change RoutingTableProvider to support direct aggregating routing information from CurrentStates in each liveinstance. When sourceDataType is set as CurrentState, RoutingTableProvider will listen on CurrentStateChanges and refresh routing table from CurrentStates upon changes.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e5728469
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e5728469
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e5728469

Branch: refs/heads/master
Commit: e5728469e02f196690654b4f7f2ed8ca9130a631
Parents: 6f6ab65
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Mar 20 15:04:30 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../helix/common/ClusterEventProcessor.java     |   5 +
 .../common/caches/BasicClusterDataCache.java    |  66 +++---
 .../controller/stages/ClusterEventType.java     |   1 +
 .../helix/spectator/RoutingDataCache.java       |  83 ++++++-
 .../apache/helix/spectator/RoutingTable.java    | 106 ++++++---
 .../helix/spectator/RoutingTableProvider.java   | 221 +++++++++++++++++--
 .../java/org/apache/helix/MockAccessor.java     |   7 +-
 .../Spectator/TestRoutingTableProvider.java     | 177 ---------------
 .../TestRoutingTableProviderWithSourceType.java | 152 -------------
 .../spectator/TestRoutingTableProvider.java     | 176 +++++++++++++++
 ...stRoutingTableProviderFromCurrentStates.java | 162 ++++++++++++++
 .../TestRoutingTableProviderFromTargetEV.java   | 152 +++++++++++++
 12 files changed, 882 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
index 6001edc..e4ceb85 100644
--- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
@@ -54,4 +54,9 @@ public abstract class ClusterEventProcessor extends Thread {
   public void queueEvent(ClusterEvent event) {
     _eventQueue.put(event);
   }
+
+  public void shutdown() {
+    _eventQueue.clear();
+    this.interrupt();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index f470272..e3acf14 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -37,28 +36,23 @@ import org.slf4j.LoggerFactory;
  * Cache the basic cluster data, including LiveInstances, InstanceConfigs and ExternalViews.
  */
 public class BasicClusterDataCache {
-  protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+  private static Logger LOG = LoggerFactory.getLogger(BasicClusterDataCache.class.getName());
 
-  private Map<String, LiveInstance> _liveInstanceMap;
-  private Map<String, InstanceConfig> _instanceConfigMap;
-  private Map<String, ExternalView> _externalViewMap;
-  private final PropertyType _sourceDataType;
+  protected Map<String, LiveInstance> _liveInstanceMap;
+  protected Map<String, InstanceConfig> _instanceConfigMap;
+  protected Map<String, ExternalView> _externalViewMap;
 
   protected String _clusterName;
 
   protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
 
   public BasicClusterDataCache(String clusterName) {
-    this(clusterName, PropertyType.EXTERNALVIEW);
-  }
-
-  public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     _liveInstanceMap = new HashMap<>();
     _instanceConfigMap = new HashMap<>();
     _externalViewMap = new HashMap<>();
     _clusterName = clusterName;
-    _sourceDataType = sourceDataType;
+    requireFullRefresh();
   }
 
   /**
@@ -69,54 +63,44 @@ public class BasicClusterDataCache {
    * @return
    */
   public synchronized void refresh(HelixDataAccessor accessor) {
-    LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName);
+    LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
     long startTime = System.currentTimeMillis();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
-      switch (_sourceDataType) {
-        case EXTERNALVIEW:
-          _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
-          break;
-        case TARGETEXTERNALVIEW:
-          _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
-          break;
-        default:
-          break;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
+      _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+      LOG.info("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
             System.currentTimeMillis() - start) + " ms");
-      }
     }
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
+      long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
       _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-      LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet());
+      LOG.info("Reload LiveInstances: " + _liveInstanceMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
     }
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+      long start = System.currentTimeMillis();
       _propertyDataChangedMap
           .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
       _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
-      LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet());
+      LOG.info("Reload InstanceConfig: " + _instanceConfigMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
     }
 
     long endTime = System.currentTimeMillis();
     LOG.info(
-        "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+        "END: BasicClusterDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
             - startTime) + " ms");
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
-      for (LiveInstance instance : _liveInstanceMap.values()) {
-        LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
-      }
-      LOG.debug("ExternalViews: " + _externalViewMap.keySet());
-      LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
+      LOG.debug("LiveInstances: " + _liveInstanceMap);
+      LOG.debug("ExternalViews: " + _externalViewMap);
+      LOG.debug("InstanceConfigs: " + _instanceConfigMap);
     }
   }
 
@@ -149,8 +133,20 @@ public class BasicClusterDataCache {
 
   /**
    * Notify the cache that some part of the cluster data has been changed.
+   *
+   * @param changeType
+   * @param pathChanged
+   */
+  public void notifyDataChange(HelixConstants.ChangeType changeType, String pathChanged) {
+    notifyDataChange(changeType);
+  }
+
+  /**
+   * Notify the cache that some part of the cluster data has been changed.
+   *
+   * @param changeType
    */
-  public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) {
+  public void notifyDataChange(HelixConstants.ChangeType changeType) {
     _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 9ca697f..d19c7e8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -29,6 +29,7 @@ public enum ClusterEventType {
   LiveInstanceChange,
   MessageChange,
   ExternalViewChange,
+  TargetExternalViewChange,
   Resume,
   PeriodicalRebalance,
   StateVerifier,

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 332cd8a..ffa2fe7 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -19,24 +19,99 @@ package org.apache.helix.spectator;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.Map;
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.common.caches.BasicClusterDataCache;
+import org.apache.helix.common.caches.CurrentStateCache;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.
  */
 public class RoutingDataCache extends BasicClusterDataCache {
+  private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
+
+  private final PropertyType _sourceDataType;
+  private CurrentStateCache _currentStateCache;
+  private Map<String, ExternalView> _targetExternalViewMap;
+
   public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
-    super(clusterName, sourceDataType);
+    super(clusterName);
+    _sourceDataType = sourceDataType;
+    _currentStateCache = new CurrentStateCache(clusterName);
+    _targetExternalViewMap = Collections.emptyMap();
     requireFullRefresh();
   }
 
   /**
-   * Notify the cache that some part of the cluster data has been changed.
+   * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way
+   *
+   * @param accessor
+   *
+   * @return
+   */
+  @Override
+  public synchronized void refresh(HelixDataAccessor accessor) {
+    LOG.info("START: RoutingDataCache.refresh() for cluster " + _clusterName);
+    long startTime = System.currentTimeMillis();
+
+    super.refresh(accessor);
+
+    if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW) && _propertyDataChangedMap
+        .get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+      long start = System.currentTimeMillis();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      _propertyDataChangedMap
+          .put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, Boolean.valueOf(false));
+      _targetExternalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
+      LOG.info("Reload TargetExternalViews: " + _targetExternalViewMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
+    }
+
+    if (_sourceDataType.equals(PropertyType.CURRENTSTATES) && _propertyDataChangedMap
+        .get(HelixConstants.ChangeType.CURRENT_STATE)) {
+      long start = System.currentTimeMillis();
+      Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
+      _currentStateCache.refresh(accessor, liveInstanceMap);
+      LOG.info("Reload CurrentStates: " + _targetExternalViewMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
+    }
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+        - startTime) + " ms");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("CurrentStates: " + _currentStateCache);
+      LOG.debug("TargetExternalViews: " + _targetExternalViewMap);
+    }
+  }
+
+  /**
+   * Retrieves the TargetExternalView for all resources
+   *
+   * @return
+   */
+  public Map<String, ExternalView> getTargetExternalViews() {
+    return Collections.unmodifiableMap(_targetExternalViewMap);
+  }
+
+  /**
+   * Get map of current states in cluster. {InstanceName -> {SessionId -> {ResourceName ->
+   * CurrentState}}}
+   *
+   * @return
    */
-  public void notifyDataChange(HelixConstants.ChangeType changeType, String pathChanged) {
-    _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
+  public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() {
+    return _currentStateCache.getCurrentStatesMap();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index f704ee4..b705dff 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -58,9 +59,81 @@ class RoutingTable {
       Collection<LiveInstance> liveInstances) {
     _resourceInfoMap = new HashMap<>();
     _resourceGroupInfoMap = new HashMap<>();
+    _liveInstances = new HashSet<>(liveInstances);
+    _instanceConfigs = new HashSet<>(instanceConfigs);
+    refresh(externalViews);
+  }
+
+  public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    _resourceInfoMap = new HashMap<>();
+    _resourceGroupInfoMap = new HashMap<>();
     _liveInstances = liveInstances;
     _instanceConfigs = instanceConfigs;
-    refresh(externalViews, instanceConfigs);
+    refresh(currentStateMap);
+  }
+
+  private void refresh(Collection<ExternalView> externalViewList) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    for (InstanceConfig config : _instanceConfigs) {
+      instanceConfigMap.put(config.getId(), config);
+    }
+    if (externalViewList != null) {
+      for (ExternalView extView : externalViewList) {
+        String resourceName = extView.getId();
+        for (String partitionName : extView.getPartitionSet()) {
+          Map<String, String> stateMap = extView.getStateMap(partitionName);
+          for (String instanceName : stateMap.keySet()) {
+            String currentState = stateMap.get(instanceName);
+            if (instanceConfigMap.containsKey(instanceName)) {
+              InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+              if (extView.isGroupRoutingEnabled()) {
+                addEntry(resourceName, extView.getResourceGroupName(),
+                    extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig);
+              } else {
+                addEntry(resourceName, partitionName, currentState, instanceConfig);
+              }
+            } else {
+              logger.error("Invalid instance name. " + instanceName
+                  + " .Not found in /cluster/configs/. instanceName: ");
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    for (InstanceConfig config : _instanceConfigs) {
+      instanceConfigMap.put(config.getId(), config);
+    }
+
+    for (LiveInstance liveInstance : _liveInstances) {
+      String instanceName = liveInstance.getInstanceName();
+      String sessionId = liveInstance.getSessionId();
+      InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+      if (instanceConfig == null) {
+        logger.error("Invalid instance name. " + instanceName
+            + " .Not found in /cluster/configs/. instanceName: ");
+      }
+
+      Map<String, CurrentState> currentStates = Collections.emptyMap();
+      if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName)
+          .containsKey(sessionId)) {
+        currentStates = currentStateMap.get(instanceName).get(sessionId);
+      }
+
+      for (CurrentState currentState : currentStates.values()) {
+        String resourceName = currentState.getResourceName();
+        Map<String, String> stateMap = currentState.getPartitionStateMap();
+
+        for (String partitionName : stateMap.keySet()) {
+          String state = stateMap.get(partitionName);
+          addEntry(resourceName, partitionName, state, instanceConfig);
+        }
+      }
+    }
   }
 
   private void addEntry(String resourceName, String partitionName, String state,
@@ -271,37 +344,6 @@ class RoutingTable {
     return Collections.unmodifiableList(instanceList);
   }
 
-  private void refresh(Collection<ExternalView> externalViewList,
-      Collection<InstanceConfig> instanceConfigList) {
-    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig config : instanceConfigList) {
-      instanceConfigMap.put(config.getId(), config);
-    }
-    if (externalViewList != null) {
-      for (ExternalView extView : externalViewList) {
-        String resourceName = extView.getId();
-        for (String partitionName : extView.getPartitionSet()) {
-          Map<String, String> stateMap = extView.getStateMap(partitionName);
-          for (String instanceName : stateMap.keySet()) {
-            String currentState = stateMap.get(instanceName);
-            if (instanceConfigMap.containsKey(instanceName)) {
-              InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
-              if (extView.isGroupRoutingEnabled()) {
-                addEntry(resourceName, extView.getResourceGroupName(),
-                    extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig);
-              } else {
-                addEntry(resourceName, partitionName, currentState, instanceConfig);
-              }
-            } else {
-              logger.error("Invalid instance name. " + instanceName
-                  + " .Not found in /cluster/configs/. instanceName: ");
-            }
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Class to store instances, partitions and their states for each resource.
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index be85b65..f539ac9 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -21,7 +21,9 @@ package org.apache.helix.spectator;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -30,6 +32,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.ConfigChangeListener;
+import org.apache.helix.api.listeners.CurrentStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
@@ -41,6 +44,7 @@ import org.apache.helix.common.ClusterEventProcessor;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -48,7 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener,
-    ConfigChangeListener, LiveInstanceChangeListener {
+    ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener {
   private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
   private final AtomicReference<RoutingTable> _routingTableRef;
   private final HelixManager _helixManager;
@@ -63,7 +67,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     this(helixManager, PropertyType.EXTERNALVIEW);
   }
 
-  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType) throws HelixException {
+  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
+      throws HelixException {
     _routingTableRef = new AtomicReference<>(new RoutingTable());
     _helixManager = helixManager;
     _sourceDataType = sourceDataType;
@@ -71,28 +76,77 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
     if (_helixManager != null) {
-      try {
-        switch (_sourceDataType) {
-        case EXTERNALVIEW:
+      switch (_sourceDataType) {
+      case EXTERNALVIEW:
+        try {
           _helixManager.addExternalViewChangeListener(this);
-          break;
-        case TARGETEXTERNALVIEW:
-          // Check whether target external has been enabled or not
-          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
-              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
-              0)) {
-            throw new HelixException("Target External View is not enabled!");
-          }
+        } catch (Exception e) {
+          shutdown();
+          logger.error("Failed to attach ExternalView Listener to HelixManager!");
+          throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+        }
+        break;
+
+      case TARGETEXTERNALVIEW:
+        // Check whether target external has been enabled or not
+        if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+            _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
+          shutdown();
+          throw new HelixException("Target External View is not enabled!");
+        }
+
+        try {
           _helixManager.addTargetExternalViewChangeListener(this);
-          break;
-        default:
-          throw new HelixException("Unsupported source data type: " + sourceDataType);
+        } catch (Exception e) {
+          shutdown();
+          logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
+          throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!", e);
         }
+        break;
+
+      case CURRENTSTATES:
+        // CurrentState change listeners will be added later in LiveInstanceChange call.
+        break;
+
+      default:
+        throw new HelixException("Unsupported source data type: " + sourceDataType);
+      }
+
+      try {
         _helixManager.addInstanceConfigChangeListener(this);
         _helixManager.addLiveInstanceChangeListener(this);
       } catch (Exception e) {
-        logger.error("Failed to attach listeners to HelixManager!");
-        throw new HelixException("Failed to attach listeners to HelixManager!", e);
+        shutdown();
+        logger.error(
+            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!");
+        throw new HelixException(
+            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
+            e);
+      }
+    }
+  }
+
+  /**
+   * Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused.
+   */
+  public void shutdown() {
+    _routerUpdater.shutdown();
+    if (_helixManager != null) {
+      PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
+      switch (_sourceDataType) {
+        case EXTERNALVIEW:
+          _helixManager.removeListener(keyBuilder.externalViews(), this);
+          break;
+        case TARGETEXTERNALVIEW:
+          _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
+          break;
+        case CURRENTSTATES:
+          NotificationContext context = new NotificationContext(_helixManager);
+          context.setType(NotificationContext.Type.FINALIZE);
+          updateCurrentStatesListeners(Collections.<LiveInstance>emptyList(), context);
+          break;
+        default:
+          break;
       }
     }
   }
@@ -249,12 +303,21 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
       return;
     }
     // Refresh with full list of external view.
-    // keep this here for back-compatibility
     if (externalViewList != null && externalViewList.size() > 0) {
+      // keep this here for back-compatibility, application can call onExternalViewChange directly with externalview list supplied.
       refresh(externalViewList, changeContext);
     } else {
-      _routerUpdater.queueEvent(changeContext, ClusterEventType.ExternalViewChange,
-          HelixConstants.ChangeType.EXTERNAL_VIEW);
+      ClusterEventType eventType;
+      if (_sourceDataType.equals(PropertyType.EXTERNALVIEW)) {
+        eventType = ClusterEventType.ExternalViewChange;
+      } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
+        eventType = ClusterEventType.TargetExternalViewChange;
+      } else {
+        logger.warn("onExternalViewChange called with dis-matched change types. Source data type "
+            + _sourceDataType + ", change type: " + changeType);
+        return;
+      }
+      _routerUpdater.queueEvent(changeContext, eventType, changeType);
     }
   }
 
@@ -274,13 +337,91 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   }
 
   @Override
-  @PreFetch(enabled = false)
+  @PreFetch(enabled = true)
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
+    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+      // Go though the live instance list and update CurrentState listeners
+      updateCurrentStatesListeners(liveInstances, changeContext);
+    }
+
     _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
         HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
+  @Override
+  @PreFetch(enabled = false)
+  public void onStateChange(String instanceName,
+      List<CurrentState> statesInfo, NotificationContext changeContext) {
+    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+      _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange,
+          HelixConstants.ChangeType.CURRENT_STATE);
+    } else {
+      logger.warn(
+          "RoutingTableProvider does not use CurrentStates as source, ignore CurrentState changes!");
+    }
+  }
+
+  final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions = new AtomicReference<>();
+
+  /**
+   * Go through all live instances in the cluster, add CurrentStateChange listener to
+   * them if they are newly added, and remove CurrentStateChange listener if instance is offline.
+   */
+  private void updateCurrentStatesListeners(List<LiveInstance> liveInstances,
+      NotificationContext changeContext) {
+    HelixManager manager = changeContext.getManager();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(manager.getClusterName());
+
+    if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
+      // on finalize, should remove all current-state listeners
+      logger.info("remove current-state listeners. lastSeenSessions: " + _lastSeenSessions);
+      liveInstances = Collections.emptyList();
+    }
+
+    Map<String, LiveInstance> curSessions = new HashMap<>();
+    for (LiveInstance liveInstance : liveInstances) {
+      curSessions.put(liveInstance.getSessionId(), liveInstance);
+    }
+
+    // Go though the live instance list and update CurrentState listeners
+    synchronized (_lastSeenSessions) {
+      Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
+      if (lastSessions == null) {
+        lastSessions = Collections.emptyMap();
+      }
+
+      // add listeners to new live instances
+      for (String session : curSessions.keySet()) {
+        if (!lastSessions.containsKey(session)) {
+          String instanceName = curSessions.get(session).getInstanceName();
+          try {
+            // add current-state listeners for new sessions
+            manager.addCurrentStateChangeListener(this, instanceName, session);
+            logger.info(manager.getInstanceName() + " added current-state listener for instance: "
+                + instanceName + ", session: " + session + ", listener: " + this);
+          } catch (Exception e) {
+            logger.error("Fail to add current state listener for instance: " + instanceName
+                + " with session: " + session, e);
+          }
+        }
+      }
+
+      // remove current-state listener for expired session
+      for (String session : lastSessions.keySet()) {
+        if (!curSessions.containsKey(session)) {
+          String instanceName = lastSessions.get(session).getInstanceName();
+          manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+          logger.info("remove current-state listener for instance:" + instanceName + ", session: "
+              + session);
+        }
+      }
+
+      // update last-seen
+      _lastSeenSessions.set(curSessions);
+    }
+  }
+
   private void reset() {
     logger.info("Resetting the routing table.");
     RoutingTable newRoutingTable = new RoutingTable();
@@ -298,8 +439,21 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
 
   public void refresh(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
     _routingTableRef.set(newRoutingTable);
+    logger.info("Refreshed the RoutingTable for cluster " + (_helixManager != null ? _helixManager
+        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms.");
+  }
+
+  protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    long startTime = System.currentTimeMillis();
+    RoutingTable newRoutingTable =
+        new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
+    _routingTableRef.set(newRoutingTable);
+    logger.info("Refresh the RoutingTable for cluster " + (_helixManager != null ? _helixManager
+        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms.");
   }
 
   private class RouterUpdater extends ClusterEventProcessor {
@@ -324,8 +478,27 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
           throw new HelixException("HelixManager is null for router update event.");
         }
         _dataCache.refresh(manager.getHelixDataAccessor());
-        refresh(_dataCache.getExternalViews().values(), _dataCache.getInstanceConfigMap().values(),
-            _dataCache.getLiveInstances().values());
+
+        switch (_sourceDataType) {
+          case EXTERNALVIEW:
+            refresh(_dataCache.getExternalViews().values(),
+                _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
+            break;
+
+          case TARGETEXTERNALVIEW:
+            refresh(_dataCache.getTargetExternalViews().values(),
+                _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
+            break;
+
+          case CURRENTSTATES:
+            refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
+                _dataCache.getLiveInstances().values());
+            break;
+
+          default:
+            logger.warn("Unsupported source data type: " + _sourceDataType
+                + ", stop refreshing the routing table!");
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/MockAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
index 583dbd8..14c1845 100644
--- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
@@ -205,8 +205,11 @@ public class MockAccessor implements HelixDataAccessor {
 
   @Override
   public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys, List<T> children) {
-    // TODO Auto-generated method stub
-    return null;
+    boolean[] results = new boolean[keys.size()];
+    for (int i = 0; i < keys.size(); i++) {
+      results[i] = setProperty(keys.get(i), children.get(i));
+    }
+    return results;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
deleted file mode 100644
index 0f72091..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.integration.Spectator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
-import org.mockito.internal.util.collections.Sets;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestRoutingTableProvider extends ZkIntegrationTestBase {
-
-  static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name();
-  static final String TEST_DB = "TestDB";
-  static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName();
-  static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-  static final int PARTICIPANT_NUMBER = 3;
-  static final int PARTICIPANT_START_PORT = 12918;
-
-  static final int PARTITION_NUMBER = 20;
-  static final int REPLICA_NUMBER = 3;
-
-  private HelixManager _spectator;
-  private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
-  private List<String> _instances = new ArrayList<>();
-  private ClusterControllerManager _controller;
-  private HelixClusterVerifier _clusterVerifier;
-  private RoutingTableProvider _routingTableProvider;
-  private RoutingTableProvider _routingTableProvider2;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    System.out.println(
-        "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-
-    // setup storage cluster
-    _gSetupTool.addCluster(CLUSTER_NAME, true);
-
-    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
-      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
-      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
-      _instances.add(instance);
-    }
-
-    // start dummy participants
-    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
-      MockParticipantManager participant =
-          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
-      participant.syncStart();
-      _participants.add(participant);
-    }
-
-    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances,
-        STATE_MODEL, PARTITION_NUMBER, REPLICA_NUMBER);
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // start speculator
-    _routingTableProvider = new RoutingTableProvider();
-    _spectator = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
-    _spectator.connect();
-    _spectator.addExternalViewChangeListener(_routingTableProvider);
-    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
-    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
-
-    _routingTableProvider2 = new RoutingTableProvider(_spectator);
-
-    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
-    Assert.assertTrue(_clusterVerifier.verify());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    // stop participants
-    for (MockParticipantManager p : _participants) {
-      p.syncStop();
-    }
-    _controller.syncStop();
-    _spectator.disconnect();
-    _gSetupTool.deleteCluster(CLUSTER_NAME);
-  }
-
-  @Test
-  public void testRoutingTable() {
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size());
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
-
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size());
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
-
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(1), _instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(1), _instances.get(2)));
-
-    Collection<String> databases = _routingTableProvider.getResources();
-    Assert.assertEquals(databases.size(), 1);
-  }
-
-  @Test(dependsOnMethods = { "testRoutingTable" })
-  public void testDisableInstance() throws InterruptedException {
-    // disable the master instance
-    String prevMasterInstance = _instances.get(0);
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)),
-        Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)),
-        Sets.newSet(_instances.get(2)));
-  }
-
-  @Test(dependsOnMethods = { "testDisableInstance" })
-  public void testShutdownInstance() throws InterruptedException {
-    // reenable the first instance
-    String prevMasterInstance = _instances.get(0);
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
-
-    // shutdown second instance.
-    _participants.get(1).syncStop();
-
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
-
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
-
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(2)));
-  }
-
-  private void validateRoutingTable(RoutingTableProvider routingTableProvider,
-      Set<String> masterNodes, Set<String> slaveNodes) {
-    IdealState is =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
-    for (String p : is.getPartitionSet()) {
-      Set<String> masterInstances = new HashSet<>();
-      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "MASTER")) {
-        masterInstances.add(config.getInstanceName());
-      }
-
-      Set<String> slaveInstances = new HashSet<>();
-      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "SLAVE")) {
-        slaveInstances.add(config.getInstanceName());
-      }
-
-      Assert.assertEquals(masterInstances, masterNodes);
-      Assert.assertEquals(slaveInstances, slaveNodes);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
deleted file mode 100644
index ddd27e3..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package org.apache.helix.integration.Spectator;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyType;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.WorkflowGenerator;
-import org.apache.helix.mock.participant.MockDelayMSStateModelFactory;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.helix.tools.ClusterSetup;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestRoutingTableProviderWithSourceType extends ZkIntegrationTestBase {
-  private HelixManager _manager;
-  private ClusterSetup _setupTool;
-  private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
-  private final int NUM_NODES = 10;
-  protected int NUM_PARTITIONS = 20;
-  protected int NUM_REPLICAS = 3;
-  private final int START_PORT = 12918;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private MockParticipantManager[] _participants;
-  private ClusterControllerManager _controller;
-  private ConfigAccessor _configAccessor;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    _participants =  new MockParticipantManager[NUM_NODES];
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
-    _setupTool.addCluster(CLUSTER_NAME, true);
-
-    _participants = new MockParticipantManager[NUM_NODES];
-    for (int i = 0; i < NUM_NODES; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
-        MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
-
-    _setupTool
-        .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
-
-    for (int i = 0; i < NUM_NODES; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // add a delayed state model
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      MockDelayMSStateModelFactory delayFactory =
-          new MockDelayMSStateModelFactory().setDelay(-300000L);
-      stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
-      _participants[i].syncStart();
-    }
-
-    _manager = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-    _configAccessor = new ConfigAccessor(_gZkClient);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    for (int i = 0; i < NUM_NODES; i++) {
-      if (_participants[i] != null && _participants[i].isConnected()) {
-        _participants[i].reset();
-      }
-    }
-  }
-
-  @Test (expectedExceptions = HelixException.class)
-  public void testTargetExternalViewWithoutEnable() {
-    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
-  }
-
-  @Test
-  public void testExternalViewDoesNotExist() {
-    String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1;
-    RoutingTableProvider externalViewProvider =
-        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
-    Assert.assertEquals(externalViewProvider.getInstancesForResource(resourceName, "SLAVE").size(),
-        0);
-  }
-
-  @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
-  public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
-    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
-    clusterConfig.enableTargetExternalView(true);
-    clusterConfig.setPersistBestPossibleAssignment(true);
-    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    Thread.sleep(2000);
-
-    RoutingTableProvider externalViewProvider =
-        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
-    RoutingTableProvider targetExternalViewProvider =
-        new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
-
-    // ExternalView should not contain any MASTERS
-    // TargetExternalView should contain MASTERS same as the partition number
-    Set<InstanceConfig> externalViewMasters =
-        externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
-    Assert.assertEquals(externalViewMasters.size(), 0);
-    Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
-        .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
-    Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
-
-    // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
-    Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool()
-        .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord()
-        .getMapFields();
-
-    Set<String> idealMasters = new HashSet<>();
-    Set<String> targetMasters = new HashSet<>();
-    for (Map<String, String> instanceMap : stateMap.values()) {
-      for (String instance : instanceMap.keySet()) {
-        if (instanceMap.get(instance).equals("MASTER")) {
-          idealMasters.add(instance);
-        }
-      }
-    }
-
-    for (InstanceConfig instanceConfig : targetExternalViewMasters) {
-      targetMasters.add(instanceConfig.getInstanceName());
-    }
-    Assert.assertTrue(idealMasters.equals(targetMasters));
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
new file mode 100644
index 0000000..eb8f4b1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -0,0 +1,176 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.mockito.internal.util.collections.Sets;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProvider extends ZkIntegrationTestBase {
+
+  static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name();
+  static final String TEST_DB = "TestDB";
+  static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName();
+  static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  static final int PARTICIPANT_NUMBER = 3;
+  static final int PARTICIPANT_START_PORT = 12918;
+
+  static final int PARTITION_NUMBER = 20;
+  static final int REPLICA_NUMBER = 3;
+
+  private HelixManager _spectator;
+  private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  private List<String> _instances = new ArrayList<>();
+  private ClusterControllerManager _controller;
+  private HelixClusterVerifier _clusterVerifier;
+  private RoutingTableProvider _routingTableProvider;
+  private RoutingTableProvider _routingTableProvider2;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println(
+        "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances,
+        STATE_MODEL, PARTITION_NUMBER, REPLICA_NUMBER);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // start speculator
+    _routingTableProvider = new RoutingTableProvider();
+    _spectator = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator.connect();
+    _spectator.addExternalViewChangeListener(_routingTableProvider);
+    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
+    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
+
+    _routingTableProvider2 = new RoutingTableProvider(_spectator);
+
+    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verify());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // stop participants
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+    _controller.syncStop();
+    _spectator.disconnect();
+    _gSetupTool.deleteCluster(CLUSTER_NAME);
+  }
+
+  @Test
+  public void testRoutingTable() {
+    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
+
+    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
+
+    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(1), _instances.get(2)));
+    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(1), _instances.get(2)));
+
+    Collection<String> databases = _routingTableProvider.getResources();
+    Assert.assertEquals(databases.size(), 1);
+  }
+
+  @Test(dependsOnMethods = { "testRoutingTable" })
+  public void testDisableInstance() throws InterruptedException {
+    // disable the master instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)),
+        Sets.newSet(_instances.get(2)));
+  }
+
+  @Test(dependsOnMethods = { "testDisableInstance" })
+  public void testShutdownInstance() throws InterruptedException {
+    // reenable the first instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
+
+    // shutdown second instance.
+    _participants.get(1).syncStop();
+
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
+
+    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
+
+    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(2)));
+  }
+
+  private void validateRoutingTable(RoutingTableProvider routingTableProvider,
+      Set<String> masterNodes, Set<String> slaveNodes) {
+    IdealState is =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    for (String p : is.getPartitionSet()) {
+      Set<String> masterInstances = new HashSet<>();
+      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "MASTER")) {
+        masterInstances.add(config.getInstanceName());
+      }
+
+      Set<String> slaveInstances = new HashSet<>();
+      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "SLAVE")) {
+        slaveInstances.add(config.getInstanceName());
+      }
+
+      Assert.assertEquals(masterInstances, masterNodes);
+      Assert.assertEquals(slaveInstances, slaveNodes);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
new file mode 100644
index 0000000..6187cff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -0,0 +1,162 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderFromCurrentStates extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    _participants =  new MockParticipantManager[NUM_NODES];
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    ConfigAccessor _configAccessor = _manager.getConfigAccessor();
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.enableTargetExternalView(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (int i = 0; i < NUM_NODES; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].reset();
+      }
+    }
+  }
+
+  @Test
+  public void testRoutingTableWithCurrentStates() throws InterruptedException {
+    RoutingTableProvider routingTableEV =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    RoutingTableProvider routingTableCurrentStates = new RoutingTableProvider(_manager, PropertyType.CURRENTSTATES);
+
+    String db1 = "TestDB-1";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS);
+
+    Thread.sleep(200);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verify());
+
+    IdealState idealState1 = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, db1);
+    validate(idealState1, routingTableEV, routingTableCurrentStates);
+
+    // add new DB
+    String db2 = "TestDB-2";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS);
+
+    Thread.sleep(200);
+    Assert.assertTrue(clusterVerifier.verify());
+
+    IdealState idealState2 = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, db2);
+    validate(idealState2, routingTableEV, routingTableCurrentStates);
+
+    // shutdown an instance
+    _participants[0].syncStop();
+    Thread.sleep(200);
+    Assert.assertTrue(clusterVerifier.verify());
+    validate(idealState1, routingTableEV, routingTableCurrentStates);
+    validate(idealState2, routingTableEV, routingTableCurrentStates);
+  }
+
+  @Test (dependsOnMethods = {"testRoutingTableWithCurrentStates"})
+  public void testWithSupportSourceDataType() {
+    new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+    new RoutingTableProvider(_manager, PropertyType.CURRENTSTATES);
+
+    try {
+      new RoutingTableProvider(_manager, PropertyType.IDEALSTATES);
+      Assert.fail();
+    } catch (HelixException ex) {
+      Assert.assertTrue(ex.getMessage().contains("Unsupported source data type"));
+    }
+  }
+
+  private void validate(IdealState idealState, RoutingTableProvider routingTableEV,
+      RoutingTableProvider routingTableCurrentStates) {
+    String db = idealState.getResourceName();
+    Set<String> partitions = idealState.getPartitionSet();
+    for (String partition : partitions) {
+      List<InstanceConfig> masterInsEv =
+          routingTableEV.getInstancesForResource(db, partition, "MASTER");
+      List<InstanceConfig> masterInsCs =
+          routingTableCurrentStates.getInstancesForResource(db, partition, "MASTER");
+      Assert.assertEquals(masterInsEv.size(), 1);
+      Assert.assertEquals(masterInsCs.size(), 1);
+      Assert.assertEquals(masterInsCs, masterInsEv);
+
+      List<InstanceConfig> slaveInsEv =
+          routingTableEV.getInstancesForResource(db, partition, "SLAVE");
+      List<InstanceConfig> slaveInsCs =
+          routingTableCurrentStates.getInstancesForResource(db, partition, "SLAVE");
+      Assert.assertEquals(slaveInsEv.size(), 2);
+      Assert.assertEquals(slaveInsCs.size(), 2);
+      Assert.assertEquals(new HashSet(slaveInsCs), new HashSet(slaveInsEv));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
new file mode 100644
index 0000000..fbf17cf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.mock.participant.MockDelayMSStateModelFactory;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderFromTargetEV extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    _participants =  new MockParticipantManager[NUM_NODES];
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
+
+    _setupTool
+        .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // add a delayed state model
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      MockDelayMSStateModelFactory delayFactory =
+          new MockDelayMSStateModelFactory().setDelay(-300000L);
+      stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (int i = 0; i < NUM_NODES; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].reset();
+      }
+    }
+  }
+
+  @Test (expectedExceptions = HelixException.class)
+  public void testTargetExternalViewWithoutEnable() {
+    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+  }
+
+  @Test
+  public void testExternalViewDoesNotExist() {
+    String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1;
+    RoutingTableProvider externalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    Assert.assertEquals(externalViewProvider.getInstancesForResource(resourceName, "SLAVE").size(),
+        0);
+  }
+
+  @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
+  public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.enableTargetExternalView(true);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    Thread.sleep(2000);
+
+    RoutingTableProvider externalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    RoutingTableProvider targetExternalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+
+    // ExternalView should not contain any MASTERS
+    // TargetExternalView should contain MASTERS same as the partition number
+    Set<InstanceConfig> externalViewMasters =
+        externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+    Assert.assertEquals(externalViewMasters.size(), 0);
+    Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
+        .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+    Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
+
+    // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
+    Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord()
+        .getMapFields();
+
+    Set<String> idealMasters = new HashSet<>();
+    Set<String> targetMasters = new HashSet<>();
+    for (Map<String, String> instanceMap : stateMap.values()) {
+      for (String instance : instanceMap.keySet()) {
+        if (instanceMap.get(instance).equals("MASTER")) {
+          idealMasters.add(instance);
+        }
+      }
+    }
+
+    for (InstanceConfig instanceConfig : targetExternalViewMasters) {
+      targetMasters.add(instanceConfig.getInstanceName());
+    }
+    Assert.assertTrue(idealMasters.equals(targetMasters));
+  }
+}