You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/02/27 16:04:04 UTC

[1/8] storm git commit: STORM-2333: CGroup memory and CPU metrics

Repository: storm
Updated Branches:
  refs/heads/master 34406ec81 -> f0bfe0d3d


STORM-2333: CGroup memory and CPU metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ecbd21c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ecbd21c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ecbd21c0

Branch: refs/heads/master
Commit: ecbd21c00030f66783834fc00781bd13b49d7196
Parents: 4c5e34e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jan 31 16:05:22 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Feb 9 14:06:55 2017 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |  5 +
 .../jvm/org/apache/storm/metric/SystemBolt.java | 55 +++++------
 .../org/apache/storm/metric/api/IMetric.java    |  8 ++
 .../apache/storm/metric/cgroup/CGroupCpu.java   | 68 ++++++++++++++
 .../storm/metric/cgroup/CGroupCpuGuarantee.java | 48 ++++++++++
 .../storm/metric/cgroup/CGroupMemoryLimit.java  | 35 +++++++
 .../storm/metric/cgroup/CGroupMemoryUsage.java  | 35 +++++++
 .../storm/metric/cgroup/CGroupMetricsBase.java  | 96 ++++++++++++++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   | 21 +++++
 9 files changed, 341 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c71797e..facd0bd 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -307,3 +307,8 @@ storm.supervisor.cgroup.rootdir: "storm"
 storm.cgroup.cgexec.cmd: "/bin/cgexec"
 storm.cgroup.memory.limit.tolerance.margin.mb: 128.0
 storm.topology.classpath.beginning.enabled: false
+worker.metrics:
+    "CGroupMemory": "org.apache.storm.metric.cgroup.CGroupMemoryUsage"
+    "CGroupMemoryLimit": "org.apache.storm.metric.cgroup.CGroupMemoryLimit"
+    "CGroupCpu": "org.apache.storm.metric.cgroup.CGroupCpu"
+    "CGroupCpuGuarantee": "org.apache.storm.metric.cgroup.CGroupCpuGuarantee"

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/SystemBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/SystemBolt.java b/storm-core/src/jvm/org/apache/storm/metric/SystemBolt.java
index c50dc71..2d6a095 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -17,6 +17,15 @@
  */
 package org.apache.storm.metric;
 
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.IBolt;
@@ -24,13 +33,6 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Utils;
-import clojure.lang.AFn;
-import clojure.lang.IFn;
-import clojure.lang.RT;
-
-import java.lang.management.*;
-import java.util.HashMap;
-import java.util.Map;
 
 
 // There is one task inside one executor for each worker of the topology.
@@ -40,14 +42,14 @@ public class SystemBolt implements IBolt {
     private static boolean _prepareWasCalled = false;
 
     private static class MemoryUsageMetric implements IMetric {
-        IFn _getUsage;
-        public MemoryUsageMetric(IFn getUsage) {
+        Supplier<MemoryUsage> _getUsage;
+        public MemoryUsageMetric(Supplier<MemoryUsage> getUsage) {
             _getUsage = getUsage;
         }
         @Override
         public Object getValueAndReset() {
-            MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
-            HashMap m = new HashMap();
+            MemoryUsage memUsage = _getUsage.get();
+            HashMap<String, Object> m = new HashMap<>();
             m.put("maxBytes", memUsage.getMax());
             m.put("committedBytes", memUsage.getCommitted());
             m.put("initBytes", memUsage.getInit());
@@ -72,9 +74,9 @@ public class SystemBolt implements IBolt {
             Long collectionCountP = _gcBean.getCollectionCount();
             Long collectionTimeP = _gcBean.getCollectionTime();
 
-            Map ret = null;
+            Map<String, Object> ret = null;
             if(_collectionCount!=null && _collectionTime!=null) {
-                ret = new HashMap();
+                ret = new HashMap<>();
                 ret.put("count", collectionCountP - _collectionCount);
                 ret.put("timeMs", collectionTimeP - _collectionTime);
             }
@@ -85,14 +87,15 @@ public class SystemBolt implements IBolt {
         }
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
-    public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
-        if(_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
+    public void prepare(final Map topoConf, TopologyContext context, OutputCollector collector) {
+        if(_prepareWasCalled && !"local".equals(topoConf.get(Config.STORM_CLUSTER_MODE))) {
             throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
         }
         _prepareWasCalled = true;
 
-        int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+        int bucketSize = Utils.getInt(topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
 
         final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
 
@@ -124,30 +127,22 @@ public class SystemBolt implements IBolt {
 
         final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
 
-        context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
-            public Object invoke() {
-                return jvmMemRT.getHeapMemoryUsage();
-            }
-        }), bucketSize);
-        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
-            public Object invoke() {
-                return jvmMemRT.getNonHeapMemoryUsage();
-            }
-        }), bucketSize);
+        context.registerMetric("memory/heap", new MemoryUsageMetric(jvmMemRT::getHeapMemoryUsage), bucketSize);
+        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(jvmMemRT::getNonHeapMemoryUsage), bucketSize);
 
         for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
             context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
         }
 
-        registerMetrics(context, (Map<String,String>)stormConf.get(Config.WORKER_METRICS), bucketSize);
-        registerMetrics(context, (Map<String,String>)stormConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize);
+        registerMetrics(context, (Map<String,String>)topoConf.get(Config.WORKER_METRICS), bucketSize, topoConf);
+        registerMetrics(context, (Map<String,String>)topoConf.get(Config.TOPOLOGY_WORKER_METRICS), bucketSize, topoConf);
     }
 
-    private void registerMetrics(TopologyContext context, Map<String, String> metrics, int bucketSize) {
+    private void registerMetrics(TopologyContext context, Map<String, String> metrics, int bucketSize, Map<String, Object> conf) {
         if (metrics == null) return;
         for (Map.Entry<String, String> metric: metrics.entrySet()) {
             try {
-                context.registerMetric(metric.getKey(), (IMetric)Utils.newInstance(metric.getValue()), bucketSize);
+                context.registerMetric(metric.getKey(), (IMetric)Utils.newInstance(metric.getValue(), conf), bucketSize);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/api/IMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/IMetric.java b/storm-core/src/jvm/org/apache/storm/metric/api/IMetric.java
index 12cc7ff..9e365c1 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/api/IMetric.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/api/IMetric.java
@@ -17,6 +17,14 @@
  */
 package org.apache.storm.metric.api;
 
+/**
+ * Produces metric values; each call to {@link #getValueAndReset()} yields the value to report.
+ */
 public interface IMetric {
+    /**
+     * @return an object that will be sent to {@link IMetricsConsumer#handleDataPoints(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo, java.util.Collection)}.
+     * If null is returned, nothing will be sent.
+     * If this value can be reset, like with a counter, a side effect of calling this should be that the value is reset.
+     */
     public Object getValueAndReset();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
new file mode 100644
index 0000000..0429c97
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.cgroup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reports CPU time ("user-ms", "sys-ms") used by the cgroup since the previous call, read from cpuacct.stat.
+ */
+public class CGroupCpu extends CGroupMetricsBase<Map<String,Long>> {
+    long previousSystem = 0; // last cumulative "system" value read; starts at 0, so the first report is the full total
+    long previousUser = 0; // last cumulative "user" value read; starts at 0 like previousSystem
+    
+    public CGroupCpu(Map<String, Object> conf) {
+        super(conf, "cpuacct.stat");
+    }
+
+    public int getUserHZ() { // clock ticks per second used below to convert cpuacct.stat values to ms
+        return 100; // Hard-coded; on most systems (x86) this is fine.
+        // For full support, run `getconf CLK_TCK` once
+        // and cache the result instead of hard-coding it.
+    }
+
+    @Override
+    public Map<String, Long> parseFileContents(String contents) {
+        try {
+            long systemHz = 0; // cumulative ticks from the line whose key contains "system"
+            long userHz = 0; // cumulative ticks from the line whose key contains "user"
+            for (String line: contents.split("\n")) {
+                if (!line.isEmpty()) {
+                    String [] parts = line.toLowerCase().split("\\s+");
+                    if (parts[0].contains("system")) {
+                        systemHz = Long.parseLong(parts[1].trim());
+                    } else if (parts[0].contains("user")) {
+                        userHz = Long.parseLong(parts[1].trim());
+                    }
+                }   
+            }
+            long user = userHz - previousUser; // ticks since the previous call
+            long sys = systemHz - previousSystem; // ticks since the previous call
+            previousUser = userHz;
+            previousSystem = systemHz;
+            long hz = getUserHZ();
+            HashMap<String, Long> ret = new HashMap<>();
+            ret.put("user-ms", user * 1000/hz); // ticks -> milliseconds
+            ret.put("sys-ms", sys * 1000/hz);
+            return ret;
+        } catch (Exception e) { // e.g. a line with a missing or non-numeric value
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
new file mode 100644
index 0000000..1c924cd
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.cgroup;
+
+import java.util.Map;
+
+import org.apache.storm.container.cgroup.core.CpuCore;
+
+/**
+ * Reports the approximate CPU time, in ms, guaranteed to this worker since the previous call, derived from its cpu shares file.
+ */
+public class CGroupCpuGuarantee extends CGroupMetricsBase<Long> {
+    long previousTime = -1; // wall-clock ms of the previous call; -1 until the first call, which reports null
+    
+    public CGroupCpuGuarantee(Map<String, Object> conf) {
+        super(conf, CpuCore.CPU_SHARES);
+    }
+
+    @Override
+    public Long parseFileContents(String contents) {
+        Long msGuarantee = null; // stays null on the first call: there is no interval to measure yet
+        long now = System.currentTimeMillis();
+        if (previousTime > 0) {
+            long shares = Long.valueOf(contents.trim());
+            // By convention each share corresponds to 1% of a CPU core,
+            // i.e. 100 shares = 1 core full time, so the guaranteed CPU ms over the
+            // elapsed wall-clock interval is approximately shares * elapsedMs / 100.
+            msGuarantee = (shares * (now - previousTime))/100;
+        }
+        previousTime = now;
+        return msGuarantee;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
new file mode 100644
index 0000000..caa5c51
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.cgroup;
+
+import java.util.Map;
+
+/**
+ * Reports the current memory limit, in bytes, of the cgroup for this worker (read from memory.limit_in_bytes).
+ */
+public class CGroupMemoryLimit extends CGroupMetricsBase<Long> {
+
+    public CGroupMemoryLimit(Map<String, Object> conf) {
+        super(conf, "memory.limit_in_bytes");
+    }
+
+    @Override
+    public Long parseFileContents(String contents) {
+        return Long.parseLong(contents.trim()); // expects a single integer; NumberFormatException otherwise
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
new file mode 100644
index 0000000..bb00514
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.cgroup;
+
+import java.util.Map;
+
+/**
+ * Reports the current memory usage, in bytes, of the cgroup for this worker (read from memory.usage_in_bytes).
+ */
+public class CGroupMemoryUsage extends CGroupMetricsBase<Long> {
+
+    public CGroupMemoryUsage(Map<String, Object> conf) {
+        super(conf, "memory.usage_in_bytes");
+    }
+
+    @Override
+    public Long parseFileContents(String contents) {
+        return Long.parseLong(contents.trim()); // expects a single integer; NumberFormatException otherwise
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
new file mode 100644
index 0000000..74bb42c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.cgroup;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for cgroup file metrics: locates fileName in this process's cgroup and reports parseFileContents() of it, or null when disabled.
+ */
+public abstract class CGroupMetricsBase<T> implements IMetric {
+    private static final Logger LOG = LoggerFactory.getLogger(CGroupMetricsBase.class);
+    private boolean enabled; // true only when every check in the constructor passed
+    private File fullFile; // <hierarchyDir>/<cgroup path>/<fileName>
+    
+    public CGroupMetricsBase(Map<String, Object> conf, String fileName) {
+        enabled = false;
+        String hierarchyDir = (String)conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
+        if (hierarchyDir == null || hierarchyDir.isEmpty()) {
+            LOG.warn("{} is disabled {} is not set", getClass().getSimpleName(), Config.STORM_CGROUP_HIERARCHY_DIR);
+            return;
+        }
+        // Good so far; check whether this process belongs to a cgroup
+        File cgroupFile = new File("/proc/self/cgroup");
+        if (!cgroupFile.exists()) {
+            LOG.warn("{} is disabled we do not appear to be a part of a CGroup", getClass().getSimpleName());
+            return;
+        }
+        
+        try (BufferedReader reader = new BufferedReader(new FileReader(cgroupFile))) {
+            // There can be more than one line if cgroups are mounted in more than one place, but we assume the first is good enough
+            String line = reader.readLine();
+            // hierarchy-ID:controller-list:cgroup-path
+            String[] parts = line.split(":", 3); // limit 3 so a ':' inside the cgroup path stays part of parts[2]
+            //parts[0] == 0 for CGroup V2, else maps to hierarchy in /proc/cgroups
+            //parts[1] is empty for CGroups V2 else what is mapped that we are looking for
+            String cgroupPath = parts[2];
+            
+            fullFile = new File(new File(new File(hierarchyDir), cgroupPath), fileName);
+            if (!fullFile.exists()) {
+                LOG.warn("{} is disabled {} does not exist", getClass().getSimpleName(), fullFile);
+                return;
+            }
+        } catch (Exception e) { // includes an empty file (null line) and a line with fewer than three fields
+            LOG.warn("{} is disabled error trying to read or parse {}", getClass().getSimpleName(), cgroupFile, e);
+            return;
+        }
+        enabled = true;
+        LOG.info("{} is ENABLED {} exists...", getClass().getSimpleName(), fullFile);
+    }
+    
+    @Override
+    public Object getValueAndReset() {
+        if (!enabled) {
+            return null;
+        }
+        StringBuilder contents = new StringBuilder(); // local buffer; no synchronization needed
+        try (BufferedReader reader = new BufferedReader(new FileReader(fullFile))) {
+            char[] buf = new char[4096];
+            int len;
+            while((len = reader.read(buf)) > 0) {
+                contents.append(buf, 0, len);
+            }
+            Object ret = parseFileContents(contents.toString());
+            LOG.debug("{} is returning {} from {}", getClass().getSimpleName(), ret, fullFile);
+            return ret;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public abstract T parseFileContents(String contents); // converts the whole text of fullFile into the metric value
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ecbd21c0/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index a430336..3cb8ee1 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -197,6 +197,27 @@ public class Utils {
             throw new RuntimeException(e);
         }
     }
+    
+    @SuppressWarnings("unchecked") // the Class<?> from Class.forName is cast to Class<T> unchecked
+    public static <T> T newInstance(String klass, Map<String, Object> conf) { // load klass by name, then delegate to newInstance(Class, Map)
+        try {
+            return newInstance((Class<T>)Class.forName(klass), conf);
+        } catch (ClassNotFoundException e) { // the Class overload already throws RuntimeException; do not double-wrap it
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static <T> T newInstance(Class<T> klass, Map<String, Object> conf) { // prefer a public (Map) constructor given conf, else the no-arg one
+        try {
+            try {
+                return klass.getConstructor(Map.class).newInstance(conf);
+            } catch (NoSuchMethodException e) { // fall back only when no (Map) constructor exists, so errors it throws are not masked
+                return klass.newInstance();
+            }
+        } catch (Exception e) { // includes InvocationTargetException when the selected constructor throws
+            throw new RuntimeException(e);
+        }
+    }
 
     public static JarTransformer jarTransformer(String klass) {
         JarTransformer ret = null;


[4/8] storm git commit: Fixed issue with test not always having cgroups configured correctly

Posted by bo...@apache.org.
Fixed issue with test not always having cgroups configured correctly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d973754
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d973754
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d973754

Branch: refs/heads/master
Commit: 0d9737547d38400c37b534fd54cce141fa51ead2
Parents: 86e0f52
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Feb 21 16:26:33 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Feb 21 16:26:33 2017 -0600

----------------------------------------------------------------------
 .../jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java   | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0d973754/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
index 3f29d0f..5c66119 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
@@ -57,6 +57,11 @@ public abstract class CGroupMetricsBase<T> implements IMetric {
             LOG.warn("{} is disabled {} is not set", simpleName, Config.STORM_CGROUP_HIERARCHY_DIR);
             return;
         }
+
+        if (!new File(hierarchyDir).exists()) {
+            LOG.warn("{} is disabled {} does not exist", simpleName, hierarchyDir);
+            return;
+        }
         
         //Good so far, check if we are in a CGroup
         File cgroupFile = new File("/proc/self/cgroup");


[3/8] storm git commit: STORM-2333: Addressed some review comments

Posted by bo...@apache.org.
STORM-2333: Addressed some review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/86e0f526
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/86e0f526
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/86e0f526

Branch: refs/heads/master
Commit: 86e0f5268e892a8c897576e9c63254414757a9cf
Parents: b89984c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Feb 21 15:25:18 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Feb 21 15:25:18 2017 -0600

----------------------------------------------------------------------
 docs/cgroups_in_storm.md                        | 11 +++-
 .../container/cgroup/CgroupCoreFactory.java     | 58 +++++++++---------
 .../storm/container/cgroup/Hierarchy.java       |  2 +-
 .../apache/storm/metric/cgroup/CGroupCpu.java   | 64 ++++++++++----------
 .../storm/metric/cgroup/CGroupCpuGuarantee.java | 34 +++++++----
 .../storm/metric/cgroup/CGroupMemoryLimit.java  | 10 ++-
 .../storm/metric/cgroup/CGroupMemoryUsage.java  | 10 ++-
 .../storm/metric/cgroup/CGroupMetricsBase.java  | 50 +++++++--------
 8 files changed, 133 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/docs/cgroups_in_storm.md
----------------------------------------------------------------------
diff --git a/docs/cgroups_in_storm.md b/docs/cgroups_in_storm.md
index 445ae83..5eac9dc 100644
--- a/docs/cgroups_in_storm.md
+++ b/docs/cgroups_in_storm.md
@@ -22,7 +22,7 @@ A sample/default cgconfig.conf file is supplied in the <stormroot>/conf director
 mount {
 	cpuset	= /cgroup/cpuset;
 	cpu	= /cgroup/storm_resources;
-	cpuacct	= /cgroup/cpuacct;
+	cpuacct	= /cgroup/storm_resources;
 	memory	= /cgroup/storm_resources;
 	devices	= /cgroup/devices;
 	freezer	= /cgroup/freezer;
@@ -54,6 +54,13 @@ For a more detailed explanation of the format and configs for the cgconfig.conf
 
 https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/ch-Using_Control_Groups.html#The_cgconfig.conf_File
 
+To let Storm manage the cgroups for individual workers, make sure that the resources you want to control are mounted under the same directory, as in the example above.
+If they are not in the same directory, the supervisor will throw an exception.
+
+The perm section needs to be configured so that the user the supervisor is running as can modify the group.
+
+If run-as-user is enabled, so that the supervisor spawns worker processes as the user who launched the topology, make sure the permissions give individual users read access but not write access.
+
 # Settings Related To CGroups in Storm
 
 | Setting                       | Function                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
@@ -89,6 +96,8 @@ org.apache.storm.metric.cgroup.CGroupCPU reports back metrics similar to org.apa
 
 CGroup reports these as CLK_TCK counts, and not milliseconds so the accuracy is determined by what CLK_TCK is set to.  On most systems it is 100 times a second so at most the accuracy is 10 ms.
 
+For this metric to work, the cpuacct controller must be mounted.
+
 ## CGroupCpuGuarantee
 
 org.apache.storm.metric.cgroup.CGroupCpuGuarantee reports back an approximate number of ms of CPU time that this worker is guaranteed to get.  This is calculated from the resources requested by the tasks in that given worker.

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
index 53a8a7f..4d35c77 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
@@ -34,39 +34,37 @@ import java.util.Set;
 
 public class CgroupCoreFactory {
 
+    public static CgroupCore getInstance(SubSystemType type, String dir) {
+        switch (type) {
+        case blkio:
+            return new BlkioCore(dir);
+        case cpuacct:
+            return new CpuacctCore(dir);
+        case cpuset:
+            return new CpusetCore(dir);
+        case cpu:
+            return new CpuCore(dir);
+        case devices:
+            return new DevicesCore(dir);
+        case freezer:
+            return new FreezerCore(dir);
+        case memory:
+            return new MemoryCore(dir);
+        case net_cls:
+            return new NetClsCore(dir);
+        case net_prio:
+            return new NetPrioCore(dir);
+        default:
+           return null;
+        }
+    }
+
     public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) {
         Map<SubSystemType, CgroupCore> result = new HashMap<SubSystemType, CgroupCore>();
         for (SubSystemType type : types) {
-            switch (type) {
-            case blkio:
-                result.put(SubSystemType.blkio, new BlkioCore(dir));
-                break;
-            case cpuacct:
-                result.put(SubSystemType.cpuacct, new CpuacctCore(dir));
-                break;
-            case cpuset:
-                result.put(SubSystemType.cpuset, new CpusetCore(dir));
-                break;
-            case cpu:
-                result.put(SubSystemType.cpu, new CpuCore(dir));
-                break;
-            case devices:
-                result.put(SubSystemType.devices, new DevicesCore(dir));
-                break;
-            case freezer:
-                result.put(SubSystemType.freezer, new FreezerCore(dir));
-                break;
-            case memory:
-                result.put(SubSystemType.memory, new MemoryCore(dir));
-                break;
-            case net_cls:
-                result.put(SubSystemType.net_cls, new NetClsCore(dir));
-                break;
-            case net_prio:
-                result.put(SubSystemType.net_prio, new NetPrioCore(dir));
-                break;
-            default:
-                break;
+            CgroupCore inst = getInstance(type, dir);
+            if (inst != null) {
+                result.put(type, inst);
             }
         }
         return result;

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
index 440531a..b2b245c 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
@@ -20,7 +20,7 @@ package org.apache.storm.container.cgroup;
 import java.util.Set;
 
 /**
- * A class that describes a cgroup hiearchy
+ * A class that describes a cgroup hierarchy
  */
 public class Hierarchy {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
index 0429c97..bf474be 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
@@ -17,52 +17,54 @@
  */
 package org.apache.storm.metric.cgroup;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.CpuacctCore;
+import org.apache.storm.container.cgroup.core.CpuacctCore.StatType;
+
 /**
  * Report CPU used in the cgroup
  */
 public class CGroupCpu extends CGroupMetricsBase<Map<String,Long>> {
     long previousSystem = 0;
     long previousUser = 0;
+    private int userHz = -1;
     
     public CGroupCpu(Map<String, Object> conf) {
-        super(conf, "cpuacct.stat");
+        super(conf, SubSystemType.cpuacct);
     }
 
-    public int getUserHZ() {
-        return 100; // On most systems (x86) this is fine.
-        // If someone really does want full support
-        // we need to run `getconf CLK_TCK` and cache the result.
+    public synchronized int getUserHZ() throws IOException {
+        if (userHz < 0) {
+            ProcessBuilder pb = new ProcessBuilder("getconf", "CLK_TCK");
+            Process p = pb.start();
+            BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            String line = in.readLine().trim();
+            userHz = Integer.valueOf(line);
+        }
+        return userHz;
     }
 
     @Override
-    public Map<String, Long> parseFileContents(String contents) {
-        try {
-            long systemHz = 0;
-            long userHz = 0;
-            for (String line: contents.split("\n")) {
-                if (!line.isEmpty()) {
-                    String [] parts = line.toLowerCase().split("\\s+");
-                    if (parts[0].contains("system")) {
-                        systemHz = Long.parseLong(parts[1].trim());
-                    } else if (parts[0].contains("user")) {
-                        userHz = Long.parseLong(parts[1].trim());
-                    }
-                }   
-            }
-            long user = userHz - previousUser;
-            long sys = systemHz - previousSystem;
-            previousUser = userHz;
-            previousSystem = systemHz;
-            long hz = getUserHZ();
-            HashMap<String, Long> ret = new HashMap<>();
-            ret.put("user-ms", user * 1000/hz); //Convert to millis
-            ret.put("sys-ms", sys * 1000/hz);
-            return ret;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    public Map<String, Long> getDataFrom(CgroupCore core) throws IOException {
+        CpuacctCore cpu = (CpuacctCore) core;
+        Map<StatType, Long> stat = cpu.getCpuStat();
+        long systemHz = stat.get(StatType.system);
+        long userHz = stat.get(StatType.user);
+        long user = userHz - previousUser;
+        long sys = systemHz - previousSystem;
+        previousUser = userHz;
+        previousSystem = systemHz;
+        long hz = getUserHZ();
+        HashMap<String, Long> ret = new HashMap<>();
+        ret.put("user-ms", user * 1000/hz); //Convert to millis
+        ret.put("sys-ms", sys * 1000/hz);
+        return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
index 1c924cd..47ac153 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
@@ -17,8 +17,11 @@
  */
 package org.apache.storm.metric.cgroup;
 
+import java.io.IOException;
 import java.util.Map;
 
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CgroupCore;
 import org.apache.storm.container.cgroup.core.CpuCore;
 
 /**
@@ -26,23 +29,28 @@ import org.apache.storm.container.cgroup.core.CpuCore;
  */
 public class CGroupCpuGuarantee extends CGroupMetricsBase<Long> {
     long previousTime = -1;
-    
+
     public CGroupCpuGuarantee(Map<String, Object> conf) {
-        super(conf, CpuCore.CPU_SHARES);
+        super(conf, SubSystemType.cpu);
     }
 
     @Override
-    public Long parseFileContents(String contents) {
-        Long msGuarantee = null;
-        long now = System.currentTimeMillis();
-        if (previousTime > 0) {
-            long shares = Long.valueOf(contents.trim());
-            //By convention each share corresponds to 1% of a CPU core
-            // or 100 = 1 core full time. So the guaranteed number of ms
-            // (approximately) should be ...
-            msGuarantee = (shares * (now - previousTime))/100;
+    public Long getDataFrom(CgroupCore core) {
+        try {
+            CpuCore cpu = (CpuCore)core;
+            Long msGuarantee = null;
+            long now = System.currentTimeMillis();
+            if (previousTime > 0) {
+                long shares = cpu.getCpuShares();
+                //By convention each share corresponds to 1% of a CPU core
+                // or 100 = 1 core full time. So the guaranteed number of ms
+                // (approximately) should be ...
+                msGuarantee = (shares * (now - previousTime))/100;
+            }
+            previousTime = now;
+            return msGuarantee;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
-        previousTime = now;
-        return msGuarantee;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
index caa5c51..bfd87a9 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
@@ -19,17 +19,21 @@ package org.apache.storm.metric.cgroup;
 
 import java.util.Map;
 
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+
 /**
  * Reports the current memory limit of the cgroup for this worker
  */
 public class CGroupMemoryLimit extends CGroupMetricsBase<Long> {
 
     public CGroupMemoryLimit(Map<String, Object> conf) {
-        super(conf, "memory.limit_in_bytes");
+        super(conf, SubSystemType.memory);
     }
 
     @Override
-    public Long parseFileContents(String contents) {
-        return Long.parseLong(contents.trim());
+    public Long getDataFrom(CgroupCore core) throws Exception {
+        return ((MemoryCore) core).getPhysicalUsageLimit();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
index bb00514..9462166 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
@@ -19,17 +19,21 @@ package org.apache.storm.metric.cgroup;
 
 import java.util.Map;
 
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+
 /**
  * Reports the current memory usage of the cgroup for this worker
  */
 public class CGroupMemoryUsage extends CGroupMetricsBase<Long> {
 
     public CGroupMemoryUsage(Map<String, Object> conf) {
-        super(conf, "memory.usage_in_bytes");
+        super(conf, SubSystemType.memory);
     }
 
     @Override
-    public Long parseFileContents(String contents) {
-        return Long.parseLong(contents.trim());
+    public Long getDataFrom(CgroupCore core) throws Exception {
+        return ((MemoryCore) core).getPhysicalUsage();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/86e0f526/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
index 74bb42c..3f29d0f 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
@@ -20,10 +20,13 @@ package org.apache.storm.metric.cgroup;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
-import java.io.IOException;
 import java.util.Map;
 
 import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupCenter;
+import org.apache.storm.container.cgroup.CgroupCoreFactory;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CgroupCore;
 import org.apache.storm.metric.api.IMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,15 +37,27 @@ import org.slf4j.LoggerFactory;
 public abstract class CGroupMetricsBase<T> implements IMetric {
     private static final Logger LOG = LoggerFactory.getLogger(CGroupMetricsBase.class);
     private boolean enabled;
-    private File fullFile;
+    private CgroupCore core = null;
     
-    public CGroupMetricsBase(Map<String, Object> conf, String fileName) {
+    public CGroupMetricsBase(Map<String, Object> conf, SubSystemType type) {
+        final String simpleName = getClass().getSimpleName();
         enabled = false;
+        CgroupCenter center = CgroupCenter.getInstance();
+        if (center == null) {
+            LOG.warn("{} is diabled. cgroups do not appear to be enabled on this system", simpleName);
+            return;
+        }
+        if (!center.isSubSystemEnabled(type)) {
+            LOG.warn("{} is disabled. {} is not an enabled subsystem", simpleName, type);
+            return;
+        }
+        
         String hierarchyDir = (String)conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
         if (hierarchyDir == null || hierarchyDir.isEmpty()) {
-            LOG.warn("{} is disabled {} is not set", getClass().getSimpleName(), Config.STORM_CGROUP_HIERARCHY_DIR);
+            LOG.warn("{} is disabled {} is not set", simpleName, Config.STORM_CGROUP_HIERARCHY_DIR);
             return;
         }
+        
         //Good so far, check if we are in a CGroup
         File cgroupFile = new File("/proc/self/cgroup");
         if (!cgroupFile.exists()) {
@@ -58,18 +73,13 @@ public abstract class CGroupMetricsBase<T> implements IMetric {
             //parts[0] == 0 for CGroup V2, else maps to hierarchy in /proc/cgroups
             //parts[1] is empty for CGroups V2 else what is mapped that we are looking for
             String cgroupPath = parts[2];
-            
-            fullFile = new File(new File(new File(hierarchyDir), cgroupPath), fileName);
-            if (!fullFile.exists()) {
-                LOG.warn("{} is disabled {} does not exist", getClass().getSimpleName(), fullFile);
-                return;
-            }
+            core = CgroupCoreFactory.getInstance(type, new File(hierarchyDir, cgroupPath).getAbsolutePath());
         } catch (Exception e) {
-            LOG.warn("{} is disabled error trying to read or parse {}", getClass().getSimpleName(), cgroupFile);
+            LOG.warn("{} is disabled error trying to read or parse {}", simpleName, cgroupFile);
             return;
         }
         enabled = true;
-        LOG.info("{} is ENABLED {} exists...", getClass().getSimpleName(), fullFile);
+        LOG.info("{} is ENABLED {} exists...", simpleName);
     }
     
     @Override
@@ -77,20 +87,12 @@ public abstract class CGroupMetricsBase<T> implements IMetric {
         if (!enabled) {
             return null;
         }
-        StringBuffer contents = new StringBuffer();
-        try (BufferedReader reader = new BufferedReader(new FileReader(fullFile))) {
-            char[] buf = new char[4096];
-            int len;
-            while((len = reader.read(buf)) > 0) {
-                contents.append(buf, 0, len);
-            }
-            Object ret = parseFileContents(contents.toString());
-            LOG.debug("{} is returning {} from {}", getClass().getSimpleName(), ret, fullFile);
-            return ret;
-        } catch (IOException e) {
+        try {
+            return getDataFrom(core);
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
     
-    public abstract T parseFileContents(String contents);
+    public abstract T getDataFrom(CgroupCore core) throws Exception;
 }


[8/8] storm git commit: Added STORM-2333 to changelog

Posted by bo...@apache.org.
Added STORM-2333 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f0bfe0d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f0bfe0d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f0bfe0d3

Branch: refs/heads/master
Commit: f0bfe0d3d1e7446a9de289e0b07d84885eaf01ff
Parents: 3a0e953
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 27 10:03:31 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 27 10:03:31 2017 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f0bfe0d3/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0a27a0..8ef71ed 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 \ufeff## 2.0.0
+ * STORM-2333: CGroup memory and CPU metrics
  * STORM-2374: Storm Kafka Client Test Topologies Must be Serializable
  * STORM-2372: Pacemaker client doesn't clean up heartbeats properly
  * STORM-2334: Join Bolt implementation


[2/8] storm git commit: STORM-2333: Added in some docs

Posted by bo...@apache.org.
STORM-2333: Added in some docs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b89984c2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b89984c2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b89984c2

Branch: refs/heads/master
Commit: b89984c225f3a44d9231fa3dc43e451f3c6a2746
Parents: ecbd21c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Feb 15 14:11:17 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Feb 15 14:11:17 2017 -0600

----------------------------------------------------------------------
 docs/cgroups_in_storm.md | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b89984c2/docs/cgroups_in_storm.md
----------------------------------------------------------------------
diff --git a/docs/cgroups_in_storm.md b/docs/cgroups_in_storm.md
index 745d812..445ae83 100644
--- a/docs/cgroups_in_storm.md
+++ b/docs/cgroups_in_storm.md
@@ -43,6 +43,10 @@ group storm {
        }
        cpu {
        }
+       memory {
+       }
+       cpuacct {
+       }
 }
 ```
 
@@ -68,4 +72,31 @@ Since limiting CPU usage via cpu.shares only limits the proportional CPU usage o
 
 CGroups can be used in conjunction with the Resource Aware Scheduler.  CGroups will then enforce the resource usage of workers as allocated by the Resource Aware Scheduler.  To use cgroups with the Resource Aware Scheduler, simply enable cgroups and be sure NOT to set storm.worker.cgroup.memory.mb.limit and storm.worker.cgroup.cpu.limit configs.
 
+# CGroup Metrics
+
+CGroups not only can limit the amount of resources a worker has access to, but they can also help monitor the resource consumption of a worker.  There are several metrics enabled by default that will check if the worker is a part of a CGroup and report corresponding metrics.
+
+## CGroupCPU
+
+org.apache.storm.metric.cgroup.CGroupCpu reports back metrics similar to org.apache.storm.metrics.sigar.CPUMetric, except for everything within the CGroup.  It reports both user and system CPU usage in ms as a map:
+
+```
+{
+   "user-ms": number,
+   "sys-ms": number
+}
+```
+
+CGroup reports these as CLK_TCK counts, not milliseconds, so the accuracy is determined by what CLK_TCK is set to.  On most systems it is 100 ticks per second, so the best possible accuracy is 10 ms.
+
+## CGroupCpuGuarantee
+
+org.apache.storm.metric.cgroup.CGroupCpuGuarantee reports back an approximate number of ms of CPU time that this worker is guaranteed to get.  This is calculated from the resources requested by the tasks in that given worker.
+
+## CGroupMemory
+
+org.apache.storm.metric.cgroup.CGroupMemoryUsage reports the current memory usage, in bytes, of all processes in the cgroup.
+
+## CGroupMemoryLimit
 
+org.apache.storm.metric.cgroup.CGroupMemoryLimit reports the current limit in bytes for all of the processes in the cgroup.  If running with CGroups enabled in storm, this is the on-heap request + the off-heap request for all tasks within the worker + any extra slop space given to workers.


[6/8] storm git commit: STORM-2333: Needed to check if the subsystem was mounted

Posted by bo...@apache.org.
STORM-2333: Needed to check if the subsystem was mounted


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c01b04d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c01b04d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c01b04d4

Branch: refs/heads/master
Commit: c01b04d4d772b1bdb779cb5d2649620ad82d4b18
Parents: debb3b8
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Feb 23 09:54:46 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Feb 23 09:54:46 2017 -0600

----------------------------------------------------------------------
 .../apache/storm/metric/cgroup/CGroupMetricsBase.java    | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c01b04d4/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
index 5c66119..bc11e3a 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
@@ -19,6 +19,7 @@ package org.apache.storm.metric.cgroup;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.util.Map;
 
@@ -51,6 +52,12 @@ public abstract class CGroupMetricsBase<T> implements IMetric {
             LOG.warn("{} is disabled. {} is not an enabled subsystem", simpleName, type);
             return;
         }
+
+        //Check to see if the CGroup is mounted at all
+        if (null == center.getHierarchyWithSubSystem(type)) {
+            LOG.warn("{} is disabled. {} is not a mounted subsystem", simpleName, type);
+            return;
+        }
         
         String hierarchyDir = (String)conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
         if (hierarchyDir == null || hierarchyDir.isEmpty()) {
@@ -94,6 +101,10 @@ public abstract class CGroupMetricsBase<T> implements IMetric {
         }
         try {
             return getDataFrom(core);
+        } catch (FileNotFoundException e) {
+             LOG.warn("Exception trying to read a file {}", e);
+             //Something happened and we couldn't find the file, so ignore it for now.
+            return null;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }


[5/8] storm git commit: More rework

Posted by bo...@apache.org.
More rework


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/debb3b81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/debb3b81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/debb3b81

Branch: refs/heads/master
Commit: debb3b81e88104fa63c21be473f883352ae13c03
Parents: 0d97375
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Feb 22 10:26:53 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Feb 22 10:26:53 2017 -0600

----------------------------------------------------------------------
 docs/cgroups_in_storm.md                        | 52 ++++++++++++++++++--
 .../storm/metric/cgroup/CGroupCpuGuarantee.java | 28 +++++------
 2 files changed, 60 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/debb3b81/docs/cgroups_in_storm.md
----------------------------------------------------------------------
diff --git a/docs/cgroups_in_storm.md b/docs/cgroups_in_storm.md
index 5eac9dc..ce86d81 100644
--- a/docs/cgroups_in_storm.md
+++ b/docs/cgroups_in_storm.md
@@ -54,12 +54,12 @@ For a more detailed explanation of the format and configs for the cgconfig.conf
 
 https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/ch-Using_Control_Groups.html#The_cgconfig.conf_File
 
-To let storm manage the cgroups for indavidual workers you need to make sure that the resources you want to control are mounted under the same directory as in the example above.
+To let storm manage the cgroups for individual workers you need to make sure that the resources you want to control are mounted under the same directory as in the example above.
 If they are not in the same directory the supervisor will throw an exception.
 
 The perm section needs to be configured so that the user the supervisor is running as can modify the group.
 
-If run as user is enabled so the supervisor spawns other processes as the user that launched the topology make sure that the permissions are such that indavidual users have read access but not write access.
+If "run as user" is enabled so that the supervisor spawns other processes as the user that launched the topology, make sure that the permissions are such that individual users have read access but not write access.
 
 # Settings Related To CGroups in Storm
 
@@ -81,7 +81,7 @@ CGroups can be used in conjunction with the Resource Aware Scheduler.  CGroups w
 
 # CGroup Metrics
 
-CGroups not only can limit the amount of resources a worker has access to, but it can also help monitor the resource consumption of a worker.  There are several metrics enabled by default that will check if the worker is a part of a CGroup and report correspndign metrics.
+CGroups not only can limit the amount of resources a worker has access to, but it can also help monitor the resource consumption of a worker.  There are several metrics enabled by default that will check if the worker is a part of a CGroup and report corresponding metrics.
 
 ## CGroupCPU
 
@@ -108,4 +108,48 @@ org.apache.storm.metric.cgroup.CGroupMemoryUsage reports the current memory usag
 
 ## CGroupMemoryLimit
 
-org.apache.storm.metric.cgroup.CGroupMemoryLimit report the current limit in bytes for all of the processes in the cgroup.  If running with CGroups enabeld in storm this is the on-heap request + the off-heap request for all tasks within the worker + any extra slop space given to workers.
+org.apache.storm.metric.cgroup.CGroupMemoryLimit reports the current limit in bytes for all of the processes in the cgroup.  If running with CGroups enabled in storm, this is the on-heap request + the off-heap request for all tasks within the worker + any extra slop space given to workers.
+
+## Usage/Debugging CGroups in your topology
+
+These metrics can be very helpful in debugging what has happened or is happening to your code when it is running under a CGroup.
+
+### CPU
+
+CPU guarantees under storm are soft.  This means that a worker can easily go over its guarantee if there is free CPU available.  To detect that your worker is using more CPU than it requested, you can sum up the values in CGroupCPU and compare them to CGroupCpuGuarantee.
+If CGroupCPU is consistently higher than or equal to CGroupCpuGuarantee, you probably want to look at requesting more CPU, as your worker may be starved for CPU if more load is placed on the cluster.  Being equal to CGroupCpuGuarantee means your worker may already
+be throttled.  If the used CPU is much smaller than CGroupCpuGuarantee, then you are probably wasting resources and may want to reduce your CPU ask.
+
+If you do have high CPU you probably also want to check out the GC metrics and/or the GC log for your worker.  Memory pressure on the heap can result in increased CPU as garbage collection happens.
+
+### Memory
+
+Memory debugging of java under a cgroup can be difficult for multiple reasons.
+
+1. JVM memory management is complex
+2. At the time of writing, only a few JVMs have cgroup support, and that support is experimental.
+3. JNI and other processes can use up memory within the cgroup that the JVM is not always aware of.
+4. Memory pressure within the heap can result in increased CPU load instead of increased memory allocation.
+
+There are several metrics that storm provides by default that can help you understand what is happening within your worker.
+
+If CGroupMemory gets close to CGroupMemoryLimit, then you know that bad things are likely to start happening soon with this worker.  Memory is not a soft guarantee like CPU.
+If you go over, the OOM killer on Linux will start to kill processes within your worker.  Please pay attention to these metrics.  If you are running a version of java that
+is cgroup aware, then going over the limit typically means that you will need to increase your off-heap request.  If you are not, it could be that you need more off-heap
+memory, or it could be that java has allocated more memory than it should have as part of the garbage collection process.  Figuring out which is typically best done with
+trial and error (sorry).
+
+Storm also reports the JVM's on-heap and off-heap usage through the "memory/heap" and "memory/nonHeap" metrics respectively.  These can give you a hint about
+which to increase.  Looking at the "usedBytes" field under each can help you understand how much memory the JVM is currently using.  However, as noted above, the off-heap
+portion is not always accurate, and when the heap grows it can result in unrecorded off-heap memory that will cause the cgroup to kill processes.
+
+The names of the GC metrics vary based on the garbage collector you use, but they all start with "GC/".  If you sum up all of the "GC/*.timeMs" metrics for a given worker/window pair,
+you should be able to see how much of the CPU guarantee went to GC.  By default java allows up to 98% of time to go towards GC (while recovering less than 2% of the heap) before it throws an OutOfMemoryError.  This is far from ideal
+for a near real time streaming system, so pay attention to this ratio.
+
+If the ratio is at a fairly steady state and your memory usage is not even close to the limit you might want to look at reducing your memory request.  This too can be complicated to figure
+out.
+
+## Future Work
+
+There is ongoing work on adding elasticity to storm.  Eventually we hope to be able to do all of the above analysis for you and grow/shrink your topology on demand.

http://git-wip-us.apache.org/repos/asf/storm/blob/debb3b81/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
index 47ac153..f663152 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
@@ -35,22 +35,18 @@ public class CGroupCpuGuarantee extends CGroupMetricsBase<Long> {
     }
 
     @Override
-    public Long getDataFrom(CgroupCore core) {
-        try {
-            CpuCore cpu = (CpuCore)core;
-            Long msGuarantee = null;
-            long now = System.currentTimeMillis();
-            if (previousTime > 0) {
-                long shares = cpu.getCpuShares();
-                //By convention each share corresponds to 1% of a CPU core
-                // or 100 = 1 core full time. So the guaranteed number of ms
-                // (approximately) should be ...
-                msGuarantee = (shares * (now - previousTime))/100;
-            }
-            previousTime = now;
-            return msGuarantee;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+    public Long getDataFrom(CgroupCore core) throws IOException {
+        CpuCore cpu = (CpuCore)core;
+        Long msGuarantee = null;
+        long now = System.currentTimeMillis();
+        if (previousTime > 0) {
+            long shares = cpu.getCpuShares();
+            //By convention each share corresponds to 1% of a CPU core
+            // or 100 = 1 core full time. So the guaranteed number of ms
+            // (approximately) should be ...
+            msGuarantee = (shares * (now - previousTime))/100;
         }
+        previousTime = now;
+        return msGuarantee;
     }
 }


[7/8] storm git commit: Merge branch 'STORM-2333' of https://github.com/revans2/incubator-storm into STORM-2333

Posted by bo...@apache.org.
Merge branch 'STORM-2333' of https://github.com/revans2/incubator-storm into STORM-2333

STORM-2333: CGroup memory and CPU metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a0e953a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a0e953a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a0e953a

Branch: refs/heads/master
Commit: 3a0e953a27e21fed2ddee219e8bc08e92d51d586
Parents: 34406ec c01b04d
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 27 09:51:56 2017 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 27 09:51:56 2017 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |   5 +
 docs/cgroups_in_storm.md                        |  86 +++++++++++++-
 .../container/cgroup/CgroupCoreFactory.java     |  58 +++++-----
 .../storm/container/cgroup/Hierarchy.java       |   2 +-
 .../jvm/org/apache/storm/metric/SystemBolt.java |  55 ++++-----
 .../org/apache/storm/metric/api/IMetric.java    |   8 ++
 .../apache/storm/metric/cgroup/CGroupCpu.java   |  70 ++++++++++++
 .../storm/metric/cgroup/CGroupCpuGuarantee.java |  52 +++++++++
 .../storm/metric/cgroup/CGroupMemoryLimit.java  |  39 +++++++
 .../storm/metric/cgroup/CGroupMemoryUsage.java  |  39 +++++++
 .../storm/metric/cgroup/CGroupMetricsBase.java  | 114 +++++++++++++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  21 ++++
 12 files changed, 487 insertions(+), 62 deletions(-)
----------------------------------------------------------------------