You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/08/26 04:44:22 UTC
[helix] branch master updated: Fix customized view aggregation latency calculation (#1314)
This is an automated email from the ASF dual-hosted git repository.
hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 767d055 Fix customized view aggregation latency calculation (#1314)
767d055 is described below
commit 767d055d050a6bfc6a467996ef9c6075386582fb
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Tue Aug 25 21:44:14 2020 -0700
Fix customized view aggregation latency calculation (#1314)
Fix customized view aggregation latency calculation
---
.../stages/CustomizedViewAggregationStage.java | 11 +--
.../monitoring/mbeans/CustomizedViewMonitor.java | 5 +-
.../controller/stages/TestCustomizedViewStage.java | 79 ++++++++++++++++++----
3 files changed, 76 insertions(+), 19 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
index 1582630..74e3b80 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -211,12 +211,15 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
for (Map.Entry<String, String> stateMapEntry : newStateMap.entrySet()) {
String instanceName = stateMapEntry.getKey();
- if (!stateMapEntry.getValue().equals(oldStateMap.get(instanceName))) {
- long timestamp = partitionStartTimeMap.get(instanceName);
- if (timestamp > 0) {
+ String newVal = stateMapEntry.getValue();
+ // We do not calculate the latency for deleting customized state
+ // So new value shouldn't be empty
+ if (!newVal.isEmpty() && !newVal.equals(oldStateMap.get(instanceName))) {
+ Long timestamp = partitionStartTimeMap.get(instanceName);
+ if (timestamp != null && timestamp > 0) {
customizedViewMonitor.recordUpdateToAggregationLatency(curTime - timestamp);
} else {
- LOG.warn(
+ LOG.info(
"Failed to find customized state update time stamp for resource {} partition {}, instance {}, on cluster {} the number should be positive.",
resourceName, partitionName, instanceName, clusterName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
index b038078..92ba934 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
@@ -43,7 +43,6 @@ public class CustomizedViewMonitor extends DynamicMBeanProvider {
private HistogramDynamicMetric _updateToAggregationLatencyGauge;
public static final String UPDATE_TO_AGGREGATION_LATENCY_GAUGE =
"UpdateToAggregationLatencyGauge";
- private ClusterStatusMonitor _clusterStatusMonitor;
private static final String TYPE_KEY = "Type";
private static final String CLUSTER_KEY = "Cluster";
private static final String CUSTOMIZED_VIEW = "CustomizedView";
@@ -51,8 +50,8 @@ public class CustomizedViewMonitor extends DynamicMBeanProvider {
public CustomizedViewMonitor(String clusterName) {
_clusterName = clusterName;
_sensorName = String
- .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
- CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName);
+ .format("%s.%s.%s", MonitorDomainNames.AggregatedView.name(),
+ CUSTOMIZED_VIEW, _clusterName);
_updateToAggregationLatencyGauge =
new HistogramDynamicMetric(UPDATE_TO_AGGREGATION_LATENCY_GAUGE, new Histogram(
new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index 0f14bc2..1e46a99 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -59,14 +60,8 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
// ideal state: node0 is MASTER, node1 is SLAVE
// replica=2 means 1 master and 1 slave
- setupIdealState(clusterName, new int[] {
- 0, 1
- }, new String[] {
- "TestDB"
- }, 1, 2);
- setupLiveInstances(clusterName, new int[] {
- 0, 1
- });
+ setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+ setupLiveInstances(clusterName, new int[]{0, 1});
setupStateModel(clusterName);
ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
@@ -166,13 +161,73 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
ObjectName objectName = new ObjectName(String
.format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
"CustomizedView", "Cluster", clusterName));
- Assert.assertNotNull(
- ((ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()))
- .getOrCreateCustomizedViewMonitor(clusterName));
+ Field customizedViewMonitor =
+ ClusterStatusMonitor.class.getDeclaredField("_customizedViewMonitor");
+ Assert.assertNotNull(customizedViewMonitor);
+
+ boolean hasLatencyReported = TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+ CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0,
+ TestHelper.WAIT_DURATION);
+ Assert.assertTrue(hasLatencyReported);
+
+ deleteLiveInstances(clusterName);
+ deleteCluster(clusterName);
+ executor.shutdownNow();
+ }
+
+ @Test
+ public void testLatencyCalculationWithEmptyTimestamp() throws Exception {
+ String clusterName = "CLUSTER_" + TestHelper.getTestMethodName();
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+ HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+ setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+ setupLiveInstances(clusterName, new int[]{0, 1});
+ setupStateModel(clusterName);
+
+ ClusterStatusMonitor clusterStatusMonitor = new ClusterStatusMonitor(clusterName);
+ ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown);
+ ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ cache.setAsyncTasksThreadPool(executor);
+ event.addAttribute(AttributeName.helixmanager.name(), manager);
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+ event.addAttribute(AttributeName.clusterStatusMonitor.name(), clusterStatusMonitor);
+
+ CustomizedStateConfig config = new CustomizedStateConfig();
+ List<String> aggregationEnabledTypes = new ArrayList<>();
+ aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME);
+ config.setAggregationEnabledTypes(aggregationEnabledTypes);
+
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+ CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+ customizedState.setState(PARTITION_NAME, "STATE");
+ accessor.setProperty(
+ keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME),
+ customizedState);
+
+ Pipeline dataRefresh = new Pipeline();
+ dataRefresh.addStage(new ReadClusterDataStage());
+ runPipeline(event, dataRefresh);
+ runStage(event, new ResourceComputationStage());
+ runStage(event, new CustomizedStateComputationStage());
+ runStage(event, new CustomizedViewAggregationStage());
+
+ ObjectName objectName = new ObjectName(String
+ .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
+ "CustomizedView", "Cluster", clusterName));
+ Field customizedViewMonitor =
+ ClusterStatusMonitor.class.getDeclaredField("_customizedViewMonitor");
+ Assert.assertNotNull(customizedViewMonitor);
- TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+ boolean hasLatencyReported = TestHelper.verify(() -> (long) _server.getAttribute(objectName,
CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0,
TestHelper.WAIT_DURATION);
+ Assert.assertFalse(hasLatencyReported);
deleteLiveInstances(clusterName);
deleteCluster(clusterName);