You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/03/20 18:32:58 UTC

[helix] branch customizeView updated: Complete the Routing Table Provider for CustomizedView (#834)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch customizeView
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/customizeView by this push:
     new 3f8ebc3  Complete the Routing Table Provider for CustomizedView (#834)
3f8ebc3 is described below

commit 3f8ebc3a0bf2dbba4d01bdbea71bd339384e2ba0
Author: Ali Reza Zamani Zadeh Najari <an...@linkedin.com>
AuthorDate: Fri Mar 20 11:32:47 2020 -0700

    Complete the Routing Table Provider for CustomizedView (#834)
    
    In this commit, the routing table provider has been changed in a way to include customized view feature.
---
 .../main/java/org/apache/helix/HelixManager.java   |   2 +-
 .../main/java/org/apache/helix/PropertyKey.java    |  12 +-
 .../java/org/apache/helix/PropertyPathBuilder.java |   9 +-
 .../helix/common/caches/CustomizedViewCache.java   |  16 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |   4 +-
 .../spectator/CustomizedViewRoutingTable.java      | 112 ++++
 .../apache/helix/spectator/RoutingDataCache.java   |  94 +++-
 .../org/apache/helix/spectator/RoutingTable.java   |  52 +-
 .../helix/spectator/RoutingTableProvider.java      | 582 +++++++++++++++------
 .../helix/spectator/RoutingTableSnapshot.java      |  37 ++
 .../controller/stages/DummyClusterManager.java     |   2 +-
 .../spectator/TestRoutingTableProvider.java        | 109 ++++
 .../TestRoutingTableProviderPeriodicRefresh.java   |  26 +-
 .../java/org/apache/helix/mock/MockManager.java    |   2 +-
 .../helix/participant/MockZKHelixManager.java      |   2 +-
 15 files changed, 837 insertions(+), 224 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index e4adbd0..5740196 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -242,7 +242,7 @@ public interface HelixManager {
    * @see CustomizedViewChangeListener#onCustomizedViewChange(List, NotificationContext)
    * @param listener
    */
-  void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) throws Exception;
+  void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) throws Exception;
 
   /**
    * @see ExternalViewChangeListener#onExternalViewChange(List, NotificationContext)
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 608c037..57d5925 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -640,20 +640,20 @@ public class PropertyKey {
 
     /**
      * Get a property key associated with an {@link CustomizedView} of a type
-     * @param aggregationType
+     * @param customizedStateType
      * @return {@link PropertyKey}
      */
-    public PropertyKey customizedView(String aggregationType) {
-      return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, aggregationType);
+    public PropertyKey customizedView(String customizedStateType) {
+      return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, customizedStateType);
     }
 
     /**
      * Get a property key associated with an {@link CustomizedView} of a type and resource
-     * @param aggregationType
+     * @param customizedStateType
      * @return {@link PropertyKey}
      */
-    public PropertyKey customizedView(String aggregationType, String resourceName) {
-      return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, aggregationType,
+    public PropertyKey customizedView(String customizedStateType, String resourceName) {
+      return new PropertyKey(CUSTOMIZEDVIEW, CustomizedView.class, _clusterName, customizedStateType,
           resourceName);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
index d9bb2b4..f39530e 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
 
 import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.PropertyType.CONFIGS;
 import static org.apache.helix.PropertyType.CURRENTSTATES;
+import static org.apache.helix.PropertyType.CUSTOMIZEDVIEW;
 import static org.apache.helix.PropertyType.EXTERNALVIEW;
 import static org.apache.helix.PropertyType.HISTORY;
 import static org.apache.helix.PropertyType.IDEALSTATES;
@@ -70,6 +72,7 @@ public class PropertyPathBuilder {
     typeToClassMapping.put(IDEALSTATES, IdealState.class);
     typeToClassMapping.put(CONFIGS, InstanceConfig.class);
     typeToClassMapping.put(EXTERNALVIEW, ExternalView.class);
+    typeToClassMapping.put(CUSTOMIZEDVIEW, CustomizedView.class);
     typeToClassMapping.put(STATEMODELDEFS, StateModelDefinition.class);
     typeToClassMapping.put(MESSAGES, Message.class);
     typeToClassMapping.put(CURRENTSTATES, CurrentState.class);
@@ -94,6 +97,10 @@ public class PropertyPathBuilder {
     addEntry(PropertyType.IDEALSTATES, 2, "/{clusterName}/IDEALSTATES/{resourceName}");
     addEntry(PropertyType.EXTERNALVIEW, 1, "/{clusterName}/EXTERNALVIEW");
     addEntry(PropertyType.EXTERNALVIEW, 2, "/{clusterName}/EXTERNALVIEW/{resourceName}");
+    addEntry(PropertyType.CUSTOMIZEDVIEW, 1, "/{clusterName}/CUSTOMIZEDVIEW");
+    addEntry(PropertyType.CUSTOMIZEDVIEW, 2, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}");
+    addEntry(PropertyType.CUSTOMIZEDVIEW, 3, "/{clusterName}/CUSTOMIZEDVIEW/{customizedStateType}/{resourceName}");
+
     addEntry(PropertyType.TARGETEXTERNALVIEW, 1, "/{clusterName}/TARGETEXTERNALVIEW");
     addEntry(PropertyType.TARGETEXTERNALVIEW, 2,
         "/{clusterName}/TARGETEXTERNALVIEW/{resourceName}");
@@ -269,7 +276,7 @@ public class PropertyPathBuilder {
   public static String externalView(String clusterName, String resourceName) {
     return String.format("/%s/EXTERNALVIEW/%s", clusterName, resourceName);
   }
-
+  
   public static String targetExternalView(String clusterName) {
     return String.format("/%s/TARGETEXTERNALVIEW", clusterName);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java
index a718872..d46be64 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CustomizedViewCache.java
@@ -47,19 +47,19 @@ public class CustomizedViewCache extends AbstractDataCache<CustomizedView> {
   protected Map<String, CustomizedView> _customizedViewCache;
   protected String _clusterName;
   private PropertyType _propertyType;
-  private String _aggregationType;
+  private String _customizedStateType;
 
-  public CustomizedViewCache(String clusterName, String aggregationType) {
-    this(clusterName, PropertyType.CUSTOMIZEDVIEW, aggregationType);
+  public CustomizedViewCache(String clusterName, String customizedStateType) {
+    this(clusterName, PropertyType.CUSTOMIZEDVIEW, customizedStateType);
   }
 
-  protected CustomizedViewCache(String clusterName, PropertyType propertyType, String aggregationType) {
+  protected CustomizedViewCache(String clusterName, PropertyType propertyType, String customizedStateType) {
     super(createDefaultControlContextProvider(clusterName));
     _clusterName = clusterName;
     _customizedViewMap = Collections.emptyMap();
     _customizedViewCache = Collections.emptyMap();
     _propertyType = propertyType;
-    _aggregationType = aggregationType;
+    _customizedStateType = customizedStateType;
   }
 
 
@@ -105,14 +105,14 @@ public class CustomizedViewCache extends AbstractDataCache<CustomizedView> {
     _customizedViewMap = new HashMap<>(newCustomizedViewMap);
 
     long endTime = System.currentTimeMillis();
-    LOG.info("Refresh " + _customizedViewMap.size() + " CustomizedViews of type " + _aggregationType
+    LOG.info("Refresh " + _customizedViewMap.size() + " CustomizedViews of type " + _customizedStateType
         + " for cluster " + _clusterName + ", took " + (endTime - startTime) + " ms");
   }
 
   private PropertyKey customizedViewsKey(PropertyKey.Builder keyBuilder) {
     PropertyKey customizedViewPropertyKey;
     if (_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)){
-      customizedViewPropertyKey = keyBuilder.customizedView(_aggregationType);
+      customizedViewPropertyKey = keyBuilder.customizedView(_customizedStateType);
     } else {
       throw new HelixException(
           "Failed to refresh CustomizedViewCache, Wrong property type " + _propertyType + "!");
@@ -123,7 +123,7 @@ public class CustomizedViewCache extends AbstractDataCache<CustomizedView> {
   private PropertyKey customizedViewKey(PropertyKey.Builder keyBuilder, String resource) {
     PropertyKey customizedViewPropertyKey;
     if (_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
-      customizedViewPropertyKey = keyBuilder.customizedView(_aggregationType, resource);
+      customizedViewPropertyKey = keyBuilder.customizedView(_customizedStateType, resource);
     } else {
       throw new HelixException(
           "Failed to refresh CustomizedViewCache, Wrong property type " + _propertyType + "!");
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 1fde7a4..7b7f33c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -584,9 +584,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   @Override
-  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType)
+  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType)
       throws Exception {
-    addListener(listener, new Builder(_clusterName).customizedView(aggregationType),
+    addListener(listener, new Builder(_clusterName).customizedView(customizedStateType),
         ChangeType.CUSTOMIZED_VIEW, new EventType[] {
             EventType.NodeChildrenChanged
         });
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/CustomizedViewRoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/CustomizedViewRoutingTable.java
new file mode 100644
index 0000000..5f986ce
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/CustomizedViewRoutingTable.java
@@ -0,0 +1,112 @@
+package org.apache.helix.spectator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.CustomizedView;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CustomizedViewRoutingTable extends RoutingTable {
+  private static final Logger logger = LoggerFactory.getLogger(CustomizedViewRoutingTable.class);
+
+  private final Collection<CustomizedView> _customizedViews;
+  /*
+   * The customizedStateType field is the type the controller is aggregating.
+   * For example if RoutingTableProvider initialized using the code below:
+   * Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+   * sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
+   * RoutingTableProvider routingTableProvider =
+   * new RoutingTableProvider(_spectator, sourceDataTypes);
+   * Each one of the TypeA and TypeB is a customizedStateType.
+   */
+  private final String _customizedStateType;
+
+  public CustomizedViewRoutingTable(PropertyType propertyType, String customizedStateType) {
+    this(Collections.<CustomizedView> emptyList(), propertyType, customizedStateType);
+  }
+
+  protected CustomizedViewRoutingTable(Collection<CustomizedView> customizedViews,
+      PropertyType propertytype, String customizedStateType) {
+    this(customizedViews, Collections.<InstanceConfig> emptyList(),
+        Collections.<LiveInstance> emptyList(), propertytype, customizedStateType);
+  }
+
+  protected CustomizedViewRoutingTable(Collection<CustomizedView> customizedViews,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+      PropertyType propertytype, String customizedStateType) {
+    super(Collections.<ExternalView> emptyList(), instanceConfigs, liveInstances,
+        PropertyType.CUSTOMIZEDVIEW);
+    _customizedStateType = customizedStateType;
+    _customizedViews = new HashSet<>(customizedViews);
+    refresh(_customizedViews);
+  }
+
+  private void refresh(Collection<CustomizedView> customizedViewList) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    if (customizedViewList != null && !customizedViewList.isEmpty()) {
+      for (InstanceConfig config : _instanceConfigs) {
+        instanceConfigMap.put(config.getId(), config);
+      }
+      for (CustomizedView customizeView : customizedViewList) {
+        String resourceName = customizeView.getId();
+        for (String partitionName : customizeView.getPartitionSet()) {
+          Map<String, String> stateMap = customizeView.getStateMap(partitionName);
+          for (String instanceName : stateMap.keySet()) {
+            String customizedState = stateMap.get(instanceName);
+            if (instanceConfigMap.containsKey(instanceName)) {
+              InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+              addEntry(resourceName, partitionName, customizedState, instanceConfig);
+            } else {
+              logger.warn(
+                  "Participant {} is not found with proper configuration information. It might already be removed from the cluster. "
+                      + "Skip recording partition assignment entry: Partition {}, Participant {}, State {}.",
+                  instanceName, partitionName, instanceName, stateMap.get(instanceName));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns CustomizedView.
+   * @return a collection of CustomizedView
+   */
+  protected Collection<CustomizedView> geCustomizedViews() {
+    return Collections.unmodifiableCollection(_customizedViews);
+  }
+
+  /**
+   * Returns CustomizedStateType
+   * @return the CustomizedStateType of this RoutingTable (Used for CustomizedView)
+   */
+  protected String getStateType() {
+    return _customizedStateType;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index bfed4b5..bfae632 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -19,16 +19,23 @@ package org.apache.helix.spectator;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.common.caches.BasicClusterDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.CurrentStateSnapshot;
+import org.apache.helix.common.caches.CustomizedViewCache;
 import org.apache.helix.common.caches.TargetExternalViewCache;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
@@ -40,14 +47,31 @@ import org.slf4j.LoggerFactory;
 class RoutingDataCache extends BasicClusterDataCache {
   private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
 
-  private final PropertyType _sourceDataType;
+  private final Map<PropertyType, List<String>> _sourceDataTypeMap;
+
   private CurrentStateCache _currentStateCache;
+  // TODO: CustomizedCache needs to be migrated to propertyCache. Once we migrate all cache to
+  // propertyCache, this hardcoded list of fields won't be necessary.
+  private Map<String, CustomizedViewCache> _customizedViewCaches;
   private TargetExternalViewCache _targetExternalViewCache;
 
   public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
+    this (clusterName, ImmutableMap.of(sourceDataType, Collections.emptyList()));
+  }
+
+  /**
+   * Initialize empty RoutingDataCache with clusterName, _propertyTypes.
+   * @param clusterName
+   * @param sourceDataTypeMap
+   */
+  public RoutingDataCache(String clusterName, Map<PropertyType, List<String>> sourceDataTypeMap) {
     super(clusterName);
-    _sourceDataType = sourceDataType;
+    _sourceDataTypeMap = sourceDataTypeMap;
     _currentStateCache = new CurrentStateCache(clusterName);
+    _customizedViewCaches = new HashMap<>();
+    sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())
+        .forEach(customizedStateType -> _customizedViewCaches.put(customizedStateType,
+            new CustomizedViewCache(clusterName, customizedStateType)));
     _targetExternalViewCache = new TargetExternalViewCache(clusterName);
     requireFullRefresh();
   }
@@ -65,25 +89,41 @@ class RoutingDataCache extends BasicClusterDataCache {
     long startTime = System.currentTimeMillis();
 
     super.refresh(accessor);
-
-    if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW) && _propertyDataChangedMap
-        .get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
-      long start = System.currentTimeMillis();
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
-      _targetExternalViewCache.refresh(accessor);
-      LOG.info("Reload " + _targetExternalViewCache.getExternalViewMap().keySet().size()
-          + " TargetExternalViews. Takes " + (System.currentTimeMillis() - start) + " ms");
-    }
-
-    if (_sourceDataType.equals(PropertyType.CURRENTSTATES) && _propertyDataChangedMap
-        .get(HelixConstants.ChangeType.CURRENT_STATE)) {
+    for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
       long start = System.currentTimeMillis();
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
-      Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
-      _currentStateCache.refresh(accessor, liveInstanceMap);
-      LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
+      switch (propertyType) {
+      case TARGETEXTERNALVIEW:
+        if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+          _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
+          _targetExternalViewCache.refresh(accessor);
+          LOG.info("Reload " + _targetExternalViewCache.getExternalViewMap().keySet().size()
+              + " TargetExternalViews. Takes " + (System.currentTimeMillis() - start) + " ms");
+        }
+        break;
+      case CURRENTSTATES:
+        if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE)) {
+          _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
+          Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
+          _currentStateCache.refresh(accessor, liveInstanceMap);
+          LOG.info("Reload CurrentStates. Takes " + (System.currentTimeMillis() - start) + " ms");
+        }
+        break;
+      case CUSTOMIZEDVIEW: {
+        if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_VIEW)) {
+          for (String customizedStateType : _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW)) {
+            _customizedViewCaches.get(customizedStateType).refresh(accessor);
+          }
+          LOG.info("Reload CustomizedView for types "
+              + _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW) + " Takes "
+              + (System.currentTimeMillis() - start) + " ms");
+        }
+        _propertyDataChangedMap.put(HelixConstants.ChangeType.CUSTOMIZED_VIEW, false);
+      }
+        break;
+      default:
+        break;
+      }
     }
-
     long endTime = System.currentTimeMillis();
     LOG.info("END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
         - startTime) + " ms");
@@ -91,6 +131,10 @@ class RoutingDataCache extends BasicClusterDataCache {
     if (LOG.isDebugEnabled()) {
       LOG.debug("CurrentStates: " + _currentStateCache);
       LOG.debug("TargetExternalViews: " + _targetExternalViewCache.getExternalViewMap());
+      for (String customizedStateType : _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW)) {
+        LOG.debug("CustomizedViews customizedStateType: " + customizedStateType + " "
+            + _customizedViewCaches.get(customizedStateType).getCustomizedViewMap());
+      }
     }
   }
 
@@ -104,6 +148,18 @@ class RoutingDataCache extends BasicClusterDataCache {
   }
 
   /**
+   * Retrieves the CustomizedView for all resources
+   * @return
+   */
+  public Map<String, CustomizedView> getCustomizedView(String customizedStateType) {
+    if (_customizedViewCaches.containsKey(customizedStateType)) {
+      return _customizedViewCaches.get(customizedStateType).getCustomizedViewMap();
+    }
+    throw new HelixException(String.format(
+        "customizedStateType %s does not exist in customizedViewCaches.", customizedStateType));
+  }
+
+  /**
    * Get map of current states in cluster. {InstanceName -> {SessionId -> {ResourceName ->
    * CurrentState}}}
    *
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index dd1b623..89ad16f 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.helix.PropertyType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
@@ -38,42 +39,64 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A class to consume ExternalViews of a cluster and provide {resource, partition, state} to
- * {instances} map function.
+ * A class to consume ExternalViews or CustomizedViews of a cluster and provide
+ * {resource, partition, state} to {instances} map function.
  */
 class RoutingTable {
   private static final Logger logger = LoggerFactory.getLogger(RoutingTable.class);
 
   // mapping a resourceName to the ResourceInfo
   private final Map<String, ResourceInfo> _resourceInfoMap;
+
   // mapping a resource group name to a resourceGroupInfo
   private final Map<String, ResourceGroupInfo> _resourceGroupInfoMap;
 
   private final Collection<LiveInstance> _liveInstances;
-  private final Collection<InstanceConfig> _instanceConfigs;
+  protected final Collection<InstanceConfig> _instanceConfigs;
   private final Collection<ExternalView> _externalViews;
 
+  private final PropertyType _propertyType;
+
+  @Deprecated
   public RoutingTable() {
     this(Collections.<ExternalView> emptyList(), Collections.<InstanceConfig> emptyList(),
         Collections.<LiveInstance> emptyList());
   }
 
+  /**
+   * Initialize empty RoutingTable and set _propertyType fields.
+   * @param propertyType
+   */
+  protected RoutingTable(PropertyType propertyType) {
+    this(Collections.<ExternalView> emptyList(), Collections.<InstanceConfig> emptyList(),
+        Collections.<LiveInstance> emptyList(), propertyType);
+  }
+
   public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
     // TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to
     // refresh according to the currentStateMap. - jjwang
-    this(Collections.<ExternalView> emptyList(), instanceConfigs, liveInstances);
+    this(Collections.<ExternalView> emptyList(),
+        instanceConfigs, liveInstances, PropertyType.CURRENTSTATES);
     refresh(currentStateMap);
   }
 
   public RoutingTable(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    this(externalViews, instanceConfigs, liveInstances,
+        PropertyType.EXTERNALVIEW);
+  }
+
+  protected RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs,
+      Collection<LiveInstance> liveInstances, PropertyType propertytype) {
+    // TODO Refactor these constructors so we don't have so many constructor.
+    _propertyType = propertytype;
     _resourceInfoMap = new HashMap<>();
     _resourceGroupInfoMap = new HashMap<>();
     _liveInstances = new HashSet<>(liveInstances);
     _instanceConfigs = new HashSet<>(instanceConfigs);
     _externalViews = new HashSet<>(externalViews);
-    refresh(externalViews);
+    refresh(_externalViews);
   }
 
   private void refresh(Collection<ExternalView> externalViewList) {
@@ -145,7 +168,7 @@ class RoutingTable {
     }
   }
 
-  private void addEntry(String resourceName, String partitionName, String state,
+  protected void addEntry(String resourceName, String partitionName, String state,
       InstanceConfig config) {
     if (!_resourceInfoMap.containsKey(resourceName)) {
       _resourceInfoMap.put(resourceName, new ResourceInfo());
@@ -352,6 +375,23 @@ class RoutingTable {
   }
 
   /**
+   * Returns PropertyTYpe
+   * @return the PropertyTYpe of this RoutingTable
+   */
+  protected PropertyType getPropertyType() {
+    return _propertyType;
+  }
+
+  /**
+   * Returns CustomizedStateType
+   * @return the CustomizedStateType of this RoutingTable (Used for CustomizedView)
+   */
+  protected String getStateType() {
+    return RoutingTableProvider.DEFAULT_STATE_TYPE;
+  }
+
+
+  /**
    * Class to store instances, partitions and their states for each resource.
    */
   class ResourceInfo {
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 1a8f641..1e11965 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.ImmutableMap;
 import javax.management.JMException;
 
 import org.apache.helix.HelixConstants;
@@ -44,6 +45,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedViewChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
@@ -55,6 +57,7 @@ import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -64,15 +67,15 @@ import org.slf4j.LoggerFactory;
 
 public class RoutingTableProvider
     implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
-    LiveInstanceChangeListener, CurrentStateChangeListener {
+    LiveInstanceChangeListener, CurrentStateChangeListener, CustomizedViewChangeListener {
   private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
   private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes
-  private final AtomicReference<RoutingTable> _routingTableRef;
+  private final Map<String, AtomicReference<RoutingTable>> _routingTableRefMap;
   private final HelixManager _helixManager;
   private final RouterUpdater _routerUpdater;
-  private final PropertyType _sourceDataType;
+  private final Map<PropertyType, List<String>> _sourceDataTypeMap;
   private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
-  private final RoutingTableProviderMonitor _monitor;
+  private final Map<PropertyType, RoutingTableProviderMonitor> _monitorMap;
 
   // For periodic refresh
   private long _lastRefreshTimestamp;
@@ -83,17 +86,27 @@ public class RoutingTableProvider
   private ExecutorService _reportExecutor;
   private Future _reportingTask = null;
 
+  protected static final  String DEFAULT_PROPERTY_TYPE = "HELIX_DEFAULT_PROPERTY";
+  protected static final  String DEFAULT_STATE_TYPE = "HELIX_DEFAULT";
+
+
   public RoutingTableProvider() {
     this(null);
   }
 
   public RoutingTableProvider(HelixManager helixManager) throws HelixException {
-    this(helixManager, PropertyType.EXTERNALVIEW, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
+    this(helixManager, ImmutableMap.of(PropertyType.EXTERNALVIEW, Collections.emptyList()), true,
+        DEFAULT_PERIODIC_REFRESH_INTERVAL);
   }
 
   public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
       throws HelixException {
-    this(helixManager, sourceDataType, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
+    this(helixManager, ImmutableMap.of(sourceDataType, Collections.emptyList()), true,
+        DEFAULT_PERIODIC_REFRESH_INTERVAL);
+  }
+
+  public RoutingTableProvider(HelixManager helixManager, Map<PropertyType, List<String>> sourceDataTypeMap) {
+    this(helixManager, sourceDataTypeMap, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
   }
 
   /**
@@ -106,73 +119,75 @@ public class RoutingTableProvider
    */
   public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType,
       boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws HelixException {
-    _routingTableRef = new AtomicReference<>(new RoutingTable());
+    this(helixManager, ImmutableMap.of(sourceDataType, Collections.emptyList()),
+        isPeriodicRefreshEnabled, periodRefreshInterval);
+  }
+
+  /**
+   * Initialize an instance of RoutingTableProvider
+   * @param helixManager
+   * @param sourceDataTypeMap
+   * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise
+   * @param periodRefreshInterval only effective if isPeriodRefreshEnabled is true
+   * @throws HelixException
+   */
+  public RoutingTableProvider(HelixManager helixManager,
+      Map<PropertyType, List<String>> sourceDataTypeMap, boolean isPeriodicRefreshEnabled,
+      long periodRefreshInterval) throws HelixException {
+
+    validateSourceDataTypeMap(sourceDataTypeMap);
+
+    _routingTableRefMap = new HashMap<>();
     _helixManager = helixManager;
-    _sourceDataType = sourceDataType;
+    _sourceDataTypeMap = sourceDataTypeMap;
     _routingTableChangeListenerMap = new ConcurrentHashMap<>();
     String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
 
-    _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName);
-    try {
-      _monitor.register();
-    } catch (JMException e) {
-      logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
-    }
-    _reportExecutor = Executors.newSingleThreadExecutor();
-
-    _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
-    _routerUpdater.start();
-
-    if (_helixManager != null) {
-      switch (_sourceDataType) {
-      case EXTERNALVIEW:
-        try {
-          _helixManager.addExternalViewChangeListener(this);
-        } catch (Exception e) {
-          shutdown();
-          logger.error("Failed to attach ExternalView Listener to HelixManager!");
-          throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+    // Initialize the tables
+    for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+      if (_sourceDataTypeMap.get(propertyType).size() == 0) {
+        if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
+          throw new HelixException("CustomizedView has been used without any aggregation type!");
         }
-        break;
-
-      case TARGETEXTERNALVIEW:
-        // Check whether target external has been enabled or not
-        if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
-            _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
-          shutdown();
-          throw new HelixException("Target External View is not enabled!");
+        String key = generateReferenceKey(propertyType.name(),  DEFAULT_STATE_TYPE);
+        if (_routingTableRefMap.get(key) == null) {
+          _routingTableRefMap.put(key, new AtomicReference<>(new RoutingTable(propertyType)));
         }
-
-        try {
-          _helixManager.addTargetExternalViewChangeListener(this);
-        } catch (Exception e) {
-          shutdown();
-          logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
-          throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
-              e);
+      } else {
+        if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)) {
+          throw new HelixException(
+              String.format("Type %s has been used in addition to the propertyType %s !",
+                  sourceDataTypeMap.get(propertyType), propertyType.name()));
+        }
+        for (String customizedStateType : _sourceDataTypeMap.get(propertyType)) {
+          String key = generateReferenceKey(propertyType.name(),  customizedStateType);
+          if (_routingTableRefMap.get(key) == null) {
+            _routingTableRefMap.put(key, new AtomicReference<>(
+                new CustomizedViewRoutingTable(propertyType, customizedStateType)));
+          }
         }
-        break;
-
-      case CURRENTSTATES:
-        // CurrentState change listeners will be added later in LiveInstanceChange call.
-        break;
-
-      default:
-        throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
       }
+    }
+
+    // Start Monitoring
+    _monitorMap = new HashMap<>();
 
+    for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+      _monitorMap.put(propertyType, new RoutingTableProviderMonitor(propertyType, clusterName));
       try {
-        _helixManager.addInstanceConfigChangeListener(this);
-        _helixManager.addLiveInstanceChangeListener(this);
-      } catch (Exception e) {
-        shutdown();
-        logger.error(
-            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!");
-        throw new HelixException(
-            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
-            e);
+        _monitorMap.get(propertyType).register();
+      } catch (JMException e) {
+        logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
       }
     }
+    _reportExecutor = Executors.newSingleThreadExecutor();
+
+    // Start Updaters
+    _routerUpdater = new RouterUpdater(clusterName, sourceDataTypeMap);
+    _routerUpdater.start();
+
+    // Add listeners
+    addListeners();
 
     // For periodic refresh
     if (isPeriodicRefreshEnabled && _helixManager != null) {
@@ -200,6 +215,92 @@ public class RoutingTableProvider
   }
 
   /**
+   * A method that adds the ChangeListeners to HelixManager
+   */
+  private void addListeners() {
+    if (_helixManager != null) {
+      for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+        switch (propertyType) {
+        case EXTERNALVIEW:
+          try {
+            _helixManager.addExternalViewChangeListener(this);
+          } catch (Exception e) {
+            shutdown();
+            throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+          }
+          break;
+        case CUSTOMIZEDVIEW:
+          List<String> customizedStateTypes = _sourceDataTypeMap.get(propertyType);
+          for (String customizedStateType : customizedStateTypes) {
+            try {
+              _helixManager.addCustomizedViewChangeListener(this, customizedStateType);
+            } catch (Exception e) {
+              shutdown();
+              throw new HelixException(String.format(
+                  "Failed to attach CustomizedView Listener to HelixManager for type %s!",
+                  customizedStateType), e);
+            }
+          }
+          break;
+        case TARGETEXTERNALVIEW:
+          // Check whether target external has been enabled or not
+          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
+              0)) {
+            shutdown();
+            throw new HelixException("Target External View is not enabled!");
+          }
+
+          try {
+            _helixManager.addTargetExternalViewChangeListener(this);
+          } catch (Exception e) {
+            shutdown();
+            throw new HelixException(
+                "Failed to attach TargetExternalView Listener to HelixManager!", e);
+          }
+          break;
+        case CURRENTSTATES:
+          // CurrentState change listeners will be added later in LiveInstanceChange call.
+          break;
+        default:
+          throw new HelixException(String.format("Unsupported source data type: %s", propertyType));
+        }
+      }
+      try {
+        _helixManager.addInstanceConfigChangeListener(this);
+        _helixManager.addLiveInstanceChangeListener(this);
+      } catch (Exception e) {
+        shutdown();
+        throw new HelixException(
+            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
+            e);
+      }
+    }
+  }
+
+  /**
+   * Check and validate the input of the sourceDataTypeMap parameter
+   * @param sourceDataTypeMap
+   */
+  private void validateSourceDataTypeMap(Map<PropertyType, List<String>> sourceDataTypeMap) {
+    for (PropertyType propertyType : sourceDataTypeMap.keySet()) {
+      if (propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
+          && sourceDataTypeMap.get(propertyType).size() == 0) {
+        logger.error("CustomizedView has been used without any aggregation type!");
+        throw new HelixException("CustomizedView has been used without any aggregation type!");
+      }
+      if (!propertyType.equals(PropertyType.CUSTOMIZEDVIEW)
+          && sourceDataTypeMap.get(propertyType).size() != 0) {
+        logger.error("Type has been used in addition to the propertyType {} !",
+            propertyType.name());
+        throw new HelixException(
+            String.format("Type %s has been used in addition to the propertyType %s !",
+                sourceDataTypeMap.get(propertyType), propertyType.name()));
+      }
+    }
+  }
+
+  /**
    * Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused.
    */
   public void shutdown() {
@@ -209,24 +310,36 @@ public class RoutingTableProvider
     }
     _routerUpdater.shutdown();
 
-    _monitor.unregister();
+
+    for (PropertyType propertyType : _monitorMap.keySet()) {
+      _monitorMap.get(propertyType).unregister();
+    }
 
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
-      switch (_sourceDataType) {
-      case EXTERNALVIEW:
-        _helixManager.removeListener(keyBuilder.externalViews(), this);
-        break;
-      case TARGETEXTERNALVIEW:
-        _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
-        break;
-      case CURRENTSTATES:
-        NotificationContext context = new NotificationContext(_helixManager);
-        context.setType(NotificationContext.Type.FINALIZE);
-        updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
-        break;
-      default:
-        break;
+      for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+        switch (propertyType) {
+        case EXTERNALVIEW:
+          _helixManager.removeListener(keyBuilder.externalViews(), this);
+          break;
+        case CUSTOMIZEDVIEW:
+          List<String> customizedStateTypes = _sourceDataTypeMap.get(propertyType);
+          // Remove listener on each individual customizedStateType
+          for (String customizedStateType : customizedStateTypes) {
+            _helixManager.removeListener(keyBuilder.customizedView(customizedStateType), this);
+          }
+          break;
+        case TARGETEXTERNALVIEW:
+          _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
+          break;
+        case CURRENTSTATES:
+          NotificationContext context = new NotificationContext(_helixManager);
+          context.setType(NotificationContext.Type.FINALIZE);
+          updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
+          break;
+        default:
+          break;
+        }
       }
     }
   }
@@ -237,7 +350,46 @@ public class RoutingTableProvider
    * @return snapshot of current routing table.
    */
   public RoutingTableSnapshot getRoutingTableSnapshot() {
-    return new RoutingTableSnapshot(_routingTableRef.get());
+    return new RoutingTableSnapshot(getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE));
+  }
+
+  /**
+   * Get an snapshot of current RoutingTable information for specific PropertyType.
+   * The snapshot is immutable, it reflects the routing table information at the time this method is
+   * called.
+   * @return snapshot of current routing table.
+   */
+  public RoutingTableSnapshot getRoutingTableSnapshot(PropertyType propertyType) {
+    return new RoutingTableSnapshot(getRoutingTableRef(propertyType.name(), DEFAULT_STATE_TYPE));
+  }
+
+  /**
+   * Get an snapshot of all of the available RoutingTable information. The snapshot is immutable, it
+   * reflects the routing table information at the time this method is called.
+   * @return snapshot associated with specific propertyType and type.
+   */
+  public RoutingTableSnapshot getRoutingTableSnapshot(PropertyType propertyType, String stateType) {
+    return new RoutingTableSnapshot(getRoutingTableRef(propertyType.name(), stateType));
+  }
+
+  /**
+   * Get an snapshot of all of the available RoutingTable information. The snapshot is immutable, it
+   * reflects the routing table information at the time this method is called.
+   * @return all of the available snapshots of current routing table.
+   */
+  public Map<String, Map<String, RoutingTableSnapshot>> getRoutingTableSnapshots() {
+    Map<String, Map<String, RoutingTableSnapshot>> snapshots = new HashMap<>();
+    for (String key : _routingTableRefMap.keySet()) {
+      RoutingTable routingTable = _routingTableRefMap.get(key).get();
+      String propertyTypeName = routingTable.getPropertyType().name();
+      String customizedStateType = routingTable.getStateType();
+      if (!snapshots.containsKey(propertyTypeName)) {
+        snapshots.put(propertyTypeName, new HashMap<>());
+      }
+      snapshots.get(propertyTypeName).put(customizedStateType,
+          new RoutingTableSnapshot(routingTable));
+    }
+    return snapshots;
   }
 
   /**
@@ -264,12 +416,10 @@ public class RoutingTableProvider
   }
 
   /**
-   * returns the instances for {resource,partition} pair that are in a specific
-   * {state}
+   * returns the instances for {resource,partition} pair that are in a specific {state}.
    * This method will be deprecated, please use the
    * {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method.
    * @param resourceName
-   *          -
    * @param partitionName
    * @param state
    * @return empty list if there is no instance in a given state
@@ -280,17 +430,16 @@ public class RoutingTableProvider
   }
 
   /**
-   * returns the instances for {resource,partition} pair that are in a specific
-   * {state}
+   * returns the instances for {resource,partition} pair that are in a specific {state}
    * @param resourceName
-   *          -
    * @param partitionName
    * @param state
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
       String state) {
-    return _routingTableRef.get().getInstancesForResource(resourceName, partitionName, state);
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+        .getInstancesForResource(resourceName, partitionName, state);
   }
 
   /**
@@ -305,8 +454,8 @@ public class RoutingTableProvider
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
       String partitionName, String state) {
-    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
-        state);
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+        .getInstancesForResourceGroup(resourceGroupName, partitionName, state);
   }
 
   /**
@@ -322,11 +471,12 @@ public class RoutingTableProvider
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
       String partitionName, String state, List<String> resourceTags) {
-    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
-        state, resourceTags);
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+        .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
   }
 
   /**
+   * For specific routing table associated with {propertyType, stateType}
    * returns all instances for {resource} that are in a specific {state}
    * This method will be deprecated, please use the
    * {@link #getInstancesForResource(String, String) getInstancesForResource} method.
@@ -345,7 +495,8 @@ public class RoutingTableProvider
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
-    return _routingTableRef.get().getInstancesForResource(resourceName, state);
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+        .getInstancesForResource(resourceName, state);
   }
 
   /**
@@ -355,7 +506,8 @@ public class RoutingTableProvider
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
-    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state);
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+        .getInstancesForResourceGroup(resourceGroupName, state);
   }
 
   /**
@@ -367,8 +519,8 @@ public class RoutingTableProvider
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
       List<String> resourceTags) {
-    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state,
-        resourceTags);
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE)
+        .getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
   }
 
   /**
@@ -376,7 +528,11 @@ public class RoutingTableProvider
    * @return
    */
   public Collection<LiveInstance> getLiveInstances() {
-    return _routingTableRef.get().getLiveInstances();
+    // Since line instances will be the same across all _routingTableRefMap, here one of the keys
+    // will be used without considering PropertyType
+    // TODO each table will keep a separate instance list.This can be improve by only keeping one
+    // copy of the data
+    return _routingTableRefMap.values().iterator().next().get().getLiveInstances();
   }
 
   /**
@@ -384,14 +540,58 @@ public class RoutingTableProvider
    * @return
    */
   public Collection<InstanceConfig> getInstanceConfigs() {
-    return _routingTableRef.get().getInstanceConfigs();
+    // Since line instances will be the same across all _routingTableRefMap, here one of the keys
+    // will be used without considering PropertyType
+    // TODO each table will keep a separate instance list.This can be improve by only keeping one copy of the data
+    return _routingTableRefMap.values().iterator().next().get().getInstanceConfigs();
   }
 
   /**
-   * Return names of all resources (shown in ExternalView) in this cluster.
+   * Return names of all resources (shown in ExternalView or CustomizedView) in this cluster.
    */
   public Collection<String> getResources() {
-    return _routingTableRef.get().getResources();
+    return getRoutingTableRef(DEFAULT_PROPERTY_TYPE, DEFAULT_STATE_TYPE).getResources();
+  }
+
+  /**
+   * Provide the key associated with specific PropertyType and StateType for _routingTableRefMap lookup.
+   * @param propertyTypeName
+   * @param stateType
+   * @return
+   */
+  private RoutingTable getRoutingTableRef(String propertyTypeName, String stateType) {
+    if (propertyTypeName.equals(DEFAULT_PROPERTY_TYPE)) {
+      // Check whether there exist only one snapshot (_routingTableRefMap)
+      if (_routingTableRefMap.keySet().size() == 1) {
+        String key = _routingTableRefMap.keySet().iterator().next();
+        if (!_routingTableRefMap.containsKey(key)) {
+          throw new HelixException(
+              String.format("Currently there is no snapshot available for PropertyType %s and stateType %s",
+                  propertyTypeName, stateType));
+        }
+        return _routingTableRefMap.get(key).get();
+      } else {
+        throw new HelixException("There is none or more than one RoutingTableSnapshot");
+      }
+    }
+
+    if (stateType.equals(DEFAULT_STATE_TYPE)) {
+      if (propertyTypeName.equals(PropertyType.CUSTOMIZEDVIEW.name())) {
+        throw new HelixException("Specific type needs to be used for CUSTOMIZEDVIEW PropertyType");
+      }
+    }
+
+    String key = generateReferenceKey(propertyTypeName,  stateType);
+    if (!_routingTableRefMap.containsKey(key)) {
+      throw new HelixException(
+          String.format("Currently there is no snapshot available for PropertyType %s and stateType %s",
+              propertyTypeName, stateType));
+    }
+    return _routingTableRefMap.get(key).get();
+  }
+
+  private String generateReferenceKey(String propertyType, String stateType) {
+    return propertyType + "_" + stateType;
   }
 
   @Override
@@ -399,27 +599,32 @@ public class RoutingTableProvider
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
     HelixConstants.ChangeType changeType = changeContext.getChangeType();
-    if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) {
+    if (changeType != null && !_sourceDataTypeMap.containsKey(changeType.getPropertyType())) {
       logger.warn(
-          "onExternalViewChange called with mismatched change types. Source data type {}, changed data type: {}",
-          _sourceDataType, changeType);
+          "onExternalViewChange called with mismatched change types. Source data types does not contain changed data type: {}",
+          changeType);
       return;
     }
     // Refresh with full list of external view.
     if (externalViewList != null && externalViewList.size() > 0) {
       // keep this here for back-compatibility, application can call onExternalViewChange directly
       // with externalview list supplied.
-      refresh(externalViewList, changeContext);
+      String keyReference = generateReferenceKey(PropertyType.EXTERNALVIEW.name(),  DEFAULT_STATE_TYPE);
+      HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
+      List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
+      refreshExternalView(externalViewList, configList, liveInstances, keyReference);
     } else {
       ClusterEventType eventType;
-      if (_sourceDataType.equals(PropertyType.EXTERNALVIEW)) {
+      if (_sourceDataTypeMap.containsKey(PropertyType.EXTERNALVIEW)) {
         eventType = ClusterEventType.ExternalViewChange;
-      } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
+      } else if (_sourceDataTypeMap.containsKey(PropertyType.TARGETEXTERNALVIEW)) {
         eventType = ClusterEventType.TargetExternalViewChange;
       } else {
         logger.warn(
-            "onExternalViewChange called with mismatched change types. Source data type {}, change type: {}",
-            _sourceDataType, changeType);
+            "onExternalViewChange called with mismatched change types. Source data types does not contain changed data type: {}",
+            changeType);
         return;
       }
       _routerUpdater.queueEvent(changeContext, eventType, changeType);
@@ -444,11 +649,10 @@ public class RoutingTableProvider
   @PreFetch(enabled = true)
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
-    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+    if (_sourceDataTypeMap.containsKey(PropertyType.CURRENTSTATES)) {
       // Go though the live instance list and update CurrentState listeners
       updateCurrentStatesListeners(liveInstances, changeContext);
     }
-
     _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
         HelixConstants.ChangeType.LIVE_INSTANCE);
   }
@@ -457,7 +661,7 @@ public class RoutingTableProvider
   @PreFetch(enabled = false)
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
       NotificationContext changeContext) {
-    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+    if (_sourceDataTypeMap.containsKey(PropertyType.CURRENTSTATES)) {
       _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange,
           HelixConstants.ChangeType.CURRENT_STATE);
     } else {
@@ -466,6 +670,19 @@ public class RoutingTableProvider
     }
   }
 
+  @Override
+  @PreFetch(enabled = false)
+  public void onCustomizedViewChange(List<CustomizedView> customizedViewList,
+      NotificationContext changeContext) {
+    if (_sourceDataTypeMap.containsKey(PropertyType.CUSTOMIZEDVIEW)) {
+      _routerUpdater.queueEvent(changeContext, ClusterEventType.CustomizedViewChange,
+          HelixConstants.ChangeType.CUSTOMIZED_VIEW);
+    } else {
+      logger.warn(
+          "RoutingTableProvider does not use CurrentStates as source, ignore CurrentState changes!");
+    }
+  }
+
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions = new AtomicReference<>();
 
   /**
@@ -529,43 +746,58 @@ public class RoutingTableProvider
 
   private void reset() {
     logger.info("Resetting the routing table.");
-    RoutingTable newRoutingTable = new RoutingTable();
-    _routingTableRef.set(newRoutingTable);
+    RoutingTable newRoutingTable;
+    for (String key: _routingTableRefMap.keySet()) {
+      PropertyType propertyType = _routingTableRefMap.get(key).get().getPropertyType();
+      if (propertyType == PropertyType.CUSTOMIZEDVIEW) {
+        String stateType = _routingTableRefMap.get(key).get().getStateType();
+        newRoutingTable = new CustomizedViewRoutingTable(propertyType, stateType);
+      } else {
+        newRoutingTable = new RoutingTable(propertyType);
+      }
+      _routingTableRefMap.get(key).set(newRoutingTable);
+    }
   }
 
-  protected void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) {
-    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
-    List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
-    List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
-    refresh(externalViewList, configList, liveInstances);
+  protected void refreshExternalView(Collection<ExternalView> externalViews,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+      String referenceKey) {
+    long startTime = System.currentTimeMillis();
+    PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType();
+    RoutingTable newRoutingTable =
+        new RoutingTable(externalViews, instanceConfigs, liveInstances, propertyType);
+    resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
   }
 
-  protected void refresh(Collection<ExternalView> externalViews,
-      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+  protected void refreshCustomizedView(Collection<CustomizedView> customizedViews,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+      String referenceKey) {
     long startTime = System.currentTimeMillis();
-    RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
-    resetRoutingTableAndNotify(startTime, newRoutingTable);
+    PropertyType propertyType = _routingTableRefMap.get(referenceKey).get().getPropertyType();
+    String customizedStateType = _routingTableRefMap.get(referenceKey).get().getStateType();
+    RoutingTable newRoutingTable = new CustomizedViewRoutingTable(customizedViews, instanceConfigs,
+        liveInstances, propertyType, customizedStateType);
+    resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
   }
 
-  protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
-      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+  protected void refreshCurrentState(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+      String referenceKey) {
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable =
         new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
-    resetRoutingTableAndNotify(startTime, newRoutingTable);
+    resetRoutingTableAndNotify(startTime, newRoutingTable, referenceKey);
   }
 
-  private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
-    _routingTableRef.set(newRoutingTable);
+  private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable, String referenceKey) {
+    _routingTableRefMap.get(referenceKey).set(newRoutingTable);
     String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
     logger.info("Refreshed the RoutingTable for cluster {}, took {} ms.", clusterName,
         (System.currentTimeMillis() - startTime));
 
     // TODO: move the callback user code logic to separate thread upon routing table statePropagation latency
     // integration test result. If the latency is more than 2 secs, we need to change this part.
-    notifyRoutingTableChange(clusterName);
+    notifyRoutingTableChange(clusterName, referenceKey);
 
     // Update timestamp for last refresh
     if (_isPeriodicRefreshEnabled) {
@@ -573,13 +805,16 @@ public class RoutingTableProvider
     }
   }
 
-  private void notifyRoutingTableChange(String clusterName) {
-    // This call back is called in the main event queue of RoutingTableProvider. We add log to record time spent
+  private void notifyRoutingTableChange(String clusterName, String referenceKey) {
+    // This call back is called in the main event queue of RoutingTableProvider. We add log to
+    // record time spent
     // here. Potentially, we should call this callback in a separate thread if this is a bottleneck.
     long startTime = System.currentTimeMillis();
-    for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap.entrySet()) {
-      entry.getKey()
-          .onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()), entry.getValue().getContext());
+    for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap
+        .entrySet()) {
+      entry.getKey().onRoutingTableChange(
+          new RoutingTableSnapshot(_routingTableRefMap.get(referenceKey).get()),
+          entry.getValue().getContext());
     }
     logger.info("RoutingTableProvider user callback time for cluster {}, took {} ms.", clusterName,
         (System.currentTimeMillis() - startTime));
@@ -587,10 +822,12 @@ public class RoutingTableProvider
 
   private class RouterUpdater extends ClusterEventProcessor {
     private final RoutingDataCache _dataCache;
+    private final Map<PropertyType, List<String>> _sourceDataTypeMap;
 
-    public RouterUpdater(String clusterName, PropertyType sourceDataType) {
+    public RouterUpdater(String clusterName, Map<PropertyType, List<String>> sourceDataTypeMap) {
       super(clusterName, "Helix-RouterUpdater-event_process");
-      _dataCache = new RoutingDataCache(clusterName, sourceDataType);
+      _sourceDataTypeMap = sourceDataTypeMap;
+      _dataCache = new RoutingDataCache(clusterName, _sourceDataTypeMap);
     }
 
     @Override
@@ -618,36 +855,49 @@ public class RoutingTableProvider
           throw new HelixException("HelixManager is null for router update event.");
         }
         if (!manager.isConnected()) {
-          logger.error(
-              String.format("HelixManager is not connected for router update event: %s", event));
+          logger.error(String.format("HelixManager is not connected for router update event: %s", event));
           throw new HelixException("HelixManager is not connected for router update event.");
         }
 
         long startTime = System.currentTimeMillis();
 
         _dataCache.refresh(manager.getHelixDataAccessor());
-        switch (_sourceDataType) {
-        case EXTERNALVIEW:
-          refresh(_dataCache.getExternalViews().values(),
-              _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
-          break;
-        case TARGETEXTERNALVIEW:
-          refresh(_dataCache.getTargetExternalViews().values(),
-              _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
-          break;
-        case CURRENTSTATES:
-          refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
-              _dataCache.getLiveInstances().values());
+        for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+          switch (propertyType) {
+          case EXTERNALVIEW: {
+            String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+            refreshExternalView(_dataCache.getExternalViews().values(),
+                _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(),
+                keyReference);
+          }
+            break;
+          case TARGETEXTERNALVIEW: {
+            String keyReference = generateReferenceKey(propertyType.name(), DEFAULT_STATE_TYPE);
+            refreshExternalView(_dataCache.getTargetExternalViews().values(),
+                _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(),
+                keyReference);
+          }
+              break;
+            case CUSTOMIZEDVIEW:
+              for (String customizedStateType : _sourceDataTypeMap.getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
+                String keyReference = generateReferenceKey(propertyType.name(),  customizedStateType);
+                refreshCustomizedView(_dataCache.getCustomizedView(customizedStateType).values(),
+                    _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values(), keyReference);
+              }
+              break;
+            case CURRENTSTATES: {
+              String keyReference = generateReferenceKey(propertyType.name(),  DEFAULT_STATE_TYPE);;
+              refreshCurrentState(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
+                  _dataCache.getLiveInstances().values(), keyReference);
+              recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
+            }
+              break;
+            default:
+              logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", propertyType);
+          }
 
-          recordPropagationLatency(System.currentTimeMillis(),
-              _dataCache.getCurrentStateSnapshot());
-          break;
-        default:
-          logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
-              _sourceDataType);
+          _monitorMap.get(propertyType).increaseDataRefreshCounters(startTime);
         }
-
-        _monitor.increaseDataRefreshCounters(startTime);
       }
     }
 
@@ -673,10 +923,12 @@ public class RoutingTableProvider
               for (String partition : partitionStateEndTimes.keySet()) {
                 long endTime = partitionStateEndTimes.get(partition);
                 if (currentTime >= endTime) {
-                  _monitor.recordStatePropagationLatency(currentTime - endTime);
-                  logger.debug(
-                      "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
-                      key.toString(), partition, endTime, currentTime - endTime);
+                  for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
+                    _monitorMap.get(propertyType).recordStatePropagationLatency(currentTime - endTime);
+                    logger.debug(
+                        "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
+                        key.toString(), partition, endTime, currentTime - endTime);
+                  }
                 } else {
                   // Verbose log in case currentTime < endTime. This could be the case that Router
                   // clock is slower than the participant clock.
@@ -692,6 +944,7 @@ public class RoutingTableProvider
       }
     }
 
+
     public void queueEvent(NotificationContext context, ClusterEventType eventType,
         HelixConstants.ChangeType changeType) {
       ClusterEvent event = new ClusterEvent(_clusterName, eventType);
@@ -700,12 +953,17 @@ public class RoutingTableProvider
       event.addAttribute(AttributeName.helixmanager.name(), context.getManager());
       event.addAttribute(AttributeName.changeContext.name(), context);
       queueEvent(event);
-
-      _monitor.increaseCallbackCounters(_eventQueue.size());
+      // TODO: Split the monitor into. One is general router callback tracking. The other one is for
+      // each type of tracking.
+      // TODO: We may need to add more complexity to the customized view monitor for each state
+      // type.
+      for (PropertyType propertyType : _monitorMap.keySet()) {
+        _monitorMap.get(propertyType).increaseCallbackCounters(_eventQueue.size());
+      }
     }
   }
 
-  private class ListenerContext {
+  protected class ListenerContext {
     private Object _context;
 
     public ListenerContext(Object context) {
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
index abfc87b..e7cc6bb 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
@@ -20,9 +20,12 @@ package org.apache.helix.spectator;
  */
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -33,9 +36,13 @@ import org.apache.helix.model.LiveInstance;
  */
 public class RoutingTableSnapshot {
   private final RoutingTable _routingTable;
+  private final PropertyType _propertyType;
+  private final String _stateType;
 
   public RoutingTableSnapshot(RoutingTable routingTable) {
     _routingTable = routingTable;
+    _propertyType = routingTable.getPropertyType();
+    _stateType = routingTable.getStateType();
   }
 
   /**
@@ -145,4 +152,34 @@ public class RoutingTableSnapshot {
   public Collection<ExternalView> getExternalViews() {
     return _routingTable.getExternalViews();
   }
+
+  /**
+   * Returns a Collection of latest snapshot of CustomizedView. Note that if the RoutingTable is
+   * instantiated using CurrentStates, this Collection will be empty.
+   * @return
+   */
+  public Collection<CustomizedView> getCustomizeViews() {
+    if (_propertyType.equals(PropertyType.CUSTOMIZEDVIEW)){
+    CustomizedViewRoutingTable customizedViewRoutingTable =
+        (CustomizedViewRoutingTable) _routingTable;
+    return customizedViewRoutingTable.geCustomizedViews();
+    }
+    return Collections.emptySet();
+  }
+
+  /**
+   * Returns the PropertyType associated with this RoutingTableSnapshot
+   * @return
+   */
+  public PropertyType getPropertyType() {
+    return _propertyType;
+  }
+
+  /**
+   * Return the Type associated with the RoutingTableSnapshot (mainly used for CustomizedView)
+   * @return
+   */
+  public String getCustomizedStateType() {
+    return _stateType;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 7cda4a0..af816a1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -147,7 +147,7 @@ public class DummyClusterManager implements HelixManager {
   }
 
   @Override
-  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) throws Exception {
+  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) throws Exception {
     // TODO Auto-generated method stub
 
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index 6dc036a..d1a2b4c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -1,7 +1,9 @@
 package org.apache.helix.integration.spectator;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -9,15 +11,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.listeners.RoutingTableChangeListener;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
@@ -52,6 +61,9 @@ public class TestRoutingTableProvider extends ZkTestBase {
   private RoutingTableProvider _routingTableProvider_cs;
   private boolean _listenerTestResult = true;
 
+
+  private static final AtomicBoolean customizedViewChangeCalled = new AtomicBoolean(false);
+
   class MockRoutingTableChangeListener implements RoutingTableChangeListener {
     @Override
     public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
@@ -73,6 +85,19 @@ public class TestRoutingTableProvider extends ZkTestBase {
     }
   }
 
+  class MockRoutingTableProvider extends RoutingTableProvider {
+    MockRoutingTableProvider(HelixManager helixManager, Map<PropertyType, List<String>> sourceDataTypes) {
+      super(helixManager, sourceDataTypes);
+    }
+
+    @Override
+    public void onCustomizedViewChange(List<CustomizedView> customizedViewList,
+        NotificationContext changeContext){
+      customizedViewChangeCalled.getAndSet(true);
+      super.onCustomizedViewChange(customizedViewList, changeContext);
+    }
+  }
+
   @BeforeClass
   public void beforeClass() throws Exception {
     System.out.println(
@@ -212,6 +237,90 @@ public class TestRoutingTableProvider extends ZkTestBase {
         Sets.newSet(_instances.get(2)));
   }
 
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testShutdownInstance")
+  public void testExternalViewWithType() {
+    Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+    sourceDataTypes.put(PropertyType.EXTERNALVIEW, Arrays.asList("typeA"));
+    sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
+    RoutingTableProvider routingTableProvider;
+    routingTableProvider = new RoutingTableProvider(_spectator, sourceDataTypes);
+  }
+
+  @Test(dependsOnMethods = "testExternalViewWithType", expectedExceptions = HelixException.class)
+  public void testCustomizedViewWithoutType() {
+    RoutingTableProvider routingTableProvider;
+    routingTableProvider = new RoutingTableProvider(_spectator, PropertyType.CUSTOMIZEDVIEW);
+  }
+
+  @Test(dependsOnMethods = "testCustomizedViewWithoutType")
+  public void testCustomizedViewCorrectConstructor() throws Exception {
+    Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+    sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA"));
+    MockRoutingTableProvider routingTableProvider =
+        new MockRoutingTableProvider(_spectator, sourceDataTypes);
+
+    CustomizedView customizedView = new CustomizedView(TEST_DB);
+    customizedView.setState("p1", "h1", "testState");
+
+    // Clear the flag before writing to the Customized View Path
+    customizedViewChangeCalled.getAndSet(false);
+    String customizedViewPath = PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", TEST_DB);
+    _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath,
+        customizedView.getRecord(), AccessOption.PERSISTENT);
+
+    boolean onCustomizedViewChangeCalled =
+        TestHelper.verify(() -> customizedViewChangeCalled.get(), TestHelper.WAIT_DURATION);
+    Assert.assertTrue(onCustomizedViewChangeCalled);
+
+    _spectator.getHelixDataAccessor().getBaseDataAccessor().remove(customizedViewPath,
+        AccessOption.PERSISTENT);
+    routingTableProvider.shutdown();
+  }
+
+  @Test(dependsOnMethods = "testCustomizedViewCorrectConstructor")
+  public void testGetRoutingTableSnapshot() {
+    Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
+    sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
+    sourceDataTypes.put(PropertyType.EXTERNALVIEW, Collections.emptyList());
+    RoutingTableProvider routingTableProvider =
+        new RoutingTableProvider(_spectator, sourceDataTypes);
+
+    CustomizedView customizedView1 = new CustomizedView("Resource1");
+    customizedView1.setState("p1", "h1", "testState1");
+    customizedView1.setState("p1", "h2", "testState1");
+    customizedView1.setState("p2", "h1", "testState2");
+    customizedView1.setState("p3", "h2", "testState3");
+    String customizedViewPath1 =
+        PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", "Resource1");
+
+    CustomizedView customizedView2 = new CustomizedView("Resource2");
+    customizedView2.setState("p1", "h3", "testState3");
+    customizedView2.setState("p1", "h4", "testState2");
+    String customizedViewPath2 =
+        PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeB", "Resource2");
+
+    _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath1,
+        customizedView1.getRecord(), AccessOption.PERSISTENT);
+    _spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath2,
+        customizedView2.getRecord(), AccessOption.PERSISTENT);
+
+    RoutingTableSnapshot routingTableSnapshot =
+        routingTableProvider.getRoutingTableSnapshot(PropertyType.CUSTOMIZEDVIEW, "typeA");
+    Assert.assertEquals(routingTableSnapshot.getPropertyType(), PropertyType.CUSTOMIZEDVIEW);
+    Assert.assertEquals(routingTableSnapshot.getCustomizedStateType(), "typeA");
+    routingTableSnapshot =
+        routingTableProvider.getRoutingTableSnapshot(PropertyType.CUSTOMIZEDVIEW, "typeB");
+    Assert.assertEquals(routingTableSnapshot.getPropertyType(), PropertyType.CUSTOMIZEDVIEW);
+    Assert.assertEquals(routingTableSnapshot.getCustomizedStateType(), "typeB");
+
+    Map<String, Map<String, RoutingTableSnapshot>> routingTableSnapshots =
+        routingTableProvider.getRoutingTableSnapshots();
+    Assert.assertEquals(routingTableSnapshots.size(), 2);
+    Assert.assertEquals(routingTableSnapshots.get(PropertyType.CUSTOMIZEDVIEW.name()).size(), 2);
+    routingTableProvider.shutdown();
+  }
+
+
   private void validateRoutingTable(RoutingTableProvider routingTableProvider,
       Set<String> masterNodes, Set<String> slaveNodes) {
     IdealState is =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
index fdce32e..776fb95 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
@@ -2,6 +2,7 @@ package org.apache.helix.integration.spectator;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -16,6 +17,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -144,9 +146,10 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase {
     }
 
     @Override
-    protected synchronized void refresh(List<ExternalView> externalViewList,
-        NotificationContext changeContext) {
-      super.refresh(externalViewList, changeContext);
+    protected synchronized void refreshExternalView(Collection<ExternalView> externalViews,
+        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+        String referenceKey) {
+      super.refreshExternalView(externalViews, instanceConfigs, liveInstances, referenceKey);
       _refreshCount++;
       if (DEBUG) {
         print();
@@ -154,20 +157,11 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase {
     }
 
     @Override
-    protected synchronized void refresh(Collection<ExternalView> externalViews,
-        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
-      super.refresh(externalViews, instanceConfigs, liveInstances);
-      _refreshCount++;
-      if (DEBUG) {
-        print();
-      }
-    }
-
-    @Override
-    protected synchronized void refresh(
+    protected synchronized void refreshCurrentState(
         Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
-        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
-      super.refresh(currentStateMap, instanceConfigs, liveInstances);
+        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances,
+        String referenceKey) {
+      super.refreshCurrentState(currentStateMap, instanceConfigs, liveInstances, "Test");
       _refreshCount++;
       if (DEBUG) {
         print();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
index 4e95b39..1371a46 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
@@ -151,7 +151,7 @@ public class MockManager implements HelixManager {
   }
 
   @Override
-  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) {
+  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) {
     // TODO Auto-generated method stub
 
   }
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 3a9b617..fbd8599 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -183,7 +183,7 @@ public class MockZKHelixManager implements HelixManager {
   }
 
   @Override
-  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String aggregationType) throws Exception {
+  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) throws Exception {
     // TODO Auto-generated method stub
 
   }