You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/08/14 03:43:52 UTC

[helix] branch master updated: Add latency metric support for customized view aggregation (#1187)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 918039b  Add latency metric support for customized view aggregation (#1187)
918039b is described below

commit 918039bf5cdf1ce40c8e47910738ba541d4958e6
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Thu Aug 13 20:43:41 2020 -0700

    Add latency metric support for customized view aggregation (#1187)
    
    Add support for reporting a latency metric that measures the time between when a user updates the customized state of a partition using CustomizedStateProvider and when that change shows up in the aggregated customized view and is written to ZooKeeper.
---
 .../main/java/org/apache/helix/InstanceType.java   |  6 +-
 .../stages/CustomizedStateComputationStage.java    |  4 +-
 .../controller/stages/CustomizedStateOutput.java   | 43 +++++++---
 .../stages/CustomizedViewAggregationStage.java     | 90 ++++++++++++++++++--
 .../customizedstate/CustomizedStateProvider.java   |  3 +
 .../monitoring/mbeans/ClusterStatusMonitor.java    | 19 +++++
 .../monitoring/mbeans/CustomizedViewMonitor.java   | 83 ++++++++++++++++++
 .../controller/stages/TestCustomizedViewStage.java | 68 ++++++++++++++-
 .../integration/TestCustomizedViewAggregation.java |  6 +-
 .../paticipant/TestCustomizedStateUpdate.java      | 21 +++--
 .../mbeans/TestCustomizedViewMonitor.java          | 99 ++++++++++++++++++++++
 .../monitoring/mbeans/MonitorDomainNames.java      |  3 +-
 12 files changed, 406 insertions(+), 39 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/InstanceType.java b/helix-core/src/main/java/org/apache/helix/InstanceType.java
index ef77c59..8603c84 100644
--- a/helix-core/src/main/java/org/apache/helix/InstanceType.java
+++ b/helix-core/src/main/java/org/apache/helix/InstanceType.java
@@ -37,7 +37,8 @@ public enum InstanceType {
       MonitorDomainNames.ClusterStatus.name(),
       MonitorDomainNames.HelixZkClient.name(),
       MonitorDomainNames.HelixCallback.name(),
-      MonitorDomainNames.Rebalancer.name()
+      MonitorDomainNames.Rebalancer.name(),
+      MonitorDomainNames.AggregatedView.name()
   }),
 
   PARTICIPANT(new String[] {
@@ -53,7 +54,8 @@ public enum InstanceType {
       MonitorDomainNames.HelixCallback.name(),
       MonitorDomainNames.HelixThreadPoolExecutor.name(),
       MonitorDomainNames.CLMParticipantReport.name(),
-      MonitorDomainNames.Rebalancer.name()
+      MonitorDomainNames.Rebalancer.name(),
+      MonitorDomainNames.AggregatedView.name()
   }),
 
   SPECTATOR(new String[] {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
index cb5f1a3..8716997 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
@@ -32,7 +32,6 @@ import org.apache.helix.model.Resource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class CustomizedStateComputationStage extends AbstractBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(CustomizedStateComputationStage.class);
 
@@ -85,7 +84,8 @@ public class CustomizedStateComputationStage extends AbstractBaseStage {
         if (partition != null) {
           customizedStateOutput
               .setCustomizedState(customizedStateType, resourceName, partition, instanceName,
-                  customizedState.getState(partitionName));
+                  customizedState.getState(partitionName),
+                  customizedState.getStartTime(partitionName));
         }
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
index 3863743..6ba1a51 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
@@ -26,30 +26,35 @@ import java.util.Set;
 
 import org.apache.helix.model.Partition;
 
-
 public class CustomizedStateOutput {
   // stateType -> (resourceName -> (Partition -> (instanceName -> customizedState)))
   private final Map<String, Map<String, Map<Partition, Map<String, String>>>> _customizedStateMap;
+  // stateType -> (resourceName -> (Partition -> (instanceName -> startTime)))
+  private final Map<String, Map<String, Map<Partition, Map<String, Long>>>> _startTimeMap;
 
   public CustomizedStateOutput() {
     _customizedStateMap = new HashMap<>();
+    _startTimeMap = new HashMap<>();
   }
 
   public void setCustomizedState(String stateType, String resourceName, Partition partition,
+      String instanceName, String state, Long startTime) {
+    setCurrentState(stateType, resourceName, partition, instanceName, state);
+    setStartTime(stateType, resourceName, partition, instanceName, startTime);
+  }
+
+  private void setCurrentState(String stateType, String resourceName, Partition partition,
       String instanceName, String state) {
-    if (!_customizedStateMap.containsKey(stateType)) {
-      _customizedStateMap
-          .put(stateType, new HashMap<String, Map<Partition, Map<String, String>>>());
-    }
-    if (!_customizedStateMap.get(stateType).containsKey(resourceName)) {
-      _customizedStateMap.get(stateType)
-          .put(resourceName, new HashMap<Partition, Map<String, String>>());
-    }
-    if (!_customizedStateMap.get(stateType).get(resourceName).containsKey(partition)) {
-      _customizedStateMap.get(stateType).get(resourceName)
-          .put(partition, new HashMap<String, String>());
-    }
-    _customizedStateMap.get(stateType).get(resourceName).get(partition).put(instanceName, state);
+    _customizedStateMap.computeIfAbsent(stateType, k -> new HashMap<>())
+        .computeIfAbsent(resourceName, k -> new HashMap<>())
+        .computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, state);
+  }
+
+  private void setStartTime(String stateType, String resourceName, Partition partition,
+      String instanceName, Long startTime) {
+    _startTimeMap.computeIfAbsent(stateType, k -> new HashMap<>())
+        .computeIfAbsent(resourceName, k -> new HashMap<>())
+        .computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, startTime);
   }
 
   /**
@@ -65,6 +70,10 @@ public class CustomizedStateOutput {
     return Collections.emptyMap();
   }
 
+  private Map<String, Map<Partition, Map<String, Long>>> getStartTimeMap(String stateType) {
+    return _startTimeMap.getOrDefault(stateType, Collections.emptyMap());
+  }
+
   /**
    * given (stateType, resource), returns (partition -> instance-> customizedState) map
    * @param stateType
@@ -114,6 +123,12 @@ public class CustomizedStateOutput {
     return null;
   }
 
+  public Map<Partition, Map<String, Long>> getResourceStartTimeMap(String stateType,
+      String resourceName) {
+    return Collections.unmodifiableMap(
+        getStartTimeMap(stateType).getOrDefault(resourceName, Collections.emptyMap()));
+  }
+
   public Set<String> getAllStateTypes() {
     return _customizedStateMap.keySet();
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
index 165a0af..1582630 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -40,10 +41,11 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.CustomizedView;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(CustomizedViewAggregationStage.class);
 
@@ -78,7 +80,9 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
     Set<String> customizedTypesToRemove = new HashSet<>();
     for (String stateType : customizedViewCacheMap.keySet()) {
       if (!customizedStateOutput.getAllStateTypes().contains(stateType)) {
-        LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType);
+        LogUtil.logInfo(LOG, _eventId,
+            "Remove customizedView for stateType: " + stateType + ", on cluster " + event
+                .getClusterName());
         dataAccessor.removeProperty(keyBuilder.customizedView(stateType));
         customizedTypesToRemove.add(stateType);
       }
@@ -88,6 +92,7 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
     // update customized view
     for (String stateType : customizedStateOutput.getAllStateTypes()) {
       List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
+      Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps = new HashMap<>();
       Map<String, CustomizedView> curCustomizedViews = new HashMap<>();
       CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType);
       if (customizedViewCache != null) {
@@ -97,13 +102,13 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
       for (Resource resource : resourceMap.values()) {
         try {
           computeCustomizedStateView(resource, stateType, customizedStateOutput, curCustomizedViews,
-              updatedCustomizedViews);
+              updatedCustomizedViews, updatedStartTimestamps);
         } catch (HelixException ex) {
           LogUtil.logError(LOG, _eventId,
-              "Failed to calculate customized view for resource " + resource.getResourceName(), ex);
+              "Failed to calculate customized view for resource " + resource.getResourceName()
+                  + ", on cluster " + event.getClusterName(), ex);
         }
       }
-
       List<PropertyKey> keys = new ArrayList<>();
       for (Iterator<CustomizedView> it = updatedCustomizedViews.iterator(); it.hasNext(); ) {
         CustomizedView view = it.next();
@@ -112,15 +117,19 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
       }
       // add/update customized-views from zk and cache
       if (updatedCustomizedViews.size() > 0) {
-        dataAccessor.setChildren(keys, updatedCustomizedViews);
+        boolean[] success = dataAccessor.setChildren(keys, updatedCustomizedViews);
+        reportLatency(event, updatedCustomizedViews, curCustomizedViews, updatedStartTimestamps,
+            success);
         cache.updateCustomizedViews(stateType, updatedCustomizedViews);
       }
 
       // remove stale customized views from zk and cache
       List<String> customizedViewToRemove = new ArrayList<>();
       for (String resourceName : curCustomizedViews.keySet()) {
-        if (!resourceMap.keySet().contains(resourceName)) {
-          LogUtil.logInfo(LOG, _eventId, "Remove customizedView for resource: " + resourceName);
+        if (!resourceMap.containsKey(resourceName)) {
+          LogUtil.logInfo(LOG, _eventId,
+              "Remove customizedView for resource: " + resourceName + ", on cluster " + event
+                  .getClusterName());
           dataAccessor.removeProperty(keyBuilder.customizedView(stateType, resourceName));
           customizedViewToRemove.add(resourceName);
         }
@@ -132,7 +141,8 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
   private void computeCustomizedStateView(final Resource resource, final String stateType,
       CustomizedStateOutput customizedStateOutput,
       final Map<String, CustomizedView> curCustomizedViews,
-      List<CustomizedView> updatedCustomizedViews) {
+      List<CustomizedView> updatedCustomizedViews,
+      Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps) {
     String resourceName = resource.getResourceName();
     CustomizedView view = new CustomizedView(resource.getResourceName());
 
@@ -152,6 +162,68 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
     if (curCustomizedView == null || !curCustomizedView.getRecord().equals(view.getRecord())) {
       // Add customized view to the list which will be written to ZK later.
       updatedCustomizedViews.add(view);
+      updatedStartTimestamps.put(resourceName,
+          customizedStateOutput.getResourceStartTimeMap(stateType, resourceName));
+    }
+  }
+
+  private void reportLatency(ClusterEvent event, List<CustomizedView> updatedCustomizedViews,
+      Map<String, CustomizedView> curCustomizedViews,
+      Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps,
+      boolean[] updateSuccess) {
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    if (clusterStatusMonitor == null) {
+      return;
+    }
+    long curTime = System.currentTimeMillis();
+    String clusterName = event.getClusterName();
+    CustomizedViewMonitor customizedViewMonitor =
+        clusterStatusMonitor.getOrCreateCustomizedViewMonitor(clusterName);
+
+    for (int i = 0; i < updatedCustomizedViews.size(); i++) {
+      CustomizedView newCV = updatedCustomizedViews.get(i);
+      String resourceName = newCV.getResourceName();
+
+      if (!updateSuccess[i]) {
+        LOG.warn("Customized views are not updated successfully for resource {} on cluster {}",
+            resourceName, clusterName);
+        continue;
+      }
+
+      CustomizedView oldCV =
+          curCustomizedViews.getOrDefault(resourceName, new CustomizedView(resourceName));
+
+      Map<String, Map<String, String>> newPartitionStateMaps = newCV.getRecord().getMapFields();
+      Map<String, Map<String, String>> oldPartitionStateMaps = oldCV.getRecord().getMapFields();
+      Map<Partition, Map<String, Long>> partitionStartTimeMaps =
+          updatedStartTimestamps.getOrDefault(resourceName, Collections.emptyMap());
+
+      for (Map.Entry<String, Map<String, String>> partitionStateMapEntry : newPartitionStateMaps
+          .entrySet()) {
+        String partitionName = partitionStateMapEntry.getKey();
+        Map<String, String> newStateMap = partitionStateMapEntry.getValue();
+        Map<String, String> oldStateMap =
+            oldPartitionStateMaps.getOrDefault(partitionName, Collections.emptyMap());
+        if (!newStateMap.equals(oldStateMap)) {
+          Map<String, Long> partitionStartTimeMap = partitionStartTimeMaps
+              .getOrDefault(new Partition(partitionName), Collections.emptyMap());
+
+          for (Map.Entry<String, String> stateMapEntry : newStateMap.entrySet()) {
+            String instanceName = stateMapEntry.getKey();
+            if (!stateMapEntry.getValue().equals(oldStateMap.get(instanceName))) {
+              long timestamp = partitionStartTimeMap.get(instanceName);
+              if (timestamp > 0) {
+                customizedViewMonitor.recordUpdateToAggregationLatency(curTime - timestamp);
+              } else {
+                LOG.warn(
+                    "Failed to find customized state update time stamp for resource {} partition {}, instance {}, on cluster {} the number should be positive.",
+                    resourceName, partitionName, instanceName, clusterName);
+              }
+            }
+          }
+        }
+      }
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
index 7684e05..0abf4a1 100644
--- a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
@@ -68,6 +68,9 @@ public class CustomizedStateProvider {
     PropertyKey propertyKey =
         keyBuilder.customizedState(_instanceName, customizedStateName, resourceName);
     ZNRecord record = new ZNRecord(resourceName);
+    // Update start time field for monitoring purpose, updated value is current time
+    customizedStateMap.put(CustomizedState.CustomizedStateProperty.START_TIME.name(),
+        String.valueOf(System.currentTimeMillis()));
     record.setMapField(partitionName, customizedStateMap);
     if (!_helixDataAccessor.updateProperty(propertyKey, new CustomizedState(record))) {
       throw new HelixException(String
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index da49041..2242b12 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -96,6 +96,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMonitorMap =
       new ConcurrentHashMap<>();
 
+  private CustomizedViewMonitor _customizedViewMonitor;
+
   /**
    * PerInstanceResource monitor map: beanName->monitor
    */
@@ -332,6 +334,23 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
+  /**
+   * Lazy initialization of customized view monitor
+   * @param clusterName the cluster name of the cluster to be monitored
+   * @return a customized view monitor instance
+   */
+  public synchronized CustomizedViewMonitor getOrCreateCustomizedViewMonitor(String clusterName) {
+    if (_customizedViewMonitor == null) {
+      _customizedViewMonitor = new CustomizedViewMonitor(clusterName);
+      try {
+        _customizedViewMonitor.register();
+      } catch (JMException e) {
+        LOG.error("Failed to register CustomizedViewMonitorMBean for cluster " + _clusterName, e);
+      }
+    }
+    return _customizedViewMonitor;
+  }
+
   private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
     try {
       if (!_clusterEventMonitorMap.containsKey(phase)) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
new file mode 100644
index 0000000..b038078
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
@@ -0,0 +1,83 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CustomizedViewMonitor extends DynamicMBeanProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(CustomizedViewMonitor.class);
+
+  private static final String MBEAN_DESCRIPTION = "Helix Customized View Aggregation Monitor";
+  private final String _clusterName;
+  private final String _sensorName;
+  private HistogramDynamicMetric _updateToAggregationLatencyGauge;
+  public static final String UPDATE_TO_AGGREGATION_LATENCY_GAUGE =
+      "UpdateToAggregationLatencyGauge";
+  private ClusterStatusMonitor _clusterStatusMonitor;
+  private static final String TYPE_KEY = "Type";
+  private static final String CLUSTER_KEY = "Cluster";
+  private static final String CUSTOMIZED_VIEW = "CustomizedView";
+
+  public CustomizedViewMonitor(String clusterName) {
+    _clusterName = clusterName;
+    _sensorName = String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
+            CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName);
+    _updateToAggregationLatencyGauge =
+        new HistogramDynamicMetric(UPDATE_TO_AGGREGATION_LATENCY_GAUGE, new Histogram(
+            new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_updateToAggregationLatencyGauge);
+    doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanObjectName());
+    return this;
+  }
+
+  private ObjectName getMBeanObjectName() throws MalformedObjectNameException {
+    return new ObjectName(String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
+            CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName));
+  }
+
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
+
+  public void recordUpdateToAggregationLatency(long latency) {
+    _updateToAggregationLatencyGauge.updateValue(latency);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index bcd5f75..0f14bc2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -21,6 +21,9 @@ package org.apache.helix.controller.stages;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.management.ObjectName;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -34,10 +37,12 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.CustomizedStateConfig;
 import org.apache.helix.model.CustomizedView;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-
 public class TestCustomizedViewStage extends ZkUnitTestBase {
   private final String RESOURCE_NAME = "TestDB";
   private final String PARTITION_NAME = "TestDB_0";
@@ -112,4 +117,65 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
     deleteLiveInstances(clusterName);
     deleteCluster(clusterName);
   }
+
+  @Test
+  public void testLatencyMetricReporting() throws Exception {
+    String clusterName = "CLUSTER_" + TestHelper.getTestMethodName();
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    setupLiveInstances(clusterName, new int[]{0, 1});
+    setupStateModel(clusterName);
+
+    ClusterStatusMonitor clusterStatusMonitor = new ClusterStatusMonitor(clusterName);
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown);
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    cache.setAsyncTasksThreadPool(executor);
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), clusterStatusMonitor);
+
+    CustomizedStateConfig config = new CustomizedStateConfig();
+    List<String> aggregationEnabledTypes = new ArrayList<>();
+    aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME);
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+    CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+    customizedState.setState(PARTITION_NAME, "STATE");
+    customizedState.setStartTime(PARTITION_NAME, 1);
+    accessor.setProperty(
+        keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME),
+        customizedState);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh);
+    runStage(event, new ResourceComputationStage());
+    runStage(event, new CustomizedStateComputationStage());
+    runStage(event, new CustomizedViewAggregationStage());
+
+    ObjectName objectName = new ObjectName(String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
+            "CustomizedView", "Cluster", clusterName));
+    Assert.assertNotNull(
+        ((ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()))
+            .getOrCreateCustomizedViewMonitor(clusterName));
+
+    TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+        CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0,
+        TestHelper.WAIT_DURATION);
+
+    deleteLiveInstances(clusterName);
+    deleteCluster(clusterName);
+    executor.shutdownNow();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java
index 4eed9cc..258e4c4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java
@@ -222,7 +222,7 @@ public class TestCustomizedViewAggregation extends ZkUnitTestBase {
             Map<String, Map<String, String>> localPerResourceCustomizedView = localSnapshot
                 .getOrDefault(resourceCustomizedView.getResourceName(), Maps.newHashMap());
 
-            if (resourceStateMap.isEmpty() && !localPerResourceCustomizedView.isEmpty()) {
+            if (resourceStateMap.size() != localPerResourceCustomizedView.size()) {
               return false;
             }
 
@@ -252,7 +252,7 @@ public class TestCustomizedViewAggregation extends ZkUnitTestBase {
         }
         return true;
       }
-    }, 12000);
+    }, TestHelper.WAIT_DURATION);
 
     Assert.assertTrue(result);
   }
@@ -422,6 +422,8 @@ public class TestCustomizedViewAggregation extends ZkUnitTestBase {
     Map<String, String> perPartitionCustomizedState = _customizedStateProvider_participant1
         .getPerPartitionCustomizedState(CustomizedStateType.TYPE_A.name(), RESOURCE_1,
             PARTITION_10);
+    // Remove this field because it's automatically updated for monitoring purpose and we don't need to compare it
+    perPartitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
     Map<String, String> actualPerPartitionCustomizedState = Maps.newHashMap();
     actualPerPartitionCustomizedState
         .put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index 0102375..ed1338a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -87,7 +87,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     Map<String, Map<String, String>> mapView = customizedState.getRecord().getMapFields();
     Assert.assertEquals(mapView.keySet().size(), 1);
     Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
-    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+    // Updated 2 fields + START_TIME field is automatically updated for monitoring
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3);
     Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "STARTED");
     Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
 
@@ -103,7 +104,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     mapView = customizedState.getRecord().getMapFields();
     Assert.assertEquals(mapView.keySet().size(), 1);
     Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
-    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3);
     Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
     Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
 
@@ -120,7 +121,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     mapView = customizedState.getRecord().getMapFields();
     Assert.assertEquals(mapView.keySet().size(), 1);
     Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
-    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+    Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3);
     Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
     Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "COMPLETED");
 
@@ -141,13 +142,13 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
 
     Map<String, String> partitionMap1 = _mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
-    Assert.assertEquals(partitionMap1.keySet().size(), 2);
+    Assert.assertEquals(partitionMap1.keySet().size(), 3);
     Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
     Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED");
 
     Map<String, String> partitionMap2 = _mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
-    Assert.assertEquals(partitionMap2.keySet().size(), 2);
+    Assert.assertEquals(partitionMap2.keySet().size(), 3);
     Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED");
     Assert.assertEquals(partitionMap2.get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
 
@@ -170,6 +171,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     // get customized state
     CustomizedState customizedState =
         _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+    // START_TIME field is automatically updated for monitoring
     Assert.assertEquals(
         customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE)
             .size(), 1);
@@ -178,8 +180,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     Assert.assertEquals(customizedState
         .getPartitionStateMap(CustomizedState.CustomizedStateProperty.PREVIOUS_STATE), map);
     Assert.assertEquals(
-        customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME),
-        map);
+        customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME).size(),
+        1);
     Assert.assertEquals(
         customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.END_TIME),
         map);
@@ -192,6 +194,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), PARTITION_STATE);
     Map<String, String> partitionCustomizedState = _mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+    partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
     Assert.assertEquals(partitionCustomizedState, map);
     Assert.assertNull(_mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
@@ -218,6 +221,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), null);
     Map<String, String> partitionCustomizedState = _mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+    partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
     Assert.assertEquals(partitionCustomizedState, map);
     Assert.assertNull(_mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
@@ -244,7 +248,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     // get per partition customized state
     Map<String, String> partitionCustomizedState = _mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
-    Assert.assertEquals(partitionCustomizedState.size(), 0);
+    // START_TIME field is automatically updated for monitoring
+    Assert.assertEquals(partitionCustomizedState.size(), 1);
     Assert.assertNull(_mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
   }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java
new file mode 100644
index 0000000..248ed56
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java
@@ -0,0 +1,123 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import javax.management.JMException;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link CustomizedViewMonitor}: JMX registration under the AggregatedView domain and
+ * the update-to-aggregation latency gauge.
+ */
+public class TestCustomizedViewMonitor {
+  private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+  private static final String TEST_CLUSTER = "test_cluster";
+  // Suffixes appended to the latency gauge name to read its Max and Mean attributes.
+  private static final String MAX_SUFFIX = ".Max";
+  private static final String MEAN_SUFFIX = ".Mean";
+
+  /**
+   * Builds the JMX object name expected for a CustomizedViewMonitor of TEST_CLUSTER.
+   *
+   * @param duplicateNum 0 for the first registration of that name; otherwise the value of the
+   *     {@link MBeanRegistrar#DUPLICATE} key that distinguishes later registrations
+   */
+  private ObjectName buildObjectName(int duplicateNum) throws MalformedObjectNameException {
+    ObjectName objectName = new ObjectName(String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
+            "CustomizedView", "Cluster", TEST_CLUSTER));
+    if (duplicateNum == 0) {
+      return objectName;
+    }
+    return new ObjectName(
+        String.format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE, duplicateNum));
+  }
+
+  /**
+   * Registers several monitors for the same cluster, then unregisters them newest-first, checking
+   * that each one was registered under the expected duplicate-numbered name and is removed.
+   */
+  @Test
+  public void testMBeanRegistration() throws JMException, IOException {
+    int numOfMonitors = 5;
+
+    Deque<CustomizedViewMonitor> monitors = new ArrayDeque<>();
+    try {
+      for (int i = 0; i < numOfMonitors; i++) {
+        CustomizedViewMonitor monitor = new CustomizedViewMonitor(TEST_CLUSTER);
+        monitor.register();
+        monitors.push(monitor);
+      }
+
+      // The i-th registration (0-based) is expected under duplicate number i, so popping the
+      // most recent monitor first walks the duplicate numbers down to 0.
+      for (int duplicateNum = numOfMonitors - 1; duplicateNum >= 0; duplicateNum--) {
+        ObjectName expectedName = buildObjectName(duplicateNum);
+        Assert.assertTrue(_server.isRegistered(expectedName));
+        monitors.pop().unregister();
+        Assert.assertFalse(_server.isRegistered(expectedName));
+      }
+    } finally {
+      // If an assertion failed, do not leak MBeans into other tests sharing the platform server.
+      while (!monitors.isEmpty()) {
+        monitors.pop().unregister();
+      }
+    }
+  }
+
+  /**
+   * Records latencies 0..9 and checks after each one that the latency gauge's Max and Mean
+   * attributes reflect all values recorded so far.
+   */
+  @Test
+  public void testMetricInitialization() throws Exception {
+    CustomizedViewMonitor monitor = new CustomizedViewMonitor(TEST_CLUSTER);
+    monitor.register();
+    try {
+      ObjectName objectName = buildObjectName(0);
+      int sum = 0;
+      for (int i = 0; i < 10; i++) {
+        monitor.recordUpdateToAggregationLatency(i);
+        sum += i;
+        long expectedMax = i;
+        // The mean of 0..i is exactly i / 2, which a double represents exactly.
+        double expectedMean = sum / (i + 1.0);
+        Assert.assertTrue(TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+            CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + MAX_SUFFIX) == expectedMax,
+            TestHelper.WAIT_DURATION));
+        Assert.assertTrue(TestHelper.verify(() -> (double) _server.getAttribute(objectName,
+            CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + MEAN_SUFFIX) == expectedMean,
+            TestHelper.WAIT_DURATION));
+      }
+    } finally {
+      monitor.unregister();
+    }
+  }
+}
diff --git a/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
index a499d79..7a94e7f 100644
--- a/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
@@ -29,5 +29,6 @@ public enum MonitorDomainNames {
   HelixCallback,
   RoutingTableProvider,
   CLMParticipantReport,
-  Rebalancer
+  Rebalancer,
+  AggregatedView
 }