You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/08/14 03:43:52 UTC
[helix] branch master updated: Add latency metric support for
customized view aggregation (#1187)
This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 918039b Add latency metric support for customized view aggregation (#1187)
918039b is described below
commit 918039bf5cdf1ce40c8e47910738ba541d4958e6
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Thu Aug 13 20:43:41 2020 -0700
Add latency metric support for customized view aggregation (#1187)
Add support for reporting the latency between the time a user updates the customized state of a partition using CustomizedStateProvider and the time this change shows up in the aggregated customized view and is written to ZK.
---
.../main/java/org/apache/helix/InstanceType.java | 6 +-
.../stages/CustomizedStateComputationStage.java | 4 +-
.../controller/stages/CustomizedStateOutput.java | 43 +++++++---
.../stages/CustomizedViewAggregationStage.java | 90 ++++++++++++++++++--
.../customizedstate/CustomizedStateProvider.java | 3 +
.../monitoring/mbeans/ClusterStatusMonitor.java | 19 +++++
.../monitoring/mbeans/CustomizedViewMonitor.java | 83 ++++++++++++++++++
.../controller/stages/TestCustomizedViewStage.java | 68 ++++++++++++++-
.../integration/TestCustomizedViewAggregation.java | 6 +-
.../paticipant/TestCustomizedStateUpdate.java | 21 +++--
.../mbeans/TestCustomizedViewMonitor.java | 99 ++++++++++++++++++++++
.../monitoring/mbeans/MonitorDomainNames.java | 3 +-
12 files changed, 406 insertions(+), 39 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/InstanceType.java b/helix-core/src/main/java/org/apache/helix/InstanceType.java
index ef77c59..8603c84 100644
--- a/helix-core/src/main/java/org/apache/helix/InstanceType.java
+++ b/helix-core/src/main/java/org/apache/helix/InstanceType.java
@@ -37,7 +37,8 @@ public enum InstanceType {
MonitorDomainNames.ClusterStatus.name(),
MonitorDomainNames.HelixZkClient.name(),
MonitorDomainNames.HelixCallback.name(),
- MonitorDomainNames.Rebalancer.name()
+ MonitorDomainNames.Rebalancer.name(),
+ MonitorDomainNames.AggregatedView.name()
}),
PARTICIPANT(new String[] {
@@ -53,7 +54,8 @@ public enum InstanceType {
MonitorDomainNames.HelixCallback.name(),
MonitorDomainNames.HelixThreadPoolExecutor.name(),
MonitorDomainNames.CLMParticipantReport.name(),
- MonitorDomainNames.Rebalancer.name()
+ MonitorDomainNames.Rebalancer.name(),
+ MonitorDomainNames.AggregatedView.name()
}),
SPECTATOR(new String[] {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
index cb5f1a3..8716997 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java
@@ -32,7 +32,6 @@ import org.apache.helix.model.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class CustomizedStateComputationStage extends AbstractBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CustomizedStateComputationStage.class);
@@ -85,7 +84,8 @@ public class CustomizedStateComputationStage extends AbstractBaseStage {
if (partition != null) {
customizedStateOutput
.setCustomizedState(customizedStateType, resourceName, partition, instanceName,
- customizedState.getState(partitionName));
+ customizedState.getState(partitionName),
+ customizedState.getStartTime(partitionName));
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
index 3863743..6ba1a51 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java
@@ -26,30 +26,35 @@ import java.util.Set;
import org.apache.helix.model.Partition;
-
public class CustomizedStateOutput {
// stateType -> (resourceName -> (Partition -> (instanceName -> customizedState)))
private final Map<String, Map<String, Map<Partition, Map<String, String>>>> _customizedStateMap;
+ // stateType -> (resourceName -> (Partition -> (instanceName -> startTime)))
+ private final Map<String, Map<String, Map<Partition, Map<String, Long>>>> _startTimeMap;
+  /** Creates an output whose customized-state and start-time maps both start empty. */
public CustomizedStateOutput() {
_customizedStateMap = new HashMap<>();
+ _startTimeMap = new HashMap<>();
}
+  /**
+   * Records the customized state and its start time for one
+   * (stateType, resource, partition, instance) tuple, creating intermediate maps on demand.
+   * @param state customized state value stored in the customized-state map
+   * @param startTime start time stored in the start-time map as-is; NOTE(review): a null value is
+   *                  stored unchanged, so callers should pass a non-null timestamp
+   */
public void setCustomizedState(String stateType, String resourceName, Partition partition,
+ String instanceName, String state, Long startTime) {
+ setCurrentState(stateType, resourceName, partition, instanceName, state);
+ setStartTime(stateType, resourceName, partition, instanceName, startTime);
+ }
+
+  /**
+   * Stores {@code state} in the nested customized-state map. Despite the name, this writes the
+   * customized state, not a Helix current state.
+   */
+ private void setCurrentState(String stateType, String resourceName, Partition partition,
String instanceName, String state) {
- if (!_customizedStateMap.containsKey(stateType)) {
- _customizedStateMap
- .put(stateType, new HashMap<String, Map<Partition, Map<String, String>>>());
- }
- if (!_customizedStateMap.get(stateType).containsKey(resourceName)) {
- _customizedStateMap.get(stateType)
- .put(resourceName, new HashMap<Partition, Map<String, String>>());
- }
- if (!_customizedStateMap.get(stateType).get(resourceName).containsKey(partition)) {
- _customizedStateMap.get(stateType).get(resourceName)
- .put(partition, new HashMap<String, String>());
- }
- _customizedStateMap.get(stateType).get(resourceName).get(partition).put(instanceName, state);
+ _customizedStateMap.computeIfAbsent(stateType, k -> new HashMap<>())
+ .computeIfAbsent(resourceName, k -> new HashMap<>())
+ .computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, state);
+ }
+
+  /** Stores {@code startTime} in the nested start-time map, creating intermediate maps on demand. */
+ private void setStartTime(String stateType, String resourceName, Partition partition,
+ String instanceName, Long startTime) {
+ _startTimeMap.computeIfAbsent(stateType, k -> new HashMap<>())
+ .computeIfAbsent(resourceName, k -> new HashMap<>())
+ .computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, startTime);
}
/**
@@ -65,6 +70,10 @@ public class CustomizedStateOutput {
return Collections.emptyMap();
}
+  /**
+   * Looks up the resource -> partition -> instance -> startTime map for one state type.
+   * Falls back to an empty map when the state type has no recorded start times.
+   */
+  private Map<String, Map<Partition, Map<String, Long>>> getStartTimeMap(String stateType) {
+    Map<String, Map<Partition, Map<String, Long>>> perResource = _startTimeMap.get(stateType);
+    return perResource == null ? Collections.emptyMap() : perResource;
+  }
+
/**
* given (stateType, resource), returns (partition -> instance-> customizedState) map
* @param stateType
@@ -114,6 +123,12 @@ public class CustomizedStateOutput {
return null;
}
+  /**
+   * Returns a read-only view of partition -> (instance -> startTime) for one resource of a state
+   * type, or an empty map when nothing was recorded. The inner per-partition maps are the live
+   * internal maps, not copies.
+   */
+  public Map<Partition, Map<String, Long>> getResourceStartTimeMap(String stateType,
+      String resourceName) {
+    Map<Partition, Map<String, Long>> perPartition =
+        getStartTimeMap(stateType).getOrDefault(resourceName, Collections.emptyMap());
+    return Collections.unmodifiableMap(perPartition);
+  }
+
+  /**
+   * Returns the state types present in the customized-state map.
+   * NOTE(review): this is the live key set of the internal map, not a copy; callers should not
+   * mutate it.
+   */
public Set<String> getAllStateTypes() {
return _customizedStateMap.keySet();
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
index 165a0af..1582630 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -40,10 +41,11 @@ import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CustomizedViewAggregationStage.class);
@@ -78,7 +80,9 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
Set<String> customizedTypesToRemove = new HashSet<>();
for (String stateType : customizedViewCacheMap.keySet()) {
if (!customizedStateOutput.getAllStateTypes().contains(stateType)) {
- LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType);
+ LogUtil.logInfo(LOG, _eventId,
+ "Remove customizedView for stateType: " + stateType + ", on cluster " + event
+ .getClusterName());
dataAccessor.removeProperty(keyBuilder.customizedView(stateType));
customizedTypesToRemove.add(stateType);
}
@@ -88,6 +92,7 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
// update customized view
for (String stateType : customizedStateOutput.getAllStateTypes()) {
List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
+ Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps = new HashMap<>();
Map<String, CustomizedView> curCustomizedViews = new HashMap<>();
CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType);
if (customizedViewCache != null) {
@@ -97,13 +102,13 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
for (Resource resource : resourceMap.values()) {
try {
computeCustomizedStateView(resource, stateType, customizedStateOutput, curCustomizedViews,
- updatedCustomizedViews);
+ updatedCustomizedViews, updatedStartTimestamps);
} catch (HelixException ex) {
LogUtil.logError(LOG, _eventId,
- "Failed to calculate customized view for resource " + resource.getResourceName(), ex);
+ "Failed to calculate customized view for resource " + resource.getResourceName()
+ + ", on cluster " + event.getClusterName(), ex);
}
}
-
List<PropertyKey> keys = new ArrayList<>();
for (Iterator<CustomizedView> it = updatedCustomizedViews.iterator(); it.hasNext(); ) {
CustomizedView view = it.next();
@@ -112,15 +117,19 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
}
// add/update customized-views from zk and cache
if (updatedCustomizedViews.size() > 0) {
- dataAccessor.setChildren(keys, updatedCustomizedViews);
+ boolean[] success = dataAccessor.setChildren(keys, updatedCustomizedViews);
+ reportLatency(event, updatedCustomizedViews, curCustomizedViews, updatedStartTimestamps,
+ success);
cache.updateCustomizedViews(stateType, updatedCustomizedViews);
}
// remove stale customized views from zk and cache
List<String> customizedViewToRemove = new ArrayList<>();
for (String resourceName : curCustomizedViews.keySet()) {
- if (!resourceMap.keySet().contains(resourceName)) {
- LogUtil.logInfo(LOG, _eventId, "Remove customizedView for resource: " + resourceName);
+ if (!resourceMap.containsKey(resourceName)) {
+ LogUtil.logInfo(LOG, _eventId,
+ "Remove customizedView for resource: " + resourceName + ", on cluster " + event
+ .getClusterName());
dataAccessor.removeProperty(keyBuilder.customizedView(stateType, resourceName));
customizedViewToRemove.add(resourceName);
}
@@ -132,7 +141,8 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
private void computeCustomizedStateView(final Resource resource, final String stateType,
CustomizedStateOutput customizedStateOutput,
final Map<String, CustomizedView> curCustomizedViews,
- List<CustomizedView> updatedCustomizedViews) {
+ List<CustomizedView> updatedCustomizedViews,
+ Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps) {
String resourceName = resource.getResourceName();
CustomizedView view = new CustomizedView(resource.getResourceName());
@@ -152,6 +162,68 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
if (curCustomizedView == null || !curCustomizedView.getRecord().equals(view.getRecord())) {
// Add customized view to the list which will be written to ZK later.
updatedCustomizedViews.add(view);
+ updatedStartTimestamps.put(resourceName,
+ customizedStateOutput.getResourceStartTimeMap(stateType, resourceName));
+ }
+ }
+
+  /**
+   * Records, for every customized view that was written successfully, the latency between each
+   * changed (partition, instance) customized-state update and now.
+   * <p>
+   * Only entries whose state differs from the previously cached customized view are measured.
+   * Entries without a positive recorded start time are logged and skipped.
+   * @param event cluster event carrying the {@link ClusterStatusMonitor}; nothing is reported when
+   *              that attribute is absent
+   * @param updatedCustomizedViews views passed to {@code setChildren}, in the same order as
+   *                               {@code updateSuccess}
+   * @param curCustomizedViews previously cached views keyed by resource name
+   * @param updatedStartTimestamps resource -> partition -> instance -> start time captured when
+   *                               each view was recomputed
+   * @param updateSuccess per-view write results returned by {@code setChildren}
+   */
+  private void reportLatency(ClusterEvent event, List<CustomizedView> updatedCustomizedViews,
+      Map<String, CustomizedView> curCustomizedViews,
+      Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps,
+      boolean[] updateSuccess) {
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    if (clusterStatusMonitor == null) {
+      return;
+    }
+    long curTime = System.currentTimeMillis();
+    String clusterName = event.getClusterName();
+    CustomizedViewMonitor customizedViewMonitor =
+        clusterStatusMonitor.getOrCreateCustomizedViewMonitor(clusterName);
+
+    for (int i = 0; i < updatedCustomizedViews.size(); i++) {
+      CustomizedView newCV = updatedCustomizedViews.get(i);
+      String resourceName = newCV.getResourceName();
+
+      if (!updateSuccess[i]) {
+        LOG.warn("Customized views are not updated successfully for resource {} on cluster {}",
+            resourceName, clusterName);
+        continue;
+      }
+
+      // A resource with no cached view is compared against an empty state map.
+      CustomizedView oldCV = curCustomizedViews.get(resourceName);
+      Map<String, Map<String, String>> oldPartitionStateMaps =
+          oldCV == null ? Collections.emptyMap() : oldCV.getRecord().getMapFields();
+      Map<String, Map<String, String>> newPartitionStateMaps = newCV.getRecord().getMapFields();
+      Map<Partition, Map<String, Long>> partitionStartTimeMaps =
+          updatedStartTimestamps.getOrDefault(resourceName, Collections.emptyMap());
+
+      for (Map.Entry<String, Map<String, String>> partitionStateMapEntry : newPartitionStateMaps
+          .entrySet()) {
+        String partitionName = partitionStateMapEntry.getKey();
+        Map<String, String> newStateMap = partitionStateMapEntry.getValue();
+        Map<String, String> oldStateMap =
+            oldPartitionStateMaps.getOrDefault(partitionName, Collections.emptyMap());
+        if (!newStateMap.equals(oldStateMap)) {
+          Map<String, Long> partitionStartTimeMap = partitionStartTimeMaps
+              .getOrDefault(new Partition(partitionName), Collections.emptyMap());
+
+          for (Map.Entry<String, String> stateMapEntry : newStateMap.entrySet()) {
+            String instanceName = stateMapEntry.getKey();
+            if (!stateMapEntry.getValue().equals(oldStateMap.get(instanceName))) {
+              // Keep the lookup boxed: a missing entry would throw NullPointerException if it
+              // were unboxed into a primitive long.
+              Long timestamp = partitionStartTimeMap.get(instanceName);
+              if (timestamp != null && timestamp > 0) {
+                customizedViewMonitor.recordUpdateToAggregationLatency(curTime - timestamp);
+              } else {
+                LOG.warn(
+                    "Failed to find customized state update time stamp for resource {} partition {}, instance {}, on cluster {} the number should be positive.",
+                    resourceName, partitionName, instanceName, clusterName);
+              }
+            }
+          }
+        }
+      }
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
index 7684e05..0abf4a1 100644
--- a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
@@ -68,6 +68,9 @@ public class CustomizedStateProvider {
PropertyKey propertyKey =
keyBuilder.customizedState(_instanceName, customizedStateName, resourceName);
ZNRecord record = new ZNRecord(resourceName);
+ // Update start time field for monitoring purpose, updated value is current time
+ customizedStateMap.put(CustomizedState.CustomizedStateProperty.START_TIME.name(),
+ String.valueOf(System.currentTimeMillis()));
record.setMapField(partitionName, customizedStateMap);
if (!_helixDataAccessor.updateProperty(propertyKey, new CustomizedState(record))) {
throw new HelixException(String
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index da49041..2242b12 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -96,6 +96,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMonitorMap =
new ConcurrentHashMap<>();
+ private CustomizedViewMonitor _customizedViewMonitor;
+
/**
* PerInstanceResource monitor map: beanName->monitor
*/
@@ -332,6 +334,23 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
+  /**
+   * Lazy initialization of the customized view monitor.
+   * <p>
+   * Only the first call creates and registers the monitor; {@code clusterName} is ignored on later
+   * calls and the cached instance is returned. If JMX registration fails, the error is logged and
+   * the unregistered monitor is still cached and returned.
+   * @param clusterName the cluster name of the cluster to be monitored
+   * @return a customized view monitor instance, never null
+   */
+  public synchronized CustomizedViewMonitor getOrCreateCustomizedViewMonitor(String clusterName) {
+    if (_customizedViewMonitor == null) {
+      _customizedViewMonitor = new CustomizedViewMonitor(clusterName);
+      try {
+        _customizedViewMonitor.register();
+      } catch (JMException e) {
+        // Report the cluster name the monitor was actually created and registered for.
+        LOG.error("Failed to register CustomizedViewMonitorMBean for cluster " + clusterName, e);
+      }
+    }
+    return _customizedViewMonitor;
+  }
+
private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
try {
if (!_clusterEventMonitorMap.containsKey(phase)) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
new file mode 100644
index 0000000..b038078
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
@@ -0,0 +1,83 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMX monitor for customized view aggregation. It exposes a sliding-time-window histogram of the
+ * latency between the START_TIME recorded when a customized state is updated and the moment the
+ * aggregated customized view containing that change is written. It is registered under
+ * {@code AggregatedView:Type=CustomizedView,Cluster=<clusterName>}.
+ */
+public class CustomizedViewMonitor extends DynamicMBeanProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(CustomizedViewMonitor.class);
+
+  private static final String MBEAN_DESCRIPTION = "Helix Customized View Aggregation Monitor";
+  private static final String TYPE_KEY = "Type";
+  private static final String CLUSTER_KEY = "Cluster";
+  private static final String CUSTOMIZED_VIEW = "CustomizedView";
+  public static final String UPDATE_TO_AGGREGATION_LATENCY_GAUGE =
+      "UpdateToAggregationLatencyGauge";
+
+  private final String _clusterName;
+  // Doubles as the JMX ObjectName string, so the sensor name and the registered bean cannot drift.
+  private final String _sensorName;
+  private final HistogramDynamicMetric _updateToAggregationLatencyGauge;
+
+  /**
+   * Creates an unregistered monitor; call {@link #register()} to expose it over JMX.
+   * @param clusterName cluster whose aggregation latency is tracked
+   */
+  public CustomizedViewMonitor(String clusterName) {
+    _clusterName = clusterName;
+    _sensorName = String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
+            CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName);
+    _updateToAggregationLatencyGauge =
+        new HistogramDynamicMetric(UPDATE_TO_AGGREGATION_LATENCY_GAUGE, new Histogram(
+            new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_updateToAggregationLatencyGauge);
+    doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanObjectName());
+    return this;
+  }
+
+  private ObjectName getMBeanObjectName() throws MalformedObjectNameException {
+    return new ObjectName(_sensorName);
+  }
+
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
+
+  /**
+   * Adds one latency sample to the histogram.
+   * @param latency elapsed time between the customized state update and the view write; the
+   *                aggregation stage computes it from {@code System.currentTimeMillis()} values
+   */
+  public void recordUpdateToAggregationLatency(long latency) {
+    _updateToAggregationLatencyGauge.updateValue(latency);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index bcd5f75..0f14bc2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -21,6 +21,9 @@ package org.apache.helix.controller.stages;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.management.ObjectName;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -34,10 +37,12 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.testng.Assert;
import org.testng.annotations.Test;
-
public class TestCustomizedViewStage extends ZkUnitTestBase {
private final String RESOURCE_NAME = "TestDB";
private final String PARTITION_NAME = "TestDB_0";
@@ -112,4 +117,65 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
deleteLiveInstances(clusterName);
deleteCluster(clusterName);
}
+
+  @Test
+  public void testLatencyMetricReporting() throws Exception {
+    String clusterName = "CLUSTER_" + TestHelper.getTestMethodName();
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    // ideal state: node0 is MASTER, node1 is SLAVE
+    // replica=2 means 1 master and 1 slave
+    setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    setupLiveInstances(clusterName, new int[]{0, 1});
+    setupStateModel(clusterName);
+
+    ClusterStatusMonitor clusterStatusMonitor = new ClusterStatusMonitor(clusterName);
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown);
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      cache.setAsyncTasksThreadPool(executor);
+      event.addAttribute(AttributeName.helixmanager.name(), manager);
+      event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+      event.addAttribute(AttributeName.clusterStatusMonitor.name(), clusterStatusMonitor);
+
+      CustomizedStateConfig config = new CustomizedStateConfig();
+      List<String> aggregationEnabledTypes = new ArrayList<>();
+      aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME);
+      config.setAggregationEnabledTypes(aggregationEnabledTypes);
+
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+      CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+      customizedState.setState(PARTITION_NAME, "STATE");
+      // A positive start time far in the past yields a non-zero latency sample.
+      customizedState.setStartTime(PARTITION_NAME, 1);
+      accessor.setProperty(
+          keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME),
+          customizedState);
+
+      Pipeline dataRefresh = new Pipeline();
+      dataRefresh.addStage(new ReadClusterDataStage());
+      runPipeline(event, dataRefresh);
+      runStage(event, new ResourceComputationStage());
+      runStage(event, new CustomizedStateComputationStage());
+      runStage(event, new CustomizedViewAggregationStage());
+
+      ObjectName objectName = new ObjectName(String
+          .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
+              "CustomizedView", "Cluster", clusterName));
+      Assert.assertNotNull(
+          ((ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()))
+              .getOrCreateCustomizedViewMonitor(clusterName));
+
+      // TestHelper.verify only returns the outcome; it must be asserted, or this test can never
+      // fail when the latency metric is missing.
+      Assert.assertTrue(TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+          CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0,
+          TestHelper.WAIT_DURATION));
+    } finally {
+      // Stop the async pool first so no aggregation task races with the cluster teardown.
+      executor.shutdownNow();
+      deleteLiveInstances(clusterName);
+      deleteCluster(clusterName);
+    }
+  }
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java
index 4eed9cc..258e4c4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java
@@ -222,7 +222,7 @@ public class TestCustomizedViewAggregation extends ZkUnitTestBase {
Map<String, Map<String, String>> localPerResourceCustomizedView = localSnapshot
.getOrDefault(resourceCustomizedView.getResourceName(), Maps.newHashMap());
- if (resourceStateMap.isEmpty() && !localPerResourceCustomizedView.isEmpty()) {
+ if (resourceStateMap.size() != localPerResourceCustomizedView.size()) {
return false;
}
@@ -252,7 +252,7 @@ public class TestCustomizedViewAggregation extends ZkUnitTestBase {
}
return true;
}
- }, 12000);
+ }, TestHelper.WAIT_DURATION);
Assert.assertTrue(result);
}
@@ -422,6 +422,8 @@ public class TestCustomizedViewAggregation extends ZkUnitTestBase {
Map<String, String> perPartitionCustomizedState = _customizedStateProvider_participant1
.getPerPartitionCustomizedState(CustomizedStateType.TYPE_A.name(), RESOURCE_1,
PARTITION_10);
+ // Remove this field because it's automatically updated for monitoring purpose and we don't need to compare it
+ perPartitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
Map<String, String> actualPerPartitionCustomizedState = Maps.newHashMap();
actualPerPartitionCustomizedState
.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index 0102375..ed1338a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -87,7 +87,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
Map<String, Map<String, String>> mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
- Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+ // Updated 2 fields + START_TIME field is automatically updated for monitoring
+ Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3);
Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "STARTED");
Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
@@ -103,7 +104,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
- Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+ Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3);
Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
@@ -120,7 +121,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
- Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2);
+ Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3);
Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "COMPLETED");
@@ -141,13 +142,13 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
Map<String, String> partitionMap1 = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
- Assert.assertEquals(partitionMap1.keySet().size(), 2);
+ Assert.assertEquals(partitionMap1.keySet().size(), 3);
Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED");
Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED");
Map<String, String> partitionMap2 = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2);
- Assert.assertEquals(partitionMap2.keySet().size(), 2);
+ Assert.assertEquals(partitionMap2.keySet().size(), 3);
Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED");
Assert.assertEquals(partitionMap2.get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED");
@@ -170,6 +171,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
// get customized state
CustomizedState customizedState =
_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ // START_TIME field is automatically updated for monitoring
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE)
.size(), 1);
@@ -178,8 +180,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
Assert.assertEquals(customizedState
.getPartitionStateMap(CustomizedState.CustomizedStateProperty.PREVIOUS_STATE), map);
Assert.assertEquals(
- customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME),
- map);
+ customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME).size(),
+ 1);
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.END_TIME),
map);
@@ -192,6 +194,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), PARTITION_STATE);
Map<String, String> partitionCustomizedState = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+ partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
Assert.assertEquals(partitionCustomizedState, map);
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
@@ -218,6 +221,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), null);
Map<String, String> partitionCustomizedState = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
+ partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
Assert.assertEquals(partitionCustomizedState, map);
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
@@ -244,7 +248,8 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
// get per partition customized state
Map<String, String> partitionCustomizedState = _mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
- Assert.assertEquals(partitionCustomizedState.size(), 0);
+ // START_TIME field is automatically updated for monitoring
+ Assert.assertEquals(partitionCustomizedState.size(), 1);
Assert.assertNull(_mockProvider
.getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2));
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java
new file mode 100644
index 0000000..248ed56
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java
@@ -0,0 +1,99 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Stack;
+import javax.management.JMException;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestCustomizedViewMonitor {
+ private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+ private final String TEST_CLUSTER = "test_cluster";
+ private final String MAX_SUFFIX = ".Max";
+ private final String MEAN_SUFFIX = ".Mean";
+
+ private ObjectName buildObjectName(int duplicateNum) throws MalformedObjectNameException {
+ ObjectName objectName = new ObjectName(String
+ .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
+ "CustomizedView", "Cluster", TEST_CLUSTER));
+ if (duplicateNum == 0) {
+ return objectName;
+ } else {
+ return new ObjectName(
+ String.format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE, duplicateNum));
+ }
+ }
+
+ @Test
+ public void testMBeanRegistration() throws JMException, IOException {
+ int numOfMonitors = 5;
+
+ Stack<CustomizedViewMonitor> monitors = new Stack<>();
+ for (int i = 0; i < numOfMonitors; i++) {
+ CustomizedViewMonitor monitor = new CustomizedViewMonitor(TEST_CLUSTER);
+ monitor.register();
+ monitors.push(monitor);
+ }
+
+ for (int i = 0; i < numOfMonitors; i++) {
+ if (i == numOfMonitors - 1) {
+ Assert.assertTrue(_server.isRegistered(buildObjectName(0)));
+ } else {
+ Assert.assertTrue(_server.isRegistered(buildObjectName(numOfMonitors - i - 1)));
+ }
+ CustomizedViewMonitor monitor = monitors.pop();
+ assert monitor != null;
+ monitor.unregister();
+ if (i == numOfMonitors - 1) {
+ Assert.assertFalse(_server.isRegistered(buildObjectName(0)));
+ } else {
+ Assert.assertFalse(_server.isRegistered(buildObjectName(numOfMonitors - i - 1)));
+ }
+ }
+ }
+
+ @Test
+ public void testMetricInitialization() throws Exception {
+ CustomizedViewMonitor monitor = new CustomizedViewMonitor(TEST_CLUSTER);
+ monitor.register();
+ int sum = 0;
+ for (int i = 0; i < 10; i++) {
+ monitor.recordUpdateToAggregationLatency(i);
+ sum += i;
+ int expectedMax = i;
+ double expectedMean = sum / (i + 1.0);
+ Assert.assertTrue(TestHelper.verify(() -> (long) _server.getAttribute(buildObjectName(0),
+ CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + MAX_SUFFIX) == expectedMax,
+ TestHelper.WAIT_DURATION));
+ Assert.assertTrue(TestHelper.verify(() -> (double) _server.getAttribute(buildObjectName(0),
+ CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + MEAN_SUFFIX) == expectedMean,
+ TestHelper.WAIT_DURATION));
+ }
+ monitor.unregister();
+ }
+}
diff --git a/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
index a499d79..7a94e7f 100644
--- a/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
@@ -29,5 +29,6 @@ public enum MonitorDomainNames {
HelixCallback,
RoutingTableProvider,
CLMParticipantReport,
- Rebalancer
+ Rebalancer,
+ AggregatedView
}