You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/08/26 04:44:22 UTC

[helix] branch master updated: Fix customized view aggregation latency calculation (#1314)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 767d055  Fix customized view aggregation latency calculation (#1314)
767d055 is described below

commit 767d055d050a6bfc6a467996ef9c6075386582fb
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Tue Aug 25 21:44:14 2020 -0700

    Fix customized view aggregation latency calculation (#1314)
    
    Fix customized view aggregation latency calculation
---
 .../stages/CustomizedViewAggregationStage.java     | 11 +--
 .../monitoring/mbeans/CustomizedViewMonitor.java   |  5 +-
 .../controller/stages/TestCustomizedViewStage.java | 79 ++++++++++++++++++----
 3 files changed, 76 insertions(+), 19 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
index 1582630..74e3b80 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java
@@ -211,12 +211,15 @@ public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
 
           for (Map.Entry<String, String> stateMapEntry : newStateMap.entrySet()) {
             String instanceName = stateMapEntry.getKey();
-            if (!stateMapEntry.getValue().equals(oldStateMap.get(instanceName))) {
-              long timestamp = partitionStartTimeMap.get(instanceName);
-              if (timestamp > 0) {
+            String newVal = stateMapEntry.getValue();
+            // We do not calculate the latency for deleting customized state
+            // So new value shouldn't be empty
+            if (!newVal.isEmpty() && !newVal.equals(oldStateMap.get(instanceName))) {
+              Long timestamp = partitionStartTimeMap.get(instanceName);
+              if (timestamp != null && timestamp > 0) {
                 customizedViewMonitor.recordUpdateToAggregationLatency(curTime - timestamp);
               } else {
-                LOG.warn(
+                LOG.info(
                     "Failed to find customized state update time stamp for resource {} partition {}, instance {}, on cluster {} the number should be positive.",
                     resourceName, partitionName, instanceName, clusterName);
               }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
index b038078..92ba934 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java
@@ -43,7 +43,6 @@ public class CustomizedViewMonitor extends DynamicMBeanProvider {
   private HistogramDynamicMetric _updateToAggregationLatencyGauge;
   public static final String UPDATE_TO_AGGREGATION_LATENCY_GAUGE =
       "UpdateToAggregationLatencyGauge";
-  private ClusterStatusMonitor _clusterStatusMonitor;
   private static final String TYPE_KEY = "Type";
   private static final String CLUSTER_KEY = "Cluster";
   private static final String CUSTOMIZED_VIEW = "CustomizedView";
@@ -51,8 +50,8 @@ public class CustomizedViewMonitor extends DynamicMBeanProvider {
   public CustomizedViewMonitor(String clusterName) {
     _clusterName = clusterName;
     _sensorName = String
-        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
-            CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName);
+        .format("%s.%s.%s", MonitorDomainNames.AggregatedView.name(),
+            CUSTOMIZED_VIEW, _clusterName);
     _updateToAggregationLatencyGauge =
         new HistogramDynamicMetric(UPDATE_TO_AGGREGATION_LATENCY_GAUGE, new Histogram(
             new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index 0f14bc2..1e46a99 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -59,14 +60,8 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
 
     // ideal state: node0 is MASTER, node1 is SLAVE
     // replica=2 means 1 master and 1 slave
-    setupIdealState(clusterName, new int[] {
-        0, 1
-    }, new String[] {
-        "TestDB"
-    }, 1, 2);
-    setupLiveInstances(clusterName, new int[] {
-        0, 1
-    });
+    setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    setupLiveInstances(clusterName, new int[]{0, 1});
     setupStateModel(clusterName);
 
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
@@ -166,13 +161,73 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
     ObjectName objectName = new ObjectName(String
         .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
             "CustomizedView", "Cluster", clusterName));
-    Assert.assertNotNull(
-        ((ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()))
-            .getOrCreateCustomizedViewMonitor(clusterName));
+    Field customizedViewMonitor =
+        ClusterStatusMonitor.class.getDeclaredField("_customizedViewMonitor");
+    Assert.assertNotNull(customizedViewMonitor);
+
+    boolean hasLatencyReported = TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+        CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0,
+        TestHelper.WAIT_DURATION);
+    Assert.assertTrue(hasLatencyReported);
+
+    deleteLiveInstances(clusterName);
+    deleteCluster(clusterName);
+    executor.shutdownNow();
+  }
+
+  @Test
+  public void testLatencyCalculationWithEmptyTimestamp() throws Exception {
+    String clusterName = "CLUSTER_" + TestHelper.getTestMethodName();
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+
+    setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    setupLiveInstances(clusterName, new int[]{0, 1});
+    setupStateModel(clusterName);
+
+    ClusterStatusMonitor clusterStatusMonitor = new ClusterStatusMonitor(clusterName);
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown);
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    cache.setAsyncTasksThreadPool(executor);
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), clusterStatusMonitor);
+
+    CustomizedStateConfig config = new CustomizedStateConfig();
+    List<String> aggregationEnabledTypes = new ArrayList<>();
+    aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME);
+    config.setAggregationEnabledTypes(aggregationEnabledTypes);
+
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.customizedStateConfig(), config);
+
+    CustomizedState customizedState = new CustomizedState(RESOURCE_NAME);
+    customizedState.setState(PARTITION_NAME, "STATE");
+    accessor.setProperty(
+        keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME),
+        customizedState);
+
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+    runPipeline(event, dataRefresh);
+    runStage(event, new ResourceComputationStage());
+    runStage(event, new CustomizedStateComputationStage());
+    runStage(event, new CustomizedViewAggregationStage());
+
+    ObjectName objectName = new ObjectName(String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type",
+            "CustomizedView", "Cluster", clusterName));
+    Field customizedViewMonitor =
+        ClusterStatusMonitor.class.getDeclaredField("_customizedViewMonitor");
+    Assert.assertNotNull(customizedViewMonitor);
 
-    TestHelper.verify(() -> (long) _server.getAttribute(objectName,
+    boolean hasLatencyReported = TestHelper.verify(() -> (long) _server.getAttribute(objectName,
         CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0,
         TestHelper.WAIT_DURATION);
+    Assert.assertFalse(hasLatencyReported);
 
     deleteLiveInstances(clusterName);
     deleteCluster(clusterName);