You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:20:15 UTC
[helix] 41/44: Add message latency record to
StateTransitionStatMonitor.
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit fa5767f2f4a83c05091e48a25a7a45ba947bc2f8
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Fri Feb 22 15:36:46 2019 -0800
Add message latency record to StateTransitionStatMonitor.
This record provides an additional breakdown to help understand the state transition delay.
RB=1573606
BUG=HELIX-1625
G=helix-reviewers
A=jxue
Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
.../apache/helix/messaging/handling/HelixTask.java | 11 +++-
.../helix/monitoring/StateTransitionDataPoint.java | 9 ++-
.../monitoring/mbeans/ClusterMBeanObserver.java | 3 +-
.../mbeans/ParticipantStatusMonitor.java | 4 +-
.../mbeans/StateTransitionStatMonitor.java | 51 +++++++++++----
.../mbeans/StateTransitionStatMonitorMBean.java | 8 +++
.../helix/monitoring/TestParticipantMonitor.java | 72 ++++++++++++++--------
7 files changed, 111 insertions(+), 47 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index fb55e76..a8b74e5 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,6 +22,7 @@ package org.apache.helix.messaging.handling;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -317,7 +318,8 @@ public class HelixTask implements MessageTask {
if (msgReadTime != 0 && msgExecutionStartTime != 0) {
long totalDelay = now - msgReadTime;
long executionDelay = now - msgExecutionStartTime;
- if (totalDelay > 0 && executionDelay > 0) {
+ long msgLatency = msgReadTime - message.getCreateTimeStamp();
+ if (totalDelay >= 0 && executionDelay >= 0) {
String fromState = message.getFromState();
String toState = message.getToState();
String transition = fromState + "--" + toState;
@@ -327,11 +329,14 @@ public class HelixTask implements MessageTask {
message.getResourceName(), transition);
StateTransitionDataPoint data =
- new StateTransitionDataPoint(totalDelay, executionDelay, taskResult.isSuccess());
+ new StateTransitionDataPoint(totalDelay, executionDelay, msgLatency,
+ taskResult.isSuccess());
_executor.getParticipantMonitor().reportTransitionStat(cxt, data);
}
} else {
- logger.warn("message read time and start execution time not recorded.");
+ logger.warn(
+ "message read time and start execution time not recorded. State transition delay time is not available, message read time {}, Execute start time {}.",
+ msgReadTime, msgExecutionStartTime);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
index 1fe9264..0b4a675 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StateTransitionDataPoint.java
@@ -22,11 +22,14 @@ package org.apache.helix.monitoring;
public class StateTransitionDataPoint {
long _totalDelay;
long _executionDelay;
+ long _messageLatency;
boolean _isSuccess;
- public StateTransitionDataPoint(long totalDelay, long executionDelay, boolean isSuccess) {
+ public StateTransitionDataPoint(long totalDelay, long executionDelay, long messageLatency,
+ boolean isSuccess) {
_totalDelay = totalDelay;
_executionDelay = executionDelay;
+ _messageLatency = messageLatency;
_isSuccess = isSuccess;
}
@@ -38,6 +41,10 @@ public class StateTransitionDataPoint {
return _executionDelay;
}
+ public long getMessageLatency() {
+ return _messageLatency;
+ }
+
public boolean getSuccess() {
return _isSuccess;
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
index c9735c5..3320fff 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterMBeanObserver.java
@@ -43,8 +43,7 @@ public abstract class ClusterMBeanObserver implements NotificationListener {
protected MBeanServerConnection _server;
private static final Logger _logger = LoggerFactory.getLogger(ClusterMBeanObserver.class);
- public ClusterMBeanObserver(String domain) throws InstanceNotFoundException, IOException,
- MalformedObjectNameException, NullPointerException {
+ public ClusterMBeanObserver(String domain) throws IOException, InstanceNotFoundException {
// Get a reference to the target MBeanServer
_domain = domain;
_server = ManagementFactory.getPlatformMBeanServer();
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index d41b402..9eb29ff 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -37,7 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class ParticipantStatusMonitor {
private final ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor> _monitorMap =
- new ConcurrentHashMap<StateTransitionContext, StateTransitionStatMonitor>();
+ new ConcurrentHashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(ParticipantStatusMonitor.class);
private MBeanServer _beanServer;
@@ -116,11 +116,11 @@ public class ParticipantStatusMonitor {
}
private ObjectName getObjectName(String name) throws MalformedObjectNameException {
- LOG.info("Registering bean: " + name);
return new ObjectName(String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
}
private void register(Object bean, ObjectName name) {
+ LOG.info("Registering bean: " + name.toString());
if (_beanServer == null) {
LOG.warn("bean server is null, skip reporting");
return;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
index 4a4c89e..b1e93e6 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitor.java
@@ -20,30 +20,35 @@ package org.apache.helix.monitoring.mbeans;
*/
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import org.apache.helix.monitoring.StatCollector;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+// TODO convert StateTransitionStatMonitor to extends DynamicMBeanProvider.
+// Note this might change the attributes name.
public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBean {
+ private static final Logger _logger = LoggerFactory.getLogger(StateTransitionStatMonitor.class);
public enum LATENCY_TYPE {
TOTAL,
- EXECUTION
+ EXECUTION,
+ MESSAGE
}
private long _numDataPoints;
private long _successCount;
- private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap =
- new ConcurrentHashMap<LATENCY_TYPE, StatCollector>();
+ private ConcurrentHashMap<LATENCY_TYPE, StatCollector> _monitorMap = new ConcurrentHashMap<>();
StateTransitionContext _context;
public StateTransitionStatMonitor(StateTransitionContext context) {
_context = context;
- _monitorMap.put(LATENCY_TYPE.TOTAL, new StatCollector());
- _monitorMap.put(LATENCY_TYPE.EXECUTION, new StatCollector());
+ for (LATENCY_TYPE type : LATENCY_TYPE.values()) {
+ _monitorMap.put(type, new StatCollector());
+ }
reset();
}
@@ -63,17 +68,18 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe
}
addLatency(LATENCY_TYPE.TOTAL, data.getTotalDelay());
addLatency(LATENCY_TYPE.EXECUTION, data.getExecutionDelay());
+ addLatency(LATENCY_TYPE.MESSAGE, data.getMessageLatency());
}
- void addLatency(LATENCY_TYPE type, double latency) {
- assert (_monitorMap.containsKey(type));
+ private void addLatency(LATENCY_TYPE type, double latency) {
+ if (latency < 0) {
+ _logger.warn("Ignore negative latency data {} for type {}.", latency, type.name());
+ return;
+ }
+ assert(_monitorMap.containsKey(type));
_monitorMap.get(type).addData(latency);
}
- public long getNumDataPoints() {
- return _numDataPoints;
- }
-
public void reset() {
_numDataPoints = 0;
_successCount = 0;
@@ -136,4 +142,25 @@ public class StateTransitionStatMonitor implements StateTransitionStatMonitorMBe
public double getPercentileTransitionExecuteLatency(int percentage) {
return _monitorMap.get(LATENCY_TYPE.EXECUTION).getPercentile(percentage);
}
+
+ @Override
+ public double getMeanTransitionMessageLatency() {
+ return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMean();
+ }
+
+ @Override
+ public double getMaxTransitionMessageLatency() {
+ return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMax();
+ }
+
+ @Override
+ public double getMinTransitionMessageLatency() {
+ return _monitorMap.get(LATENCY_TYPE.MESSAGE).getMin();
+ }
+
+ @Override
+ public double getPercentileTransitionMessageLatency(int percentage) {
+ return _monitorMap.get(LATENCY_TYPE.MESSAGE).getPercentile(percentage);
+ }
+
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
index 60d6bc4..fe53344 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
@@ -44,5 +44,13 @@ public interface StateTransitionStatMonitorMBean extends SensorNameProvider {
double getPercentileTransitionExecuteLatency(int percentage);
+ double getMeanTransitionMessageLatency();
+
+ double getMaxTransitionMessageLatency();
+
+ double getMinTransitionMessageLatency();
+
+ double getPercentileTransitionMessageLatency(int percentage);
+
void reset();
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index b6e88b3..d7aed6e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -19,10 +19,6 @@ package org.apache.helix.monitoring;
* under the License.
*/
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
@@ -31,21 +27,28 @@ import javax.management.MBeanServerNotification;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class TestParticipantMonitor {
- static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class);
+ private static Logger _logger = LoggerFactory.getLogger(TestParticipantMonitor.class);
+ private static String CLUSTER_NAME = TestHelper.getTestClassName();
class ParticipantMonitorListener extends ClusterMBeanObserver {
- Map<String, Map<String, Object>> _beanValueMap = new HashMap<String, Map<String, Object>>();
+ Map<String, Map<String, Object>> _beanValueMap = new HashMap<>();
- public ParticipantMonitorListener(String domain) throws InstanceNotFoundException, IOException,
- MalformedObjectNameException, NullPointerException {
+ public ParticipantMonitorListener(String domain) throws IOException, InstanceNotFoundException {
super(domain);
init();
}
@@ -53,7 +56,7 @@ public class TestParticipantMonitor {
void init() {
try {
Set<ObjectInstance> existingInstances =
- _server.queryMBeans(new ObjectName(_domain + ":Cluster=cluster,*"), null);
+ _server.queryMBeans(new ObjectName(_domain + ":Cluster=" + CLUSTER_NAME + ",*"), null);
for (ObjectInstance instance : existingInstances) {
String mbeanName = instance.getObjectName().toString();
// System.out.println("mbeanName: " + mbeanName);
@@ -93,42 +96,57 @@ public class TestParticipantMonitor {
}
}
+ private ObjectName getObjectName(String name) throws MalformedObjectNameException {
+ return new ObjectName(
+ String.format("%s:%s", MonitorDomainNames.CLMParticipantReport.name(), name));
+ }
+
@Test()
- public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException,
- NullPointerException, IOException, InterruptedException {
+ public void testReportData()
+ throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
+ IOException, InterruptedException {
System.out.println("START TestParticipantMonitor");
ParticipantStatusMonitor monitor = new ParticipantStatusMonitor(false, null);
int monitorNum = 0;
- StateTransitionContext cxt = new StateTransitionContext("cluster", "instance", "db_1", "a-b");
- StateTransitionDataPoint data = new StateTransitionDataPoint(1000, 1000, true);
+ StateTransitionContext cxt = new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b");
+ StateTransitionDataPoint data = new StateTransitionDataPoint(2000, 1000, 600, true);
monitor.reportTransitionStat(cxt, data);
- data = new StateTransitionDataPoint(1000, 1200, true);
+ data = new StateTransitionDataPoint(2000, 1200, 600, true);
monitor.reportTransitionStat(cxt, data);
ParticipantMonitorListener monitorListener =
new ParticipantMonitorListener("CLMParticipantReport");
Thread.sleep(1000);
- AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1);
-
- data = new StateTransitionDataPoint(1000, 500, true);
+ Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
+
+ // Note the values in listener's map is the snapshot when the MBean is detected.
+ Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+ .get("MeanTransitionLatency"), 2000.0);
+ Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+ .get("MeanTransitionExecuteLatency"), 1100.0);
+ Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+ .get("MeanTransitionMessageLatency"), 600.0);
+ Assert.assertEquals(monitorListener._beanValueMap.get(getObjectName(cxt.toString()).toString())
+ .get("TotalStateTransitionGauge"), 2L);
+
+ data = new StateTransitionDataPoint(2000, 500, 600, true);
monitor.reportTransitionStat(cxt, data);
Thread.sleep(1000);
- AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1);
+ Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 1);
- data = new StateTransitionDataPoint(1000, 500, true);
- StateTransitionContext cxt2 = new StateTransitionContext("cluster", "instance", "db_2", "a-b");
+ data = new StateTransitionDataPoint(1000, 500, 300, true);
+ StateTransitionContext cxt2 = new StateTransitionContext(CLUSTER_NAME, "instance", "db_2", "a-b");
monitor.reportTransitionStat(cxt2, data);
monitor.reportTransitionStat(cxt2, data);
Thread.sleep(1000);
- AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 2);
+ Assert.assertEquals(monitorListener._beanValueMap.size(), monitorNum + 2);
- AssertJUnit.assertFalse(cxt.equals(cxt2));
- AssertJUnit.assertFalse(cxt.equals(new Object()));
- AssertJUnit.assertTrue(cxt.equals(new StateTransitionContext("cluster", "instance", "db_1",
- "a-b")));
+ Assert.assertFalse(cxt.equals(cxt2));
+ Assert.assertFalse(cxt.equals(new Object()));
+ Assert.assertTrue(cxt.equals(new StateTransitionContext(CLUSTER_NAME, "instance", "db_1", "a-b")));
cxt2.getInstanceName();
@@ -136,7 +154,7 @@ public class TestParticipantMonitor {
new ParticipantMonitorListener("CLMParticipantReport");
Thread.sleep(1000);
- AssertJUnit.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2);
+ Assert.assertEquals(monitorListener2._beanValueMap.size(), monitorNum + 2);
monitorListener2.disconnect();
monitorListener.disconnect();