You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/03/30 09:17:29 UTC
[15/19] storm git commit: Coda Hale for the stats
Coda Hale for the stats
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a89d3c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a89d3c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a89d3c3
Branch: refs/heads/master
Commit: 6a89d3c34f9eb7bd8e44da4619a81d3f424be1fd
Parents: 5269ff5
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Wed Mar 23 12:54:38 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Wed Mar 23 12:54:38 2016 +0800
----------------------------------------------------------------------
.../storm/metric/StormMetricsRegistry.java | 10 +-
.../org/apache/storm/pacemaker/Pacemaker.java | 174 +++++++------------
.../jvm/org/apache/storm/PacemakerTest.java | 18 +-
3 files changed, 78 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6a89d3c3/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
index 28f334b..9de1fee 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
@@ -18,11 +18,11 @@
package org.apache.storm.metric;
import clojure.lang.IFn;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.*;
+
import java.util.Map;
+import java.util.concurrent.Callable;
+
import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
import org.slf4j.Logger;
@@ -39,7 +39,7 @@ public class StormMetricsRegistry {
}
// TODO: should replace fn with Gauge<Integer> when nimbus.clj is translated to java
- public static Gauge<Integer> registerGauge(final String name, final IFn fn) {
+ public static Gauge<Integer> registerGauge(final String name, final Callable fn) {
Gauge<Integer> gauge = new Gauge<Integer>() {
@Override
public Integer getValue() {
http://git-wip-us.apache.org/repos/asf/storm/blob/6a89d3c3/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
index 06cbb07..218c67c 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
@@ -22,6 +22,7 @@ import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBNodes;
import org.apache.storm.generated.HBServerMessageType;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
@@ -29,8 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
-import javax.management.*;
-import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
@@ -62,107 +61,71 @@ public class Pacemaker implements IServerMessageHandler {
public AtomicInteger averageHeartbeatSize = new AtomicInteger();
private AtomicInteger totalKeys = new AtomicInteger();
}
- private static class PacemakerDynamicMBean implements DynamicMBean {
- private final MBeanInfo mBeanInfo;
- private final static String [] attributeNames = new String []{
- "send-pulse-count",
- "total-received-size",
- "get-pulse-count",
- "total-sent-size",
- "largest-heartbeat-size",
- "average-heartbeat-size",
- "total-keys"
- };
- private static String attributeType = "java.util.concurrent.atomic.AtomicInteger";
-
- private static final MBeanAttributeInfo[] attributeInfos = new MBeanAttributeInfo[] {
- new MBeanAttributeInfo("send-pulse-count", attributeType, "send-pulse-count", true, false, false),
- new MBeanAttributeInfo("total-received-size", attributeType, "total-received-size", true, false, false),
- new MBeanAttributeInfo("get-pulse-count", attributeType, "get-pulse-count", true, false, false),
- new MBeanAttributeInfo("total-sent-size", attributeType, "total-sent-size", true, false, false),
- new MBeanAttributeInfo("largest-heartbeat-size", attributeType, "largest-heartbeat-size", true, false, false),
- new MBeanAttributeInfo("average-heartbeat-size", attributeType, "average-heartbeat-size", true, false, false),
- new MBeanAttributeInfo("total-keys", attributeType, "total-keys", true, false, false)
- };
- private PacemakerStats stats;
-
- public PacemakerDynamicMBean(PacemakerStats stats) {
- this.stats = stats;
- this.mBeanInfo = new MBeanInfo("org.apache.storm.pacemaker.PaceMakerDynamicMBean", "Java Pacemaker Dynamic MBean",
- PacemakerDynamicMBean.attributeInfos, null, null, null);
- }
-
- @Override
- public MBeanInfo getMBeanInfo() {
- return mBeanInfo;
- }
-
- @Override
- public AttributeList getAttributes(String[] attributes) {
- AttributeList list = new AttributeList();
- if (attributes == null)
- return list;
- final int len = attributes.length;
- try {
- for (int i = 0; i < len; i++) {
- final Attribute a = new Attribute(attributes[i], getAttribute(attributes[i]));
- list.add(a);
-
- }
- } catch (Exception e) {
- throw Utils.wrapInRuntime(e);
- }
- return list;
- }
-
- @Override
- public Object getAttribute(String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException {
- if (attribute == null)
- throw new AttributeNotFoundException("null");
- if (attribute.equals("send-pulse-count"))
- return stats.sendPulseCount.get();
- else if (attribute.equals("total-received-size"))
- return stats.totalReceivedSize.get();
- else if (attribute.equals("get-pulse-count"))
- return stats.getPulseCount.get();
- else if (attribute.equals("total-sent-size"))
- return stats.totalSentSize.get();
- else if (attribute.equals("largest-heartbeat-size"))
- return stats.largestHeartbeatSize.get();
- else if (attribute.equals("average-heartbeat-size"))
- return stats.averageHeartbeatSize.get();
- else if (attribute.equals("total-keys"))
- return stats.totalKeys.get();
- else
- throw new AttributeNotFoundException("null");
- }
-
- @Override
- public void setAttribute(Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
-
- }
-
- @Override
- public AttributeList setAttributes(AttributeList attributes) {
- return null;
- }
-
- @Override
- public Object invoke(String actionName, Object[] params, String[] signature) throws MBeanException, ReflectionException {
- return null;
- }
- }
-
- public Pacemaker(Map conf, boolean isRegisterJmx) {
+ public Pacemaker(Map conf) {
heartbeats = new ConcurrentHashMap();
pacemakerStats = new PacemakerStats();
lastOneMinStats = new PacemakerStats();
this.conf = conf;
startStatsThread();
- if (isRegisterJmx){
- registerJmx(lastOneMinStats);
- }
+ startRegisterStatsGauge();
+ StormMetricsRegistry.startMetricsReporters(conf);
+ }
+
+ public void startRegisterStatsGauge(){
+ StormMetricsRegistry.registerGauge("pacemaker:num-send-pulse-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.sendPulseCount.get();
+ }
+ });
+
+ StormMetricsRegistry.registerGauge("pacemaker:num-total-receive-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.totalReceivedSize.get();
+ }
+ });
+
+ StormMetricsRegistry.registerGauge("pacemaker:num-get-pulse-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.getPulseCount.get();
+ }
+ });
+
+ StormMetricsRegistry.registerGauge("pacemaker:num-total-sent-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.totalSentSize.get();
+ }
+ });
+
+ StormMetricsRegistry.registerGauge("pacemaker:size-largest-heartbeat-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.largestHeartbeatSize.get();
+ }
+ });
+ StormMetricsRegistry.registerGauge("pacemaker:size-average-heartbeat-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.averageHeartbeatSize.get();
+ }
+ });
+ StormMetricsRegistry.registerGauge("pacemaker:size-total-keys-1min",
+ new Callable(){
+ @Override
+ public Integer call() throws Exception {
+ return lastOneMinStats.totalKeys.get();
+ }
+ });
}
@Override
@@ -203,16 +166,6 @@ public class Pacemaker implements IServerMessageHandler {
return response;
}
- private void registerJmx (PacemakerStats lastOneMinStats){
- try {
- MBeanServer mbServer = ManagementFactory.getPlatformMBeanServer();
- DynamicMBean dynamicMBean = new PacemakerDynamicMBean(lastOneMinStats);
- ObjectName objectname = new ObjectName("org.apache.storm.pacemaker.Pacemaker:stats=lastOneMinStats");
- mbServer.registerMBean(dynamicMBean, objectname);
- }catch (Exception e){
- throw Utils.wrapInRuntime(e);
- }
- }
private HBMessage createPath(String path) {
return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null);
@@ -343,9 +296,10 @@ public class Pacemaker implements IServerMessageHandler {
int average = pacemakerStats.averageHeartbeatSize.getAndSet(0);
int totalKeys = heartbeats.size();
LOG.debug("\nReceived {} heartbeats totaling {} bytes,\nSent {} heartbeats totaling {} bytes," +
- "\nThe largest heartbeat was {} bytes,\nThe average heartbeat was {} bytes,\n" +
- "Pacemaker contained {} total keys\nin the last {} second(s)",
+ "\nThe largest heartbeat was {} bytes,\nThe average heartbeat was {} bytes,\n" +
+ "Pacemaker contained {} total keys\nin the last {} second(s)",
sendCount, receivedSize, getCount, sentSize, largest, average, totalKeys, sleepSeconds);
+
lastOneMinStats.sendPulseCount.set(sendCount);
lastOneMinStats.totalReceivedSize.set(receivedSize);
lastOneMinStats.getPulseCount.set(getCount);
@@ -367,7 +321,7 @@ public class Pacemaker implements IServerMessageHandler {
public static void main(String[] args) {
SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
Map conf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readStormConfig());
- final Pacemaker serverHandler = new Pacemaker(conf, true);
+ final Pacemaker serverHandler = new Pacemaker(conf);
serverHandler.launchServer();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6a89d3c3/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/PacemakerTest.java b/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
index 0992dc4..7a00f77 100644
--- a/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
@@ -43,7 +43,7 @@ public class PacemakerTest {
@Test
public void testServerCreatePath() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
messageWithRandId(HBServerMessageType.CREATE_PATH, HBMessageData.path("/testpath"));
HBMessage response = handler.handleMessage(hbMessage, true);
Assert.assertEquals(mid, response.get_message_id());
@@ -53,7 +53,7 @@ public class PacemakerTest {
@Test
public void testServerExistsFalse() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
messageWithRandId(HBServerMessageType.EXISTS, HBMessageData.path("/testpath"));
HBMessage badResponse = handler.handleMessage(hbMessage, false);
HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -69,7 +69,7 @@ public class PacemakerTest {
public void testServerExistsTrue() {
String path = "/exists_path";
String dataString = "pulse data";
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
HBPulse hbPulse = new HBPulse();
hbPulse.set_id(path);
hbPulse.set_details(Utils.javaSerialize(dataString));
@@ -91,7 +91,7 @@ public class PacemakerTest {
public void testServerSendPulseGetPulse() {
String path = "/pulsepath";
String dataString = "pulse data";
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
HBPulse hbPulse = new HBPulse();
hbPulse.set_id(path);
hbPulse.set_details(Utils.javaSerialize(dataString));
@@ -110,7 +110,7 @@ public class PacemakerTest {
@Test
public void testServerGetAllPulseForPath() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
messageWithRandId(HBServerMessageType.GET_ALL_PULSE_FOR_PATH, HBMessageData.path("/testpath"));
HBMessage badResponse = handler.handleMessage(hbMessage, false);
HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -124,7 +124,7 @@ public class PacemakerTest {
@Test
public void testServerGetAllNodesForPath() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root-path/foo");
makeNode(handler, "/some-root-path/bar");
makeNode(handler, "/some-root-path/baz");
@@ -166,7 +166,7 @@ public class PacemakerTest {
@Test
public void testServerGetPulse() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root/GET_PULSE");
messageWithRandId(HBServerMessageType.GET_PULSE, HBMessageData.path("/some-root/GET_PULSE"));
HBMessage badResponse = handler.handleMessage(hbMessage, false);
@@ -184,7 +184,7 @@ public class PacemakerTest {
@Test
public void testServerDeletePath() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root/DELETE_PATH/foo");
makeNode(handler, "/some-root/DELETE_PATH/bar");
makeNode(handler, "/some-root/DELETE_PATH/baz");
@@ -206,7 +206,7 @@ public class PacemakerTest {
@Test
public void testServerDeletePulseId() {
- Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+ Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
makeNode(handler, "/some-root/DELETE_PULSE_ID/foo");
makeNode(handler, "/some-root/DELETE_PULSE_ID/bar");
makeNode(handler, "/some-root/DELETE_PULSE_ID/baz");