You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/04/23 19:28:01 UTC

[helix] 15/23: Add new stages in Helix generic controller for customized view aggregation. (#851)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fc3dfdc9f7fd0ec2fbdd41846f847e948e651cdd
Author: zhangmeng916 <56...@users.noreply.github.com>
AuthorDate: Wed Mar 25 14:24:25 2020 -0700

    Add new stages in Helix generic controller for customized view aggregation. (#851)
    
    Add extra stages and pipelines in controller for customized state computation and customized view aggregation.
    Add refresh logic in resource data provider for customized view related data refresh.
    Add customized state event handling in CallbackHandler.
    Add integration test for customized view aggregation.
    Modify existing tests to verify new logic.
    
    Co-authored-by: Meng Zhang <mn...@mnzhang-mn1.linkedin.biz>
---
 .../main/java/org/apache/helix/HelixConstants.java |   1 +
 .../main/java/org/apache/helix/HelixManager.java   |  14 +-
 .../main/java/org/apache/helix/PropertyKey.java    |   9 +
 .../listeners/CustomizedStateChangeListener.java   |   2 +-
 ...java => CustomizedStateRootChangeListener.java} |  14 +-
 .../helix/controller/GenericHelixController.java   | 151 ++++++++++--
 .../helix/controller/stages/ClusterEventType.java  |   2 +
 .../apache/helix/manager/zk/CallbackHandler.java   | 200 +++++++++------
 .../helix/manager/zk/ControllerManagerHelper.java  |   2 +
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   2 +
 .../apache/helix/manager/zk/ZKHelixManager.java    |  20 +-
 .../test/java/org/apache/helix/TestZKCallback.java |  29 ++-
 .../controller/stages/DummyClusterManager.java     |  15 +-
 .../TestComputeAndCleanupCustomizedView.java       | 271 +++++++++++++++++++++
 .../integration/TestZkCallbackHandlerLeak.java     |  16 +-
 .../manager/TestParticipantManager.java            |   2 +-
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  |   2 +
 .../java/org/apache/helix/mock/MockHelixAdmin.java |   4 +
 .../java/org/apache/helix/mock/MockManager.java    |  15 +-
 .../helix/participant/MockZKHelixManager.java      |  15 +-
 20 files changed, 656 insertions(+), 130 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index c71f340..ca32c44 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -34,6 +34,7 @@ public interface HelixConstants {
     CLUSTER_CONFIG (PropertyType.CONFIGS),
     LIVE_INSTANCE (PropertyType.LIVEINSTANCES),
     CURRENT_STATE (PropertyType.CURRENTSTATES),
+    CUSTOMIZED_STATE_ROOT (PropertyType.CUSTOMIZEDSTATES),
     CUSTOMIZED_STATE (PropertyType.CUSTOMIZEDSTATES),
     MESSAGE (PropertyType.MESSAGES),
     EXTERNAL_VIEW (PropertyType.EXTERNALVIEW),
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 90aaa0c..9702015 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -26,10 +26,11 @@ import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
 import org.apache.helix.api.listeners.CustomizedStateChangeListener;
-import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
 import org.apache.helix.api.listeners.CustomizedViewChangeListener;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
@@ -227,6 +228,15 @@ public interface HelixManager {
       String sessionId) throws Exception;
 
   /**
+
+   * @see CustomizedStateRootChangeListener#onCustomizedStateRootChange(String, NotificationContext)
+   * @param listener
+   * @param instanceName
+   */
+  void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
+      String instanceName) throws Exception;
+
+  /**
    * @see CustomizedStateChangeListener#onCustomizedStateChange(String, List, NotificationContext)
    * @param listener
    * @param instanceName
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 41bc9cd..64d57f8 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -486,6 +486,15 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with the root of {@link CustomizedState} of an instance
+     * @param instanceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey customizedStatesRoot(String instanceName) {
+      return new PropertyKey(CUSTOMIZEDSTATES, CustomizedState.class, _clusterName, instanceName);
+    }
+
+    /**
      * Get a property key associated with {@link CustomizedState} of an instance and customized state
      * @param instanceName
      * @param customizedStateName
diff --git a/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java
index 0753f67..2a59065 100644
--- a/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java
+++ b/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java
@@ -24,7 +24,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.model.CustomizedState;
 
 /**
- * Interface to implement to respond to changes in the current state
+ * Interface to implement to respond to changes in the customized state
  */
 public interface CustomizedStateChangeListener {
 
diff --git a/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java b/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateRootChangeListener.java
similarity index 72%
copy from helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java
copy to helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateRootChangeListener.java
index 0753f67..593a975 100644
--- a/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateChangeListener.java
+++ b/helix-core/src/main/java/org/apache/helix/api/listeners/CustomizedStateRootChangeListener.java
@@ -20,20 +20,20 @@ package org.apache.helix.api.listeners;
  */
 
 import java.util.List;
+
 import org.apache.helix.NotificationContext;
-import org.apache.helix.model.CustomizedState;
 
 /**
- * Interface to implement to respond to changes in the current state
+ * Interface to implement to respond to changes in the root path of customized state
  */
-public interface CustomizedStateChangeListener {
+public interface CustomizedStateRootChangeListener {
 
   /**
-   * Invoked when customized state changes
+   * Invoked when root path customized state changes
    * @param instanceName name of the instance whose state changed
-   * @param customizedStatesInfo a list of the customized states
+   * @param customizedStateTypes the state types under the root
    * @param changeContext the change event and state
    */
-  void onCustomizedStateChange(String instanceName, List<CustomizedState> customizedStatesInfo,
-      NotificationContext changeContext);
+  void onCustomizedStateRootChange(String instanceName,
+      List<String> customizedStateTypes, NotificationContext changeContext);
 }
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8f7da2d..8644441 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -47,6 +48,9 @@ import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
@@ -69,6 +73,8 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CustomizedStateComputationStage;
+import org.apache.helix.controller.stages.CustomizedViewAggregationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
 import org.apache.helix.controller.stages.IntermediateStateCalcStage;
 import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
@@ -89,6 +95,8 @@ import org.apache.helix.controller.stages.task.TaskPersistDataStage;
 import org.apache.helix.controller.stages.task.TaskSchedulingStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -117,10 +125,12 @@ import static org.apache.helix.HelixConstants.ChangeType;
  * 4. select the messages that can be sent, needs messages and state model constraints <br>
  * 5. send messages
  */
-public class GenericHelixController implements IdealStateChangeListener,
-    LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener,
-    ClusterConfigChangeListener {
+public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener,
+                                               MessageListener, CurrentStateChangeListener,
+                                               CustomizedStateRootChangeListener,
+                                               CustomizedStateChangeListener,
+    CustomizedStateConfigChangeListener, ControllerChangeListener,
+    InstanceConfigChangeListener, ResourceConfigChangeListener, ClusterConfigChangeListener {
   private static final Logger logger =
       LoggerFactory.getLogger(GenericHelixController.class.getName());
 
@@ -132,6 +142,8 @@ public class GenericHelixController implements IdealStateChangeListener,
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
 
+  final AtomicReference<Set<String>> _lastSeenCustomizedStateTypes;
+
   // By default not reporting status until controller status is changed to activate
   // TODO This flag should be inside ClusterStatusMonitor. When false, no MBean registering.
   private boolean _isMonitoring = false;
@@ -440,6 +452,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       dataPreprocess.addStage(new ResourceComputationStage());
       dataPreprocess.addStage(new ResourceValidationStage());
       dataPreprocess.addStage(new CurrentStateComputationStage());
+      dataPreprocess.addStage(new CustomizedStateComputationStage());
       dataPreprocess.addStage(new TopStateHandoffReportStage());
 
       // rebalance pipeline
@@ -460,6 +473,10 @@ public class GenericHelixController implements IdealStateChangeListener,
       Pipeline externalViewPipeline = new Pipeline(pipelineName);
       externalViewPipeline.addStage(new ExternalViewComputeStage());
 
+      // customized state view generation
+      Pipeline customizedViewPipeline = new Pipeline(pipelineName);
+      customizedViewPipeline.addStage(new CustomizedViewAggregationStage());
+
       // backward compatibility check
       Pipeline liveInstancePipeline = new Pipeline(pipelineName);
       liveInstancePipeline.addStage(new CompatibilityCheckStage());
@@ -468,16 +485,38 @@ public class GenericHelixController implements IdealStateChangeListener,
       Pipeline autoExitMaintenancePipeline = new Pipeline(pipelineName);
       autoExitMaintenancePipeline.addStage(new MaintenanceRecoveryStage());
 
-      registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
-      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
-      registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
-      registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
-      registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
-      registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
+          externalViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
+          rebalancePipeline);
+      registry
+          .register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline,
+              dataPreprocess, rebalancePipeline);
+      registry
+          .register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline,
+              liveInstancePipeline, dataPreprocess, externalViewPipeline, customizedViewPipeline,
+              rebalancePipeline);
+      registry
+          .register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
+      registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline,
+          rebalancePipeline);
+      registry
+          .register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline,
+              dataPreprocess, externalViewPipeline, rebalancePipeline);
+      registry
+          .register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline,
+              dataPreprocess, externalViewPipeline, rebalancePipeline);
+      // TODO: We now include rebalance pipeline in customized state change for correctness.
+      // However, it is not efficient, and we should improve this by splitting the pipeline or
+      // controller roles to multiple hosts.
+      registry.register(ClusterEventType.CustomizedStateChange, dataRefresh, dataPreprocess,
+          customizedViewPipeline, rebalancePipeline);
+      registry.register(ClusterEventType.CustomizeStateConfigChange, dataRefresh, dataPreprocess,
+          customizedViewPipeline, rebalancePipeline);
       return registry;
     }
   }
@@ -546,6 +585,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _taskRegistry = taskRegistry;
     _lastSeenInstances = new AtomicReference<>();
     _lastSeenSessions = new AtomicReference<>();
+    _lastSeenCustomizedStateTypes = new AtomicReference<>();
     _clusterName = clusterName;
     _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
     _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
@@ -830,6 +870,57 @@ public class GenericHelixController implements IdealStateChangeListener,
 
   @Override
   @PreFetch(enabled = false)
+  public void onCustomizedStateRootChange(String instanceName, List<String> customizedStateTypes,
+      NotificationContext changeContext) {
+    logger.info("START: GenericClusterController.onCustomizedStateRootChange()");
+    HelixManager manager = changeContext.getManager();
+    Builder keyBuilder = new Builder(manager.getClusterName());
+    if (customizedStateTypes.isEmpty()) {
+      customizedStateTypes = manager.getHelixDataAccessor()
+          .getChildNames(keyBuilder.customizedStatesRoot(instanceName));
+    }
+
+    // TODO: remove the synchronization here once we move this update into dataCache.
+    synchronized (_lastSeenCustomizedStateTypes) {
+      Set<String> lastSeenCustomizedStateTypes = _lastSeenCustomizedStateTypes.get();
+      for (String customizedState : customizedStateTypes) {
+        try {
+          if (lastSeenCustomizedStateTypes == null || !lastSeenCustomizedStateTypes
+              .contains(customizedState)) {
+            manager.addCustomizedStateChangeListener(this, instanceName, customizedState);
+            logger.info(
+                manager.getInstanceName() + " added customized state listener for " + instanceName
+                    + ", listener: " + this);
+          }
+        } catch (Exception e) {
+          logger.error("Fail to add customized state listener for instance: " + instanceName, e);
+        }
+      }
+
+      for (String previousCustomizedState : lastSeenCustomizedStateTypes) {
+        if (!customizedStateTypes.contains(previousCustomizedState)) {
+          manager.removeListener(keyBuilder.customizedStates(instanceName, previousCustomizedState),
+              this);
+        }
+      }
+
+      _lastSeenCustomizedStateTypes.set(new HashSet<>(customizedStateTypes));
+    }
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onCustomizedStateChange(String instanceName, List<CustomizedState> statesInfo,
+      NotificationContext changeContext) {
+    logger.info("START: GenericClusterController.onCustomizedStateChange()");
+    notifyCaches(changeContext, ChangeType.CUSTOMIZED_STATE);
+    pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, Collections
+        .<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
+    logger.info("END: GenericClusterController.onCustomizedStateChange()");
+  }
+
+  @Override
+  @PreFetch(enabled = false)
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage() for cluster " + _clusterName);
@@ -954,6 +1045,22 @@ public class GenericHelixController implements IdealStateChangeListener,
 
   @Override
   @PreFetch(enabled = false)
+  public void onCustomizedStateConfigChange(
+      CustomizedStateConfig customizedStateConfig,
+      NotificationContext context) {
+    logger.info(
+        "START: GenericClusterController.onCustomizedStateConfigChange() for cluster "
+            + _clusterName);
+    notifyCaches(context, ChangeType.CUSTOMIZED_STATE_CONFIG);
+    pushToEventQueues(ClusterEventType.CustomizeStateConfigChange, context,
+        Collections.<String, Object> emptyMap());
+    logger.info(
+        "END: GenericClusterController.onCustomizedStateConfigChange() for cluster "
+            + _clusterName);
+  }
+
+  @Override
+  @PreFetch(enabled = false)
   public void onClusterConfigChange(ClusterConfig clusterConfig,
       NotificationContext context) {
     logger.info(
@@ -1099,6 +1206,8 @@ public class GenericHelixController implements IdealStateChangeListener,
           if (!curInstances.containsKey(instance)) {
             // remove message listener for disconnected instances
             manager.removeListener(keyBuilder.messages(instance), this);
+            // remove customized state root listener for disconnected instances
+            manager.removeListener(keyBuilder.customizedStatesRoot(instance), this);
           }
         }
       }
@@ -1131,6 +1240,20 @@ public class GenericHelixController implements IdealStateChangeListener,
         }
       }
 
+        for (String instance : curInstances.keySet()) {
+          if (lastInstances == null || !lastInstances.containsKey(instance)) {
+            try {
+              manager.addCustomizedStateRootChangeListener(this, instance);
+              logger.info(manager.getInstanceName() + " added customized root change listener for"
+                  + " " + instance
+                  + ", listener: " + this);
+            } catch (Exception e) {
+              logger.error("Fail to add customized root change listener for instance: " + instance,
+                  e);
+            }
+        }
+      }
+
       // update last-seen
       _lastSeenInstances.set(curInstances);
       _lastSeenSessions.set(curSessions);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 2fd015e..b54f03b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -22,10 +22,12 @@ package org.apache.helix.controller.stages;
 public enum ClusterEventType {
   IdealStateChange,
   CurrentStateChange,
+  CustomizedStateChange,
   ConfigChange,
   ClusterConfigChange,
   ResourceConfigChange,
   InstanceConfigChange,
+  CustomizeStateConfigChange,
   LiveInstanceChange,
   MessageChange,
   ExternalViewChange,
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index dc91028..436873a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -20,6 +20,7 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,6 +46,9 @@ import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
 import org.apache.helix.api.listeners.CustomizedViewChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
@@ -57,6 +61,8 @@ import org.apache.helix.api.listeners.ScopedConfigChangeListener;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -79,8 +85,11 @@ import static org.apache.helix.HelixConstants.ChangeType.CLUSTER_CONFIG;
 import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
 import static org.apache.helix.HelixConstants.ChangeType.CONTROLLER;
 import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
-import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.CUSTOMIZED_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.CUSTOMIZED_STATE_CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CUSTOMIZED_STATE_ROOT;
 import static org.apache.helix.HelixConstants.ChangeType.CUSTOMIZED_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
 import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
 import static org.apache.helix.HelixConstants.ChangeType.INSTANCE_CONFIG;
 import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
@@ -89,7 +98,6 @@ import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
 import static org.apache.helix.HelixConstants.ChangeType.RESOURCE_CONFIG;
 import static org.apache.helix.HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW;
 
-
 @PreFetch(enabled = false)
 public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private static Logger logger = LoggerFactory.getLogger(CallbackHandler.class);
@@ -123,7 +131,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   // TODO: make this be per _manager or per _listener instaed of per callbackHandler -- Lei
   private CallbackProcessor _batchCallbackProcessor;
   private boolean _watchChild = true; // Whether we should subscribe to the child znode's data
-                                      // change.
+  // change.
 
   // indicated whether this CallbackHandler is ready to serve event callback from ZkClient.
   private boolean _ready = false;
@@ -260,45 +268,53 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
     Class listenerClass = null;
     switch (_changeType) {
-    case IDEAL_STATE:
-      listenerClass = IdealStateChangeListener.class;
-      break;
-    case INSTANCE_CONFIG:
-      if (_listener instanceof ConfigChangeListener) {
+      case IDEAL_STATE:
+        listenerClass = IdealStateChangeListener.class;
+        break;
+      case INSTANCE_CONFIG:
+        if (_listener instanceof ConfigChangeListener) {
+          listenerClass = ConfigChangeListener.class;
+        } else if (_listener instanceof InstanceConfigChangeListener) {
+          listenerClass = InstanceConfigChangeListener.class;
+        }
+        break;
+      case CLUSTER_CONFIG:
+        listenerClass = ClusterConfigChangeListener.class;
+        break;
+      case RESOURCE_CONFIG:
+        listenerClass = ResourceConfigChangeListener.class;
+        break;
+      case CUSTOMIZED_STATE_CONFIG:
+        listenerClass = CustomizedStateConfigChangeListener.class;
+        break;
+      case CONFIG:
         listenerClass = ConfigChangeListener.class;
-      } else if (_listener instanceof InstanceConfigChangeListener) {
-        listenerClass = InstanceConfigChangeListener.class;
-      }
-      break;
-    case CLUSTER_CONFIG:
-      listenerClass = ClusterConfigChangeListener.class;
-      break;
-    case RESOURCE_CONFIG:
-      listenerClass = ResourceConfigChangeListener.class;
-      break;
-    case CONFIG:
-      listenerClass = ConfigChangeListener.class;
-      break;
-    case LIVE_INSTANCE:
-      listenerClass = LiveInstanceChangeListener.class;
-      break;
-    case CURRENT_STATE:
-      listenerClass = CurrentStateChangeListener.class;
-      ;
-      break;
-    case MESSAGE:
-    case MESSAGES_CONTROLLER:
-      listenerClass = MessageListener.class;
-      break;
-    case EXTERNAL_VIEW:
-    case TARGET_EXTERNAL_VIEW:
-      listenerClass = ExternalViewChangeListener.class;
-      break;
-    case CUSTOMIZED_VIEW:
-      listenerClass = CustomizedViewChangeListener.class;
-      break;
-    case CONTROLLER:
-      listenerClass = ControllerChangeListener.class;
+        break;
+      case LIVE_INSTANCE:
+        listenerClass = LiveInstanceChangeListener.class;
+        break;
+      case CURRENT_STATE:
+        listenerClass = CurrentStateChangeListener.class;
+        break;
+      case CUSTOMIZED_STATE_ROOT:
+        listenerClass = CustomizedStateRootChangeListener.class;
+        break;
+      case CUSTOMIZED_STATE:
+        listenerClass = CustomizedStateChangeListener.class;
+        break;
+      case MESSAGE:
+      case MESSAGES_CONTROLLER:
+        listenerClass = MessageListener.class;
+        break;
+      case EXTERNAL_VIEW:
+      case TARGET_EXTERNAL_VIEW:
+        listenerClass = ExternalViewChangeListener.class;
+        break;
+      case CUSTOMIZED_VIEW:
+        listenerClass = CustomizedViewChangeListener.class;
+        break;
+      case CONTROLLER:
+        listenerClass = ControllerChangeListener.class;
     }
 
     Method callbackMethod = listenerClass.getMethods()[0];
@@ -399,6 +415,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         List<ResourceConfig> configs = preFetch(_propertyKey);
         listener.onResourceConfigChange(configs, changeContext);
 
+      } else if (_changeType == CUSTOMIZED_STATE_CONFIG) {
+        CustomizedStateConfigChangeListener listener = (CustomizedStateConfigChangeListener) _listener;
+        CustomizedStateConfig config = null;
+        if (_preFetchEnabled) {
+          config = _accessor.getProperty(_propertyKey);
+        }
+        listener.onCustomizedStateConfigChange(config, changeContext);
+
       } else if (_changeType == CLUSTER_CONFIG) {
         ClusterConfigChangeListener listener = (ClusterConfigChangeListener) _listener;
         ClusterConfig config = null;
@@ -425,6 +449,25 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
         List<CurrentState> currentStates = preFetch(_propertyKey);
         currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
 
+      } else if (_changeType == CUSTOMIZED_STATE_ROOT) {
+        CustomizedStateRootChangeListener customizedStateRootChangeListener =
+            (CustomizedStateRootChangeListener) _listener;
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+        List<String> customizedStateTypes = new ArrayList<>();
+        if (_preFetchEnabled) {
+          customizedStateTypes =
+              _accessor.getChildNames(_accessor.keyBuilder().customizedStatesRoot(instanceName));
+        }
+        customizedStateRootChangeListener
+            .onCustomizedStateRootChange(instanceName, customizedStateTypes, changeContext);
+
+      } else if (_changeType == CUSTOMIZED_STATE) {
+        CustomizedStateChangeListener customizedStateChangeListener =
+            (CustomizedStateChangeListener) _listener;
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+        List<CustomizedState> customizedStates = preFetch(_propertyKey);
+        customizedStateChangeListener.onCustomizedStateChange(instanceName, customizedStates, changeContext);
+
       } else if (_changeType == MESSAGE) {
         MessageListener messageListener = (MessageListener) _listener;
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
@@ -533,49 +576,50 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
         try {
           switch (_changeType) {
-          case CURRENT_STATE:
-          case IDEAL_STATE:
-          case EXTERNAL_VIEW:
-          case CUSTOMIZED_VIEW:
-          case TARGET_EXTERNAL_VIEW: {
-            // check if bucketized
-            BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
-            List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
-            for (ZNRecord record : records) {
-              HelixProperty property = new HelixProperty(record);
-              String childPath = path + "/" + record.getId();
-
-              int bucketSize = property.getBucketSize();
-              if (bucketSize > 0) {
-                // subscribe both data-change and child-change on bucketized parent node
-                // data-change gives a delete-callback which is used to remove watch
-                subscribeChildChange(childPath, callbackType);
-                subscribeDataChange(childPath, callbackType);
-
-                // subscribe data-change on bucketized child
-                List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
-                if (bucketizedChildNames != null) {
-                  for (String bucketizedChildName : bucketizedChildNames) {
-                    String bucketizedChildPath = childPath + "/" + bucketizedChildName;
-                    subscribeDataChange(bucketizedChildPath, callbackType);
+            case CURRENT_STATE:
+            case CUSTOMIZED_STATE:
+            case IDEAL_STATE:
+            case EXTERNAL_VIEW:
+            case CUSTOMIZED_VIEW:
+            case TARGET_EXTERNAL_VIEW: {
+              // check if bucketized
+              BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
+              List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
+              for (ZNRecord record : records) {
+                HelixProperty property = new HelixProperty(record);
+                String childPath = path + "/" + record.getId();
+
+                int bucketSize = property.getBucketSize();
+                if (bucketSize > 0) {
+                  // subscribe both data-change and child-change on bucketized parent node
+                  // data-change gives a delete-callback which is used to remove watch
+                  subscribeChildChange(childPath, callbackType);
+                  subscribeDataChange(childPath, callbackType);
+
+                  // subscribe data-change on bucketized child
+                  List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
+                  if (bucketizedChildNames != null) {
+                    for (String bucketizedChildName : bucketizedChildNames) {
+                      String bucketizedChildPath = childPath + "/" + bucketizedChildName;
+                      subscribeDataChange(bucketizedChildPath, callbackType);
+                    }
                   }
+                } else {
+                  subscribeDataChange(childPath, callbackType);
                 }
-              } else {
-                subscribeDataChange(childPath, callbackType);
               }
+              break;
             }
-            break;
-          }
-          default: {
-            List<String> childNames = _zkClient.getChildren(path);
-            if (childNames != null) {
-              for (String childName : childNames) {
-                String childPath = path + "/" + childName;
-                subscribeDataChange(childPath, callbackType);
+            default: {
+              List<String> childNames = _zkClient.getChildren(path);
+              if (childNames != null) {
+                for (String childName : childNames) {
+                  String childPath = path + "/" + childName;
+                  subscribeDataChange(childPath, callbackType);
+                }
               }
+              break;
             }
-            break;
-          }
           }
         } catch (ZkNoNodeException e) {
           logger.warn(
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index 3b25f0f..ca7210c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -82,6 +82,7 @@ public class ControllerManagerHelper {
       _manager.addInstanceConfigChangeListener(controller);
       _manager.addResourceConfigChangeListener(controller);
       _manager.addClusterfigChangeListener(controller);
+      _manager.addCustomizedStateConfigChangeListener(controller);
       _manager.addLiveInstanceChangeListener(controller);
       _manager.addIdealStateChangeListener(controller);
     } catch (ZkInterruptedException e) {
@@ -99,6 +100,7 @@ public class ControllerManagerHelper {
      */
     _manager.removeListener(keyBuilder.idealStates(), controller);
     _manager.removeListener(keyBuilder.liveInstances(), controller);
+    _manager.removeListener(keyBuilder.customizedStateConfig(), controller);
     _manager.removeListener(keyBuilder.clusterConfig(), controller);
     _manager.removeListener(keyBuilder.resourceConfigs(), controller);
     _manager.removeListener(keyBuilder.instanceConfigs(), controller);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index af037f0..f100fec 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -798,6 +798,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     _zkClient.createPersistent(path);
     path = PropertyPathBuilder.resourceConfig(clusterName);
     _zkClient.createPersistent(path);
+    path = PropertyPathBuilder.customizedStateConfig(clusterName);
+    _zkClient.createPersistent(path);
     // PROPERTY STORE
     path = PropertyPathBuilder.propertyStore(clusterName);
     _zkClient.createPersistent(path);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 72928ef..ad56f95 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -57,9 +57,10 @@ import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.CustomizedViewChangeListener;
-import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
 import org.apache.helix.api.listeners.CustomizedStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
+import org.apache.helix.api.listeners.CustomizedViewChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
@@ -587,11 +588,18 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   @Override
+  public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
+      String instanceName) throws Exception {
+    addListener(listener, new Builder(_clusterName).customizedStatesRoot(instanceName),
+        ChangeType.CUSTOMIZED_STATE_ROOT, new EventType[]{EventType.NodeChildrenChanged});
+  }
+
+  @Override
   public void addCustomizedStateChangeListener(CustomizedStateChangeListener listener,
-      String instanceName, String customizedStateName) throws Exception {
-    addListener(listener, new Builder(_clusterName).customizedStates(instanceName, customizedStateName),
-        ChangeType.CUSTOMIZED_STATE, new EventType[] { EventType.NodeChildrenChanged
-        });
+      String instanceName, String customizedStateType) throws Exception {
+    addListener(listener,
+        new Builder(_clusterName).customizedStates(instanceName, customizedStateType),
+        ChangeType.CUSTOMIZED_STATE, new EventType[]{EventType.NodeChildrenChanged});
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index d9b6d8d..57c2579 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -21,16 +21,20 @@ package org.apache.helix;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -53,12 +57,17 @@ public class TestZKCallback extends ZkUnitTestBase {
   }
 
   public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
-      ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+                                               ConfigChangeListener, CurrentStateChangeListener,
+                                               CustomizedStateConfigChangeListener,
+                                               CustomizedStateRootChangeListener,
+                                               ExternalViewChangeListener,
       IdealStateChangeListener {
     boolean externalViewChangeReceived = false;
     boolean liveInstanceChangeReceived = false;
     boolean configChangeReceived = false;
     boolean currentStateChangeReceived = false;
+    boolean customizedStateConfigChangeReceived = false;
+    boolean customizedStateRootChangeReceived = false;
     boolean messageChangeReceived = false;
     boolean idealStateChangeReceived = false;
 
@@ -96,6 +105,8 @@ public class TestZKCallback extends ZkUnitTestBase {
       liveInstanceChangeReceived = false;
       configChangeReceived = false;
       currentStateChangeReceived = false;
+      customizedStateConfigChangeReceived = false;
+      customizedStateRootChangeReceived = false;
       messageChangeReceived = false;
       idealStateChangeReceived = false;
     }
@@ -105,6 +116,21 @@ public class TestZKCallback extends ZkUnitTestBase {
       // TODO Auto-generated method stub
       idealStateChangeReceived = true;
     }
+
+
+    @Override
+    public void onCustomizedStateRootChange(String instanceName, List<String> customizedStateTypes,
+        NotificationContext changeContext) {
+      // TODO Auto-generated method stub
+      customizedStateRootChangeReceived = true;
+    }
+
+    @Override
+    public void onCustomizedStateConfigChange(CustomizedStateConfig customizedStateConfig,
+        NotificationContext context) {
+      // TODO Auto-generated method stub
+      customizedStateConfigChangeReceived = true;
+    }
   }
 
   @Test()
@@ -122,6 +148,7 @@ public class TestZKCallback extends ZkUnitTestBase {
     testHelixManager.addMessageListener(testListener, "localhost_8900");
     testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
         testHelixManager.getSessionId());
+    testHelixManager.addCustomizedStateRootChangeListener(testListener, "localhost_8900");
     testHelixManager.addConfigChangeListener(testListener);
     testHelixManager.addIdealStateChangeListener(testListener);
     testHelixManager.addExternalViewChangeListener(testListener);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 655276b..83dd49e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.List;
 import java.util.Set;
 
 import org.apache.helix.ClusterMessagingService;
@@ -37,10 +36,11 @@ import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
 import org.apache.helix.api.listeners.CustomizedStateChangeListener;
-import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
 import org.apache.helix.api.listeners.CustomizedViewChangeListener;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
@@ -134,8 +134,15 @@ public class DummyClusterManager implements HelixManager {
   }
 
   @Override
+  public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
+      String instanceName) throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
   public void addCustomizedStateChangeListener(CustomizedStateChangeListener listener,
-      String instanceName, String sessionId) throws Exception {
+      String instanceName, String customizedStateType) throws Exception {
     // TODO Auto-generated method stub
 
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java b/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
new file mode 100644
index 0000000..6cdf72d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestComputeAndCleanupCustomizedView.java
@@ -0,0 +1,271 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CustomizedState;
+import org.apache.helix.model.CustomizedStateConfig;
+import org.apache.helix.model.CustomizedView;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test compute and clean customized view - if customized state is remove externally, controller
+ * should remove the
+ * orphan customized view
+ */
+public class TestComputeAndCleanupCustomizedView extends ZkUnitTestBase {
+
+  private final String RESOURCE_NAME = "TestDB0";
+  private final String PARTITION_NAME1 = "TestDB0_0";
+  private final String PARTITION_NAME2 = "TestDB0_1";
+  private final String CUSTOMIZED_STATE_NAME1 = "customizedState1";
+  private final String CUSTOMIZED_STATE_NAME2 = "customizedState2";
+  private final String INSTANCE_NAME1 = "localhost_12918";
+  private final String INSTANCE_NAME2 = "localhost_12919";
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // add CUSTOMIZED_STATE_NAME2 to aggregation enabled types
+    CustomizedStateConfig config = new CustomizedStateConfig();
+    List<String> aggregationEnabledTypes = new ArrayList<>();
+    aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME2);
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+    accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+    // set INSTANCE1 to "STARTED" for CUSTOMIZED_STATE_NAME1
+    CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+    customizedState.setState(PARTITION_NAME1, "STARTED");
+    accessor.setProperty(
+        keyBuilder.customizedState(INSTANCE_NAME1, CUSTOMIZED_STATE_NAME1, RESOURCE_NAME),
+        customizedState);
+
+    // verify the customized view is empty for CUSTOMIZED_STATE_NAME1
+    Boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+        if (customizedView == null) {
+          return true;
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result, String
+        .format("Customized view should not have state for" + " resource: %s, partition: %s",
+            RESOURCE_NAME, PARTITION_NAME1));
+
+    // add CUSTOMIZED_STATE_NAME1 to aggregation enabled types
+    aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME1);
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+    accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+    // verify the customized view should have "STARTED" for CUSTOMIZED_STATE_NAME1 for INSTANCE1,
+    // but no state for INSTANCE2
+    result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+        if (customizedView != null) {
+          Map<String, String> stateMap = customizedView.getRecord().getMapField(PARTITION_NAME1);
+          return (stateMap.get(INSTANCE_NAME1).equals("STARTED"));
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result, String.format(
+        "Customized view should have the state as STARTED for" + " instance: %s,"
+            + " resource: %s, partition: %s and state: %s", INSTANCE_NAME1, RESOURCE_NAME,
+        PARTITION_NAME1, CUSTOMIZED_STATE_NAME1));
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+        if (customizedView != null) {
+          Map<String, String> stateMap = customizedView.getRecord().getMapField(PARTITION_NAME1);
+          return !stateMap.containsKey(INSTANCE_NAME2);
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result, String.format("Customized view should not have state for instance: "
+            + "%s", INSTANCE_NAME2));
+
+    // set INSTANCE2 to "STARTED" for CUSTOMIZED_STATE_NAME1
+    customizedState = new CustomizedState(RESOURCE_NAME);
+    customizedState.setState(PARTITION_NAME1, "STARTED");
+    accessor.setProperty(
+        keyBuilder.customizedState(INSTANCE_NAME2, CUSTOMIZED_STATE_NAME1, RESOURCE_NAME),
+        customizedState);
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+        if (customizedView != null) {
+          Map<String, String> stateMap = customizedView.getRecord().getMapField(PARTITION_NAME1);
+          if (stateMap.containsKey(INSTANCE_NAME2)) {
+            return (stateMap.get(INSTANCE_NAME1).equals("STARTED") && stateMap.get(INSTANCE_NAME2)
+                .equals("STARTED"));
+          }
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result, String.format(
+        "Customized view should have both instances state " + "as STARTED for"
+            + " resource: %s, partition: %s and state: %s", RESOURCE_NAME, PARTITION_NAME1,
+        CUSTOMIZED_STATE_NAME1));
+
+    // set INSTANCE2 to "STARTED" for CUSTOMIZED_STATE_NAME2
+    customizedState = new CustomizedState(RESOURCE_NAME);
+    customizedState.setState(PARTITION_NAME2, "STARTED");
+    accessor.setProperty(
+        keyBuilder.customizedState(INSTANCE_NAME2, CUSTOMIZED_STATE_NAME2, RESOURCE_NAME),
+        customizedState);
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME2, RESOURCE_NAME));
+        if (customizedView != null) {
+          Map<String, String> stateMap = customizedView.getRecord().getMapField(PARTITION_NAME2);
+          return (stateMap.get(INSTANCE_NAME2).equals("STARTED"));
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result, String.format(
+        "Customized view should have state " + "as STARTED " + "for instance: %s, "
+            + " resource: %s, partition: %s and state: %s", INSTANCE_NAME2, RESOURCE_NAME,
+        PARTITION_NAME2, CUSTOMIZED_STATE_NAME2));
+
+    // disable controller
+    ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.enableCluster(clusterName, false);
+    ZkTestHelper.tryWaitZkEventsCleaned(controller.getZkClient());
+
+    // drop resource
+    admin.dropResource(clusterName, RESOURCE_NAME);
+
+    // delete customized state manually, controller shall remove customized view when cluster is
+    //enabled again
+
+    accessor.removeProperty(
+        keyBuilder.customizedState(INSTANCE_NAME1, CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+    accessor.removeProperty(
+        keyBuilder.currentState(INSTANCE_NAME2, CUSTOMIZED_STATE_NAME1, RESOURCE_NAME));
+
+    // re-enable controller shall remove orphan external view
+    // System.out.println("re-enabling controller");
+    admin.enableCluster(clusterName, true);
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        CustomizedView customizedView =
+            accessor.getProperty(keyBuilder.customizedView(CUSTOMIZED_STATE_NAME1));
+        if (customizedView == null) {
+          return true;
+        }
+        return false;
+      }
+    }, 12000);
+
+    Thread.sleep(50);
+    Assert.assertTrue(result, String
+        .format("customized view for should be null for  resource: %s, partition: %s and state: %s",
+            RESOURCE_NAME, PARTITION_NAME1, CUSTOMIZED_STATE_NAME1));
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+    TestHelper.dropCluster(clusterName, _gZkClient);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}
\ No newline at end of file
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 4682c52..7eb74a7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -98,7 +98,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         // System.out.println("controller watch paths: " + watchPaths);
 
         // where n is number of nodes and r is number of resources
-        return watchPaths.size() == (7 + r + ( 4 + r) * n);
+        return watchPaths.size() == (8 + r + ( 5 + r) * n);
       }
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect number of zk-watchers.");
@@ -123,8 +123,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     // printHandlers(participantManagerToExpire);
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 11,
-        "HelixController should have 10 (5+2n) callback handlers for 2 (n) participant");
+    Assert.assertEquals(controllerHandlerNb, 14,
+        "HelixController should have 14 (8+3n) callback handlers for 2 (n) participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
@@ -135,7 +135,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     ZkTestHelper.expireSession(participantManagerToExpire.getZkClient());
     String newSessionId = participantManagerToExpire.getSessionId();
     System.out.println(
-        "Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: "
+        "Expired participant session. oldSessionId: " + oldSessionId + ", newSessionId: "
             + newSessionId);
 
     result =
@@ -153,7 +153,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         // System.out.println("controller watch paths after session expiry: " + watchPaths);
 
         // where n is number of nodes and r is number of resources
-        return watchPaths.size() == (7 + r + ( 4 + r) * n);
+        return watchPaths.size() == (8 + r + ( 5 + r) * n);
       }
     }, 2000);
     Assert.assertTrue(result, "Controller has incorrect number of zk-watchers after session expiry.");
@@ -247,8 +247,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 7 + 2 * n,
-        "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+    Assert.assertEquals(controllerHandlerNb, 8 + 3 * n,
+        "HelixController should have 14 (8+3n) callback handlers for 2 participant, but was "
             + controllerHandlerNb + ", " + printHandlers(controller));
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
@@ -276,7 +276,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         System.err.println("controller watch paths after session expiry: " + watchPaths.size());
 
         // where r is number of resources and n is number of nodes
-        int expected = (7 + r + (4 + r) * n);
+        int expected = (8 + r + (5 + r) * n);
         return watchPaths.size() == expected;
       }
     }, 2000);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index c7a3752..7922153 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -151,7 +151,7 @@ public class TestParticipantManager extends ZkTestBase {
     // check HelixCallback Monitor
     Set<ObjectInstance> objs =
         _server.queryMBeans(buildCallbackMonitorObjectName(type, clusterName, instanceName), null);
-    Assert.assertEquals(objs.size(), 16);
+    Assert.assertEquals(objs.size(), 17);
 
     // check HelixZkClient Monitors
     objs =
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 5bc6d4f..ee64524 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -99,8 +99,10 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
     HelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+    Assert.assertTrue(_gZkClient.exists(PropertyPathBuilder.customizedStateConfig(clusterName)));
     tool.addCluster(clusterName, true);
     Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient));
+    Assert.assertTrue(_gZkClient.exists(PropertyPathBuilder.customizedStateConfig(clusterName)));
 
     List<String> list = tool.getClusters();
     AssertJUnit.assertTrue(list.size() > 0);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 29e64f6..18046c4 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -105,6 +105,10 @@ public class MockHelixAdmin implements HelixAdmin {
 
     path = PropertyPathBuilder.resourceConfig(clusterName);
     _baseDataAccessor.create(path, new ZNRecord(clusterName), 0);
+
+    path = PropertyPathBuilder.customizedStateConfig(clusterName);
+    _baseDataAccessor.create(path, new ZNRecord(clusterName), 0);
+
     // PROPERTY STORE
     path = PropertyPathBuilder.propertyStore(clusterName);
     _baseDataAccessor.create(path, new ZNRecord(clusterName), 0);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
index 90c9bc1..51b932d 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
@@ -19,7 +19,6 @@ package org.apache.helix.mock;
  * under the License.
  */
 
-import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
@@ -39,9 +38,10 @@ import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.CustomizedViewChangeListener;
-import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
 import org.apache.helix.api.listeners.CustomizedStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
+import org.apache.helix.api.listeners.CustomizedViewChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
@@ -138,8 +138,15 @@ public class MockManager implements HelixManager {
   }
 
   @Override
+  public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
+      String instanceName) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
   public void addCustomizedStateChangeListener(CustomizedStateChangeListener listener,
-      String instanceName, String sessionId) {
+      String instanceName, String customizedStateType) {
     // TODO Auto-generated method stub
 
   }
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index b31ba82..e671482 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -19,7 +19,6 @@ package org.apache.helix.participant;
  * under the License.
  */
 
-import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
@@ -38,9 +37,10 @@ import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.CustomizedViewChangeListener;
-import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
 import org.apache.helix.api.listeners.CustomizedStateChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
+import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
+import org.apache.helix.api.listeners.CustomizedViewChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
@@ -170,8 +170,15 @@ public class MockZKHelixManager implements HelixManager {
   }
 
   @Override
+  public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
+      String instanceName) throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
   public void addCustomizedStateChangeListener(CustomizedStateChangeListener listener,
-      String instanceName, String sessionId) throws Exception {
+      String instanceName, String customizedStateType) throws Exception {
     // TODO Auto-generated method stub
 
   }