You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/03/30 09:17:29 UTC

[15/19] storm git commit: Coda Hale for the stats

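 Coda Hale for the stats: replace Pacemaker's JMX DynamicMBean with Coda Hale gauges registered through StormMetricsRegistry.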


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6a89d3c3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6a89d3c3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6a89d3c3

Branch: refs/heads/master
Commit: 6a89d3c34f9eb7bd8e44da4619a81d3f424be1fd
Parents: 5269ff5
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Wed Mar 23 12:54:38 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Wed Mar 23 12:54:38 2016 +0800

----------------------------------------------------------------------
 .../storm/metric/StormMetricsRegistry.java      |  10 +-
 .../org/apache/storm/pacemaker/Pacemaker.java   | 174 +++++++------------
 .../jvm/org/apache/storm/PacemakerTest.java     |  18 +-
 3 files changed, 78 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6a89d3c3/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
index 28f334b..9de1fee 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java
@@ -18,11 +18,11 @@
 package org.apache.storm.metric;
 
 import clojure.lang.IFn;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.*;
+
 import java.util.Map;
+import java.util.concurrent.Callable;
+
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
@@ -39,7 +39,7 @@ public class StormMetricsRegistry {
     }
 
     // TODO: should replace fn to Gauge<Integer> when nimbus.clj is translated to java
-    public static Gauge<Integer> registerGauge(final String name, final IFn fn) {
+    public static Gauge<Integer> registerGauge(final String name, final Callable fn) {
         Gauge<Integer> gauge = new Gauge<Integer>() {
             @Override
             public Integer getValue() {

http://git-wip-us.apache.org/repos/asf/storm/blob/6a89d3c3/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
index 06cbb07..218c67c 100644
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java
@@ -22,6 +22,7 @@ import org.apache.storm.generated.HBMessageData;
 import org.apache.storm.generated.HBPulse;
 import org.apache.storm.generated.HBNodes;
 import org.apache.storm.generated.HBServerMessageType;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
@@ -29,8 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 
-import javax.management.*;
-import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
@@ -62,107 +61,71 @@ public class Pacemaker implements IServerMessageHandler {
         public AtomicInteger averageHeartbeatSize = new AtomicInteger();
         private AtomicInteger totalKeys = new AtomicInteger();
     }
-    private static class PacemakerDynamicMBean implements DynamicMBean {
 
-        private final MBeanInfo mBeanInfo;
-        private final static String [] attributeNames = new String []{
-                "send-pulse-count",
-                "total-received-size",
-                "get-pulse-count",
-                "total-sent-size",
-                "largest-heartbeat-size",
-                "average-heartbeat-size",
-                "total-keys"
-        };
-        private static String attributeType = "java.util.concurrent.atomic.AtomicInteger";
-
-        private static final MBeanAttributeInfo[] attributeInfos = new MBeanAttributeInfo[] {
-                        new MBeanAttributeInfo("send-pulse-count", attributeType, "send-pulse-count", true, false, false),
-                        new MBeanAttributeInfo("total-received-size", attributeType, "total-received-size", true, false, false),
-                        new MBeanAttributeInfo("get-pulse-count", attributeType, "get-pulse-count", true, false, false),
-                        new MBeanAttributeInfo("total-sent-size", attributeType, "total-sent-size", true, false, false),
-                        new MBeanAttributeInfo("largest-heartbeat-size", attributeType, "largest-heartbeat-size", true, false, false),
-                        new MBeanAttributeInfo("average-heartbeat-size", attributeType, "average-heartbeat-size", true, false, false),
-                        new MBeanAttributeInfo("total-keys", attributeType, "total-keys", true, false, false)
-        };
-        private PacemakerStats stats;
-
-        public PacemakerDynamicMBean(PacemakerStats stats) {
-            this.stats = stats;
-            this.mBeanInfo = new MBeanInfo("org.apache.storm.pacemaker.PaceMakerDynamicMBean", "Java Pacemaker Dynamic MBean",
-                    PacemakerDynamicMBean.attributeInfos, null, null, null);
-        }
-
-        @Override
-        public MBeanInfo getMBeanInfo() {
-            return mBeanInfo;
-        }
-
-        @Override
-        public AttributeList getAttributes(String[] attributes) {
-            AttributeList list = new AttributeList();
-            if (attributes == null)
-                return list;
-            final int len = attributes.length;
-            try {
-                for (int i = 0; i < len; i++) {
-                    final Attribute a = new Attribute(attributes[i], getAttribute(attributes[i]));
-                    list.add(a);
-
-                }
-            } catch (Exception e) {
-                throw Utils.wrapInRuntime(e);
-            }
-            return list;
-        }
-
-        @Override
-        public Object getAttribute(String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException {
-            if (attribute == null)
-                throw new AttributeNotFoundException("null");
-            if (attribute.equals("send-pulse-count"))
-                return stats.sendPulseCount.get();
-            else if (attribute.equals("total-received-size"))
-                return stats.totalReceivedSize.get();
-            else if (attribute.equals("get-pulse-count"))
-                return stats.getPulseCount.get();
-            else if (attribute.equals("total-sent-size"))
-                return stats.totalSentSize.get();
-            else if (attribute.equals("largest-heartbeat-size"))
-                return stats.largestHeartbeatSize.get();
-            else if (attribute.equals("average-heartbeat-size"))
-                return stats.averageHeartbeatSize.get();
-            else if (attribute.equals("total-keys"))
-                return stats.totalKeys.get();
-            else
-                throw new AttributeNotFoundException("null");
-        }
-
-        @Override
-        public void setAttribute(Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
-
-        }
-
-        @Override
-        public AttributeList setAttributes(AttributeList attributes) {
-            return null;
-        }
-
-        @Override
-        public Object invoke(String actionName, Object[] params, String[] signature) throws MBeanException, ReflectionException {
-            return null;
-        }
-    }
-
-    public Pacemaker(Map conf, boolean isRegisterJmx) {
+    public Pacemaker(Map conf) {
         heartbeats = new ConcurrentHashMap();
         pacemakerStats = new PacemakerStats();
         lastOneMinStats = new PacemakerStats();
         this.conf = conf;
         startStatsThread();
-        if (isRegisterJmx){
-            registerJmx(lastOneMinStats);
-        }
+        startRegisterStatsGauge();
+        StormMetricsRegistry.startMetricsReporters(conf);
+    }
+
+    public void startRegisterStatsGauge(){
+        StormMetricsRegistry.registerGauge("pacemaker:num-send-pulse-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.sendPulseCount.get();
+                    }
+                });
+
+        StormMetricsRegistry.registerGauge("pacemaker:num-total-receive-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.totalReceivedSize.get();
+                    }
+                });
+
+        StormMetricsRegistry.registerGauge("pacemaker:num-get-pulse-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.getPulseCount.get();
+                    }
+                });
+
+        StormMetricsRegistry.registerGauge("pacemaker:num-total-sent-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.totalSentSize.get();
+                    }
+                });
+
+        StormMetricsRegistry.registerGauge("pacemaker:size-largest-heartbeat-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.largestHeartbeatSize.get();
+                    }
+                });
+        StormMetricsRegistry.registerGauge("pacemaker:size-average-heartbeat-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.averageHeartbeatSize.get();
+                    }
+                });
+        StormMetricsRegistry.registerGauge("pacemaker:size-total-keys-1min",
+                new Callable(){
+                    @Override
+                    public Integer call() throws Exception {
+                        return lastOneMinStats.totalKeys.get();
+                    }
+                });
     }
 
     @Override
@@ -203,16 +166,6 @@ public class Pacemaker implements IServerMessageHandler {
         return response;
     }
 
-    private void registerJmx (PacemakerStats lastOneMinStats){
-        try {
-            MBeanServer mbServer = ManagementFactory.getPlatformMBeanServer();
-            DynamicMBean dynamicMBean = new PacemakerDynamicMBean(lastOneMinStats);
-            ObjectName objectname = new ObjectName("org.apache.storm.pacemaker.Pacemaker:stats=lastOneMinStats");
-            mbServer.registerMBean(dynamicMBean, objectname);
-        }catch (Exception e){
-            throw Utils.wrapInRuntime(e);
-        }
-    }
 
     private HBMessage createPath(String path) {
         return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null);
@@ -343,9 +296,10 @@ public class Pacemaker implements IServerMessageHandler {
                 int average = pacemakerStats.averageHeartbeatSize.getAndSet(0);
                 int totalKeys = heartbeats.size();
                 LOG.debug("\nReceived {} heartbeats totaling {} bytes,\nSent {} heartbeats totaling {} bytes," +
-                          "\nThe largest heartbeat was {} bytes,\nThe average heartbeat was {} bytes,\n" +
-                          "Pacemaker contained {} total keys\nin the last {} second(s)",
+                                "\nThe largest heartbeat was {} bytes,\nThe average heartbeat was {} bytes,\n" +
+                                "Pacemaker contained {} total keys\nin the last {} second(s)",
                         sendCount, receivedSize, getCount, sentSize, largest, average, totalKeys, sleepSeconds);
+                
                 lastOneMinStats.sendPulseCount.set(sendCount);
                 lastOneMinStats.totalReceivedSize.set(receivedSize);
                 lastOneMinStats.getPulseCount.set(getCount);
@@ -367,7 +321,7 @@ public class Pacemaker implements IServerMessageHandler {
     public static void main(String[] args) {
         SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
         Map conf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readStormConfig());
-        final Pacemaker serverHandler = new Pacemaker(conf, true);
+        final Pacemaker serverHandler = new Pacemaker(conf);
         serverHandler.launchServer();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6a89d3c3/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/PacemakerTest.java b/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
index 0992dc4..7a00f77 100644
--- a/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/PacemakerTest.java
@@ -43,7 +43,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerCreatePath() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         messageWithRandId(HBServerMessageType.CREATE_PATH, HBMessageData.path("/testpath"));
         HBMessage response = handler.handleMessage(hbMessage, true);
         Assert.assertEquals(mid, response.get_message_id());
@@ -53,7 +53,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerExistsFalse() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         messageWithRandId(HBServerMessageType.EXISTS, HBMessageData.path("/testpath"));
         HBMessage badResponse = handler.handleMessage(hbMessage, false);
         HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -69,7 +69,7 @@ public class PacemakerTest {
     public void testServerExistsTrue() {
         String path = "/exists_path";
         String dataString = "pulse data";
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         HBPulse hbPulse = new HBPulse();
         hbPulse.set_id(path);
         hbPulse.set_details(Utils.javaSerialize(dataString));
@@ -91,7 +91,7 @@ public class PacemakerTest {
     public void testServerSendPulseGetPulse() {
         String path = "/pulsepath";
         String dataString = "pulse data";
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         HBPulse hbPulse = new HBPulse();
         hbPulse.set_id(path);
         hbPulse.set_details(Utils.javaSerialize(dataString));
@@ -110,7 +110,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerGetAllPulseForPath() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         messageWithRandId(HBServerMessageType.GET_ALL_PULSE_FOR_PATH, HBMessageData.path("/testpath"));
         HBMessage badResponse = handler.handleMessage(hbMessage, false);
         HBMessage goodResponse = handler.handleMessage(hbMessage, true);
@@ -124,7 +124,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerGetAllNodesForPath() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root-path/foo");
         makeNode(handler, "/some-root-path/bar");
         makeNode(handler, "/some-root-path/baz");
@@ -166,7 +166,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerGetPulse() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/GET_PULSE");
         messageWithRandId(HBServerMessageType.GET_PULSE, HBMessageData.path("/some-root/GET_PULSE"));
         HBMessage badResponse = handler.handleMessage(hbMessage, false);
@@ -184,7 +184,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerDeletePath() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/DELETE_PATH/foo");
         makeNode(handler, "/some-root/DELETE_PATH/bar");
         makeNode(handler, "/some-root/DELETE_PATH/baz");
@@ -206,7 +206,7 @@ public class PacemakerTest {
 
     @Test
     public void testServerDeletePulseId() {
-        Pacemaker handler = new Pacemaker(new ConcurrentHashMap(), false);
+        Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/DELETE_PULSE_ID/foo");
         makeNode(handler, "/some-root/DELETE_PULSE_ID/bar");
         makeNode(handler, "/some-root/DELETE_PULSE_ID/baz");