You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:20:15 UTC

[helix] 41/44: Add message latency record to StateTransitionStatMonitor.

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fa5767f2f4a83c05091e48a25a7a45ba947bc2f8
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Fri Feb 22 15:36:46 2019 -0800

    Add message latency record to StateTransitionStatMonitor.
    
    This record provides an additional breakdown to help understand the state transition delay.
    
    RB=1573606
    BUG=HELIX-1625
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../apache/helix/messaging/handling/HelixTask.java | 11 +++-
 .../helix/monitoring/StateTransitionDataPoint.java |  9 ++-
 .../monitoring/mbeans/ClusterMBeanObserver.java    |  3 +-
 .../mbeans/ParticipantStatusMonitor.java           |  4 +-
 .../mbeans/StateTransitionStatMonitor.java         | 51 +++++++++++----
 .../mbeans/StateTransitionStatMonitorMBean.java    |  8 +++
 .../helix/monitoring/TestParticipantMonitor.java   | 72 ++++++++++++++--------
 7 files changed, 111 insertions(+), 47 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index fb55e76..a8b74e5 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,6 +22,7 @@ package org.apache.helix.messaging.handling;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -317,7 +318,8 @@ public class HelixTask implements MessageTask {
     if (msgReadTime != 0 && msgExecutionStartTime != 0) {
       long totalDelay = now - msgReadTime;
       long executionDelay = now - msgExecutionStartTime;
-      if (totalDelay > 0 && executionDelay > 0) {
+      long msgLatency = msgReadTime - message.getCreateTimeStamp();
+      if (totalDelay >= 0 && executionDelay >= 0) {
         String fromState = message.getFromState();
         String toState = message.getToState();
         String transition = fromState + "--" + toState;
@@ -327,11 +329,14 @@ public class HelixTask implements MessageTask {
                 message.getResourceName(), transition);
 
         StateTransitionDataPoint data =
-            new StateTransitionDataPoint(totalDelay, executionDelay, taskResult.isSuccess());
+            new StateTransitionDataPoint(totalDelay, executionDelay, msgLatency,
+                taskResult.isSuccess());
         _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
       }
     } else {
-      logger.warn("message read time and start execution time not recorded.");
+      logger.warn(
+          "message read time and start execution time not recorded. State transition delay time is not available, message read time {}, Execute start time {}.",
+          msgReadTime, msgExecutionStartTime);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
index 1fe9264..0b4a675 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
@@ -22,11 +22,14 @@ package org.apache.helix.monitoring;
 public class StateTransitionDataPoint {
   long _totalDelay;
   long _executionDelay;
+  long _messageLatency;
   boolean _isSuccess;
 
-  public StateTransitionDataPoint(long totalDelay, long executionDelay, boolean isSuccess) {
+  public StateTransitionDataPoint(long totalDelay, long executionDelay, long messageLatency,
+      boolean isSuccess) {
     _totalDelay = totalDelay;
     _executionDelay = executionDelay;
+    _messageLatency = messageLatency;
     _isSuccess = isSuccess;
   }
 
@@ -38,6 +41,10 @@ public class StateTransitionDataPoint {
     return _executionDelay;
   }
 
+  public long getMessageLatency() {
+    return _messageLatency;
+  }
+
   public boolean getSuccess() {
     return _isSuccess;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
index c9735c5..3320fff 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
@@ -43,8 +43,7 @@ public abstract class ClusterMBeanObserver implements NotificationListener {
   protected MBeanServerConnection _server;
   private static final Logger _logger = LoggerFactory.getLogger(ClusterMBeanObserver.class);
 
-  public ClusterMBeanObserver(String domain) throws InstanceNotFoundException, IOException,
-      MalformedObjectNameException, NullPointerException {
+  public ClusterMBeanObserver(String domain) throws IOException, InstanceNotFoundException {
     // Get a reference to the target MBeanServer
     _domain = domain;
     _server = ManagementFactory.getPlatformMBeanServer();
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index d41b402..9eb29ff 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -37,7 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 public class ParticipantStatusMonitor {
   private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap =
-      new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>();
+      new ConcurrentHashMap<>();
   private static final Logger LOG = LoggerFactory.getLogger(ParticipantStatusMonitor.class);
 
   private MBeanServer _beanServer;
@@ -116,11 +116,11 @@ public class ParticipantStatusMonitor {
   }
 
   private ObjectName getObjectName(String name) throws MalformedObjectNameException {
-    LOG.info("Registering bean: " + name);
     return new ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
   }
 
   private void register(Object bean, ObjectName name) {
+    LOG.info("Registering bean: " + name.toString());
     if (_beanServer == null) {
       LOG.warn("bean server is null, skip reporting");
       return;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
index 4a4c89e..b1e93e6 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -20,30 +20,35 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.monitoring.StatCollector;
 import org.apache.helix.monitoring.StateTransitionContext;
 import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+// TODO convert StateTransitionStatMonitor to extend DynamicMBeanProvider.
+// Note that this might change the attribute names.
 public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean {
+  private static final Logger _logger = LoggerFactory.getLogger(StateTransitionStatMonitor.class);
   public enum LATENCY_TYPE {
     TOTAL,
-    EXECUTION
+    EXECUTION,
+    MESSAGE
   }
 
   private long _numDataPoints;
   private long _successCount;
 
-  private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap =
-      new ConcurrentHashMap<LATENCY_TYPE, StatCollector>();
+  private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap = new ConcurrentHashMap<>();
 
   StateTransitionContext _context;
 
   public StateTransitionStatMonitor(StateTransitionContext context) {
     _context = context;
-    _monitorMap.put(LATENCY_TYPE.TOTAL, new StatCollector());
-    _monitorMap.put(LATENCY_TYPE.EXECUTION, new StatCollector());
+    for (LATENCY_TYPE type : LATENCY_TYPE.values()) {
+      _monitorMap.put(type, new StatCollector());
+    }
     reset();
   }
 
@@ -63,17 +68,18 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe
     }
     addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay());
     addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay());
+    addLatency(LATENCY_TYPE.MESSAGE, data.getMessageLatency());
   }
 
-  void addLatency(LATENCY_TYPE type, double latency) {
-    assert (_monitorMap.containsKey(type));
+  private void addLatency(LATENCY_TYPE type, double latency) {
+    if (latency < 0) {
+      _logger.warn("Ignore negative latency data {} for type {}.", latency, type.name());
+      return;
+    }
+    assert(_monitorMap.containsKey(type));
     _monitorMap.get(type).addData(latency);
   }
 
-  public long getNumDataPoints() {
-    return _numDataPoints;
-  }
-
   public void reset() {
     _numDataPoints = 0;
     _successCount = 0;
@@ -136,4 +142,25 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe
   public double getPercentileTransitionExecuteLatency(int percentage) {
     return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage);
   }
+
+  @Override
+  public double getMeanTransitionMessageLatency() {
+    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMean();
+  }
+
+  @Override
+  public double getMaxTransitionMessageLatency() {
+    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMax();
+  }
+
+  @Override
+  public double getMinTransitionMessageLatency() {
+    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMin();
+  }
+
+  @Override
+  public double getPercentileTransitionMessageLatency(int percentage) {
+    return _monitorMap.get(LATENCY_TYPE.MESSAGE).getPercentile(percentage);
+  }
+
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
index 60d6bc4..fe53344 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
@@ -44,5 +44,13 @@ public interface StateTransitionStatMonitorMBean extends SensorNameProvider {
 
   double getPercentileTransitionExecuteLatency(int percentage);
 
+  double getMeanTransitionMessageLatency();
+
+  double getMaxTransitionMessageLatency();
+
+  double getMinTransitionMessageLatency();
+
+  double getPercentileTransitionMessageLatency(int percentage);
+
   void reset();
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index b6e88b3..d7aed6e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -19,10 +19,6 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
@@ -31,21 +27,28 @@ import javax.management.MBeanServerNotification;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestParticipantMonitor {
-  static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class);
+  private static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class);
+  private static String CLUSTER_NAME = TestHelper.getTestClassName();
 
   class ParticipantMonitorListener extends ClusterMBeanObserver {
-    Map<String, Map<String, Object>> _beanValueMap = new HashMap<String, Map<String, Object>>();
+    Map<String, Map<String, Object>> _beanValueMap = new HashMap<>();
 
-    public ParticipantMonitorListener(String domain) throws InstanceNotFoundException, IOException,
-        MalformedObjectNameException, NullPointerException {
+    public ParticipantMonitorListener(String domain) throws IOException, InstanceNotFoundException {
       super(domain);
       init();
     }
@@ -53,7 +56,7 @@ public class TestParticipantMonitor {
     void init() {
       try {
         Set<ObjectInstance> existingInstances =
-            _server.queryMBeans(new ObjectName(_domain + ":Cluster=cluster,*"), null);
+            _server.queryMBeans(new ObjectName(_domain + ":Cluster=" + CLUSTER_NAME + ",*"), null);
         for (ObjectInstance instance : existingInstances) {
           String mbeanName = instance.getObjectName().toString();
           // System.out.println("mbeanName: " + mbeanName);
@@ -93,42 +96,57 @@ public class TestParticipantMonitor {
     }
   }
 
+  private ObjectName getObjectName(String name) throws MalformedObjectNameException {
+    return new ObjectName(
+        String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
+  }
+
   @Test()
-  public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException,
-      NullPointerException, IOException, InterruptedException {
+  public void testReportData()
+      throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
+      IOException, InterruptedException {
     System.out.println("START TestParticipantMonitor");
     ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(false, null);
 
     int monitorNum = 0;
 
-    StateTransitionContext cxt = new StateTransitionContext("cluster", "instance", "db_1", "a-b");
-    StateTransitionDataPoint data = new StateTransitionDataPoint(1000, 1000, true);
+    StateTransitionContext cxt = new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b");
+    StateTransitionDataPoint data = new StateTransitionDataPoint(2000, 1000, 600, true);
     monitor.reportTransitionStat(cxt, data);
 
-    data = new StateTransitionDataPoint(1000, 1200, true);
+    data = new StateTransitionDataPoint(2000, 1200, 600, true);
     monitor.reportTransitionStat(cxt, data);
 
     ParticipantMonitorListener monitorListener =
         new ParticipantMonitorListener("CLMParticipantReport");
     Thread.sleep(1000);
-    AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1);
-
-    data = new StateTransitionDataPoint(1000, 500, true);
+    Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
+
+    // Note the values in the listener's map are a snapshot taken when the MBean is detected.
+    Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+        .get("MeanTransitionLatency"), 2000.0);
+    Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+        .get("MeanTransitionExecuteLatency"), 1100.0);
+    Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+        .get("MeanTransitionMessageLatency"), 600.0);
+    Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+        .get("TotalStateTransitionGauge"), 2L);
+
+    data = new StateTransitionDataPoint(2000, 500, 600, true);
     monitor.reportTransitionStat(cxt, data);
     Thread.sleep(1000);
-    AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1);
+    Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
 
-    data = new StateTransitionDataPoint(1000, 500, true);
-    StateTransitionContext cxt2 = new StateTransitionContext("cluster", "instance", "db_2", "a-b");
+    data = new StateTransitionDataPoint(1000, 500, 300, true);
+    StateTransitionContext cxt2 = new StateTransitionContext(CLUSTER_NAME, "instance", "db_2", "a-b");
     monitor.reportTransitionStat(cxt2, data);
     monitor.reportTransitionStat(cxt2, data);
     Thread.sleep(1000);
-    AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 2);
+    Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 2);
 
-    AssertJUnit.assertFalse(cxt.equals(cxt2));
-    AssertJUnit.assertFalse(cxt.equals(new Object()));
-    AssertJUnit.assertTrue(cxt.equals(new StateTransitionContext("cluster", "instance", "db_1",
-        "a-b")));
+    Assert.assertFalse(cxt.equals(cxt2));
+    Assert.assertFalse(cxt.equals(new Object()));
+    Assert.assertTrue(cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b")));
 
     cxt2.getInstanceName();
 
@@ -136,7 +154,7 @@ public class TestParticipantMonitor {
         new ParticipantMonitorListener("CLMParticipantReport");
 
     Thread.sleep(1000);
-    AssertJUnit.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2);
+    Assert.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2);
 
     monitorListener2.disconnect();
     monitorListener.disconnect();