You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:32 UTC

[43/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
new file mode 100644
index 0000000..b255dad
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.ClientZookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link IStateStorage} implementation that forwards every operation to the static helpers in
+ * {@link ClientZookeeper}, using Curator clients built from the Storm configuration.
+ *
+ * <p>Two client references are kept: {@code zkWriter} is used by the mutating methods (and by the
+ * existence checks inside them), {@code zkReader} by the read methods and by
+ * {@link #add_listener}. For a Nimbus daemon the reader is a separate client; for every other
+ * daemon type both references point at the same client.
+ */
+public class ZKStateStorage implements IStateStorage {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);
+
+    /** Registered change callbacks, keyed by the id handed out by {@link #register}. */
+    private final ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<String, ZKStateChangedCallback>();
+    private CuratorFramework zkWriter;
+    private CuratorFramework zkReader;
+    /** Flipped to false by {@link #close()}; watcher events are ignored afterwards. */
+    private AtomicBoolean active;
+
+    private boolean isNimbus;
+    private Map authConf;
+    private Map<Object, Object> conf;
+
+    /**
+     * Watcher handed to the clients created by {@link #mkZk(WatcherCallBack)}. While the storage
+     * is active it logs each event and forwards every event whose type is not {@code None} to all
+     * registered callbacks.
+     */
+    private class ZkWatcherCallBack implements WatcherCallBack{
+        @Override
+        public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+            if (!active.get()) {
+                return;
+            }
+            if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                LOG.debug("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+            } else {
+                LOG.debug("Received event {} : {} : {}", state, type, path);
+            }
+
+            if (!type.equals(Watcher.Event.EventType.None)) {
+                for (ZKStateChangedCallback fn : callbacks.values()) {
+                    fn.changed(type, path);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the storage: makes sure the configured {@code STORM_ZOOKEEPER_ROOT} path exists,
+     * then opens the writer client and, for Nimbus only, a separate reader client.
+     *
+     * @param conf     Storm configuration; read for the ZooKeeper servers, port and root
+     * @param authConf configuration passed through to {@link ClientZookeeper#mkClient}
+     * @param acls     ACLs applied when the root path is created
+     * @param context  supplies the daemon type that decides whether a separate reader is opened
+     * @throws Exception if a client cannot be created or the root path cannot be made
+     */
+    public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        this.conf = conf;
+        this.authConf = authConf;
+        this.isNimbus = context.getDaemonType().equals(DaemonType.NIMBUS);
+
+        // just mkdir STORM_ZOOKEEPER_ROOT dir
+        CuratorFramework zkTemp = mkZk();
+        try {
+            String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+            ClientZookeeper.mkdirs(zkTemp, rootPath, acls);
+        } finally {
+            // close the temporary client even when mkdirs fails, so it is not leaked
+            zkTemp.close();
+        }
+
+        active = new AtomicBoolean(true);
+        zkWriter = mkZk(new ZkWatcherCallBack());
+        if (isNimbus) {
+            try {
+                zkReader = mkZk(new ZkWatcherCallBack());
+            } catch (IOException | RuntimeException e) {
+                // nobody else holds the writer yet; release it before propagating the failure
+                zkWriter.close();
+                throw e;
+            }
+        } else {
+            zkReader = zkWriter;
+        }
+
+    }
+
+    /**
+     * Builds a client with an empty string in place of the root path and a
+     * {@link DefaultWatcherCallBack}; used only to create the root path itself.
+     */
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk() throws IOException {
+        return ClientZookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
+                new DefaultWatcherCallBack(), authConf);
+    }
+
+    /**
+     * Builds a client for the configured {@code STORM_ZOOKEEPER_ROOT} that reports events to the
+     * given watcher.
+     */
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+        return ClientZookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
+                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+    }
+
+    /** Delegates to {@link ClientZookeeper#deleteNodeBlobstore} on the writer client. */
+    @Override
+    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+        ClientZookeeper.deleteNodeBlobstore(zkWriter, path, nimbusHostPortInfo);
+    }
+
+    /**
+     * Registers a callback that is invoked for watcher events.
+     *
+     * @return a freshly generated random UUID string identifying the registration
+     */
+    @Override
+    public String register(ZKStateChangedCallback callback) {
+        String id = UUID.randomUUID().toString();
+        this.callbacks.put(id, callback);
+        return id;
+    }
+
+    /** Removes the callback previously registered under {@code id}, if any. */
+    @Override
+    public void unregister(String id) {
+        this.callbacks.remove(id);
+    }
+
+    /** Creates an {@code EPHEMERAL_SEQUENTIAL} node through the writer client. */
+    @Override
+    public String create_sequential(String path, byte[] data, List<ACL> acls) {
+        return ClientZookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
+    }
+
+    /** Delegates to {@link ClientZookeeper#mkdirs} on the writer client. */
+    @Override
+    public void mkdirs(String path, List<ACL> acls) {
+        ClientZookeeper.mkdirs(zkWriter, path, acls);
+    }
+
+    /** Delegates to {@link ClientZookeeper#deleteNode} on the writer client. */
+    @Override
+    public void delete_node(String path) {
+        ClientZookeeper.deleteNode(zkWriter, path);
+    }
+
+    /**
+     * Writes {@code data} to an ephemeral node at {@code path}, creating the parent path first.
+     * An existing node is updated in place; a missing node is created as {@code EPHEMERAL}.
+     */
+    @Override
+    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+        ClientZookeeper.mkdirs(zkWriter, ClientZookeeper.parentPath(path), acls);
+        if (ClientZookeeper.exists(zkWriter, path, false)) {
+            try {
+                ClientZookeeper.setData(zkWriter, path, data);
+            } catch (RuntimeException e) {
+                // the node disappeared between the exists check and the write: create it instead
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                    ClientZookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+                } else {
+                    throw e;
+                }
+            }
+
+        } else {
+            ClientZookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+        }
+    }
+
+    /** Delegates to {@link ClientZookeeper#getVersion} on the reader client. */
+    @Override
+    public Integer get_version(String path, boolean watch) throws Exception {
+        return ClientZookeeper.getVersion(zkReader, path, watch);
+    }
+
+    /** Delegates to {@link ClientZookeeper#existsNode} on the reader client. */
+    @Override
+    public boolean node_exists(String path, boolean watch) {
+        return ClientZookeeper.existsNode(zkReader, path, watch);
+    }
+
+    /** Delegates to {@link ClientZookeeper#getChildren} on the reader client. */
+    @Override
+    public List<String> get_children(String path, boolean watch) {
+        return ClientZookeeper.getChildren(zkReader, path, watch);
+    }
+
+    /**
+     * Marks the storage inactive and closes the writer client and, for Nimbus, the separate
+     * reader client. The reader is closed even if closing the writer throws.
+     */
+    @Override
+    public void close() {
+        this.active.set(false);
+        try {
+            zkWriter.close();
+        } finally {
+            // for non-Nimbus daemons the reader is the writer, which was just closed
+            if (isNimbus) {
+                zkReader.close();
+            }
+        }
+    }
+
+    /**
+     * Writes {@code data} to a persistent node at {@code path}. An existing node is updated;
+     * otherwise the parent path is created and the node is created as {@code PERSISTENT}.
+     */
+    @Override
+    public void set_data(String path, byte[] data, List<ACL> acls) {
+        if (ClientZookeeper.exists(zkWriter, path, false)) {
+            ClientZookeeper.setData(zkWriter, path, data);
+        } else {
+            ClientZookeeper.mkdirs(zkWriter, ClientZookeeper.parentPath(path), acls);
+            try {
+                ClientZookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
+            } catch (RuntimeException e) {
+                // the node appeared between the exists check and the create: update it instead
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
+                    ClientZookeeper.setData(zkWriter, path, data);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /** Delegates to {@link ClientZookeeper#getData} on the reader client. */
+    @Override
+    public byte[] get_data(String path, boolean watch) {
+        return ClientZookeeper.getData(zkReader, path, watch);
+    }
+
+    /** Delegates to {@link ClientZookeeper#getDataWithVersion} on the reader client. */
+    @Override
+    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
+        return ClientZookeeper.getDataWithVersion(zkReader, path, watch);
+    }
+
+    /** Stores a worker heartbeat; identical to {@link #set_data}. */
+    @Override
+    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+        set_data(path, data, acls);
+    }
+
+    /** Reads a worker heartbeat through {@link ClientZookeeper#getData} on the reader client. */
+    @Override
+    public byte[] get_worker_hb(String path, boolean watch) {
+        return ClientZookeeper.getData(zkReader, path, watch);
+    }
+
+    /** Lists worker heartbeat children; identical to {@link #get_children}. */
+    @Override
+    public List<String> get_worker_hb_children(String path, boolean watch) {
+        return get_children(path, watch);
+    }
+
+    /** Deletes a worker heartbeat; identical to {@link #delete_node}. */
+    @Override
+    public void delete_worker_hb(String path) {
+        delete_node(path);
+    }
+
+    /**
+     * Adds a connection-state listener to the reader client. The listener is wrapped in a new
+     * anonymous listener that simply forwards each state change to it.
+     */
+    @Override
+    public void add_listener(final ConnectionStateListener listener) {
+        ClientZookeeper.addListener(zkReader, new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                listener.stateChanged(curatorFramework, connectionState);
+            }
+        });
+    }
+
+    /** Delegates to {@link ClientZookeeper#syncPath} on the writer client. */
+    @Override
+    public void sync_path(String path) {
+        ClientZookeeper.syncPath(zkWriter, path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
new file mode 100644
index 0000000..3715e48
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link StateStorageFactory} that produces {@link ZKStateStorage} instances.
+ */
+public class ZKStateStorageFactory implements StateStorageFactory {
+
+    /**
+     * Builds a new {@link ZKStateStorage} from the given arguments. Any exception raised by the
+     * constructor is rethrown via {@link Utils#wrapInRuntime}.
+     */
+    @Override
+    public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
+        final IStateStorage storage;
+        try {
+            storage = new ZKStateStorage(config, auth_conf, acls, context);
+        } catch (Exception failure) {
+            throw Utils.wrapInRuntime(failure);
+        }
+        return storage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
new file mode 100644
index 0000000..ba6417f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link CgroupOperation} implementation that inspects the cgroup status files named by
+ * {@link CgroupUtils} and mounts, unmounts, creates and deletes cgroups on the local machine.
+ * Instances are obtained through {@link #getInstance()}.
+ */
+public class CgroupCenter implements CgroupOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CgroupCenter.class);
+
+    private static CgroupCenter instance;
+
+    private CgroupCenter() {
+
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     *
+     * @return the instance, or {@code null} when {@link CgroupUtils#enabled()} is false
+     */
+    public synchronized static CgroupCenter getInstance() {
+        if (!CgroupUtils.enabled()) {
+            return null;
+        }
+        // reuse the cached object instead of allocating a new one on every call
+        if (instance == null) {
+            instance = new CgroupCenter();
+        }
+        return instance;
+    }
+
+    /**
+     * Parses {@link CgroupUtils#MOUNT_STATUS_FILE} and builds one {@link Hierarchy} per distinct
+     * options string among the lines whose third space-separated field is {@code cgroup}.
+     * Lines with fewer than four fields are skipped.
+     *
+     * @return the hierarchies found, or {@code null} if reading or parsing failed
+     */
+    @Override
+    public List<Hierarchy> getHierarchies() {
+        Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
+        try (FileReader reader = new FileReader(CgroupUtils.MOUNT_STATUS_FILE);
+             BufferedReader br = new BufferedReader(reader)) {
+            String str = null;
+            while ((str = br.readLine()) != null) {
+                String[] strSplit = str.split(" ");
+                // fields read below: [0] name, [1] dir, [2] filesystem type, [3] options
+                if (strSplit.length < 4 || !strSplit[2].equals("cgroup")) {
+                    continue;
+                }
+                String name = strSplit[0];
+                String type = strSplit[3];
+                String dir = strSplit[1];
+                //Some mount options (i.e. rw and relatime) in type are not cgroups related
+                Hierarchy h = new Hierarchy(name, CgroupUtils.getSubSystemsFromString(type), dir);
+                hierarchies.put(type, h);
+            }
+            return new ArrayList<Hierarchy>(hierarchies.values());
+        } catch (Exception e) {
+            // pass the exception as the throwable argument so the stack trace is logged
+            LOG.error("Get hierarchies error", e);
+        }
+        return null;
+    }
+
+    /**
+     * Parses {@link CgroupUtils#CGROUP_STATUS_FILE}, one tab-separated line per subsystem.
+     * Lines whose first field is not a known {@link SubSystemType}, or that have fewer than four
+     * fields, are skipped.
+     *
+     * @return the subsystems found, or {@code null} if reading or parsing failed
+     */
+    @Override
+    public Set<SubSystem> getSubSystems() {
+        Set<SubSystem> subSystems = new HashSet<SubSystem>();
+        try (FileReader reader = new FileReader(CgroupUtils.CGROUP_STATUS_FILE);
+             BufferedReader br = new BufferedReader(reader)){
+            String str = null;
+            while ((str = br.readLine()) != null) {
+                String[] split = str.split("\t");
+                SubSystemType type = SubSystemType.getSubSystem(split[0]);
+                if (type == null || split.length < 4) {
+                    continue;
+                }
+                int hierarchyID = Integer.parseInt(split[1]);
+                int cgroupNum = Integer.parseInt(split[2]);
+                boolean enable = Integer.parseInt(split[3]) == 1;
+                subSystems.add(new SubSystem(type, hierarchyID, cgroupNum, enable));
+            }
+            return subSystems;
+        } catch (Exception e) {
+            // pass the exception as the throwable argument so the stack trace is logged
+            LOG.error("Get subSystems error", e);
+        }
+        return null;
+    }
+
+    /**
+     * Reports whether a subsystem of the given type is listed by {@link #getSubSystems()}.
+     *
+     * <p>NOTE(review): only presence in the list is checked; the per-subsystem "enable" value
+     * parsed in {@link #getSubSystems()} is not consulted here - confirm this is intended.
+     *
+     * @return {@code true} if listed; {@code false} otherwise or when the list could not be read
+     */
+    @Override
+    public boolean isSubSystemEnabled(SubSystemType subSystemType) {
+        Set<SubSystem> subSystems = this.getSubSystems();
+        if (subSystems == null) {
+            return false;
+        }
+        for (SubSystem subSystem : subSystems) {
+            if (subSystem.getType() == subSystemType) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Single-subsystem convenience form of {@link #getHierarchyWithSubSystems(List)}. */
+    @Override
+    public Hierarchy getHierarchyWithSubSystem(SubSystemType subSystem) {
+        return getHierarchyWithSubSystems(Arrays.asList(subSystem));
+    }
+
+    /**
+     * Finds the first hierarchy from {@link #getHierarchies()} whose subsystems include every
+     * entry of {@code subSystems}.
+     *
+     * @return the matching hierarchy, or {@code null} if none matches or the hierarchies could
+     *         not be read
+     */
+    @Override
+    public Hierarchy getHierarchyWithSubSystems(List<SubSystemType> subSystems) {
+        List<Hierarchy> hierarchies = this.getHierarchies();
+        if (hierarchies == null) {
+            return null;
+        }
+        for (Hierarchy hierarchy : hierarchies) {
+            if (hierarchy.getSubSystems().containsAll(subSystems)) {
+                return hierarchy;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Reports whether the hierarchy's directory exists and an equal hierarchy is returned by
+     * {@link #getHierarchies()}.
+     */
+    @Override
+    public boolean isMounted(Hierarchy hierarchy) {
+        if (Utils.checkDirExists(hierarchy.getDir())) {
+            List<Hierarchy> hierarchies = this.getHierarchies();
+            if (hierarchies == null) {
+                return false;
+            }
+            for (Hierarchy h : hierarchies) {
+                if (h.equals(hierarchy)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Mounts the hierarchy unless it is already mounted. Subsystems that already belong to some
+     * mounted hierarchy are logged and removed from the hierarchy's subsystem set; if none are
+     * left nothing is mounted. The target directory is created when it does not exist.
+     *
+     * @throws IOException declared for the underlying {@link SystemOperation#mount} call
+     */
+    @Override
+    public void mount(Hierarchy hierarchy) throws IOException {
+        if (this.isMounted(hierarchy)) {
+            LOG.error("{} is already mounted", hierarchy.getDir());
+            return;
+        }
+        Set<SubSystemType> subSystems = hierarchy.getSubSystems();
+        // Collect first and remove afterwards: removing from the set inside the for-each loop
+        // can make its iterator throw ConcurrentModificationException.
+        List<SubSystemType> alreadyMounted = new ArrayList<SubSystemType>();
+        for (SubSystemType type : subSystems) {
+            Hierarchy hierarchyWithSubSystem = this.getHierarchyWithSubSystem(type);
+            if (hierarchyWithSubSystem != null) {
+                LOG.error("subSystem: {} is already mounted on hierarchy: {}", type.name(), hierarchyWithSubSystem);
+                alreadyMounted.add(type);
+            }
+        }
+        subSystems.removeAll(alreadyMounted);
+        if (subSystems.size() == 0) {
+            return;
+        }
+        if (!Utils.checkDirExists(hierarchy.getDir())) {
+            new File(hierarchy.getDir()).mkdirs();
+        }
+        String subSystemsName = CgroupUtils.subSystemsToString(subSystems);
+        SystemOperation.mount(subSystemsName, hierarchy.getDir(), "cgroup", subSystemsName);
+
+    }
+
+    /**
+     * Unmounts a mounted hierarchy: deletes its root cgroups, unmounts the directory and then
+     * deletes the directory. Logs an error and does nothing when the hierarchy is not mounted.
+     */
+    @Override
+    public void umount(Hierarchy hierarchy) throws IOException {
+        if (this.isMounted(hierarchy)) {
+            hierarchy.getRootCgroups().delete();
+            SystemOperation.umount(hierarchy.getDir());
+            CgroupUtils.deleteDir(hierarchy.getDir());
+        } else {
+            LOG.error("{} is not mounted", hierarchy.getDir());
+        }
+    }
+
+    /**
+     * Creates the directory for a non-root cgroup.
+     *
+     * @throws RuntimeException if the cgroup is a root cgroup, any ancestor directory is missing,
+     *                          its hierarchy is not mounted, the directory already exists, or the
+     *                          directory cannot be created
+     */
+    @Override
+    public void createCgroup(CgroupCommon cgroup) throws SecurityException {
+        if (cgroup.isRoot()) {
+            LOG.error("You can't create rootCgroup in this function");
+            throw new RuntimeException("You can't create rootCgroup in this function");
+        }
+        CgroupCommon parent = cgroup.getParent();
+        while (parent != null) {
+            if (!Utils.checkDirExists(parent.getDir())) {
+                throw new RuntimeException("Parent " + parent.getDir() + " does not exist");
+            }
+            parent = parent.getParent();
+        }
+        Hierarchy h = cgroup.getHierarchy();
+        if (!isMounted(h)) {
+            throw new RuntimeException("hierarchy " + h.getDir() + " is not mounted");
+        }
+        if (Utils.checkDirExists(cgroup.getDir())) {
+            throw new RuntimeException("cgroup " + cgroup.getDir() + " already exists");
+        }
+
+        if (!(new File(cgroup.getDir())).mkdir()) {
+            throw new RuntimeException("Could not create cgroup dir at " + cgroup.getDir());
+        }
+    }
+
+    /** Deletes the cgroup by calling {@link CgroupCommon#delete()}. */
+    @Override
+    public void deleteCgroup(CgroupCommon cgroup) throws IOException {
+        cgroup.delete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
new file mode 100755
index 0000000..c8bb304
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A single cgroup directory inside a {@link Hierarchy}, with accessors for the control files
+ * whose names are listed in the constants below. A cgroup is either the root of its hierarchy
+ * or a child of another {@code CgroupCommon}.
+ */
+public class CgroupCommon implements CgroupCommonOperation {
+
+    public static final String TASKS = "/tasks";
+    public static final String NOTIFY_ON_RELEASE = "/notify_on_release";
+    public static final String RELEASE_AGENT = "/release_agent";
+    public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children";
+    public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control";
+    public static final String CGROUP_PROCS = "/cgroup.procs";
+
+    private final Hierarchy hierarchy;
+
+    private final String name;
+
+    private final String dir;
+
+    private final CgroupCommon parent;
+
+    private final boolean isRoot;
+
+    private static final Logger LOG = LoggerFactory.getLogger(CgroupCommon.class);
+
+    /**
+     * Creates a non-root cgroup. Its name and directory are the parent's name and directory with
+     * {@code "/" + name} appended.
+     *
+     * @param name      last path component of the new cgroup
+     * @param hierarchy hierarchy the cgroup belongs to
+     * @param parent    parent cgroup; must not be {@code null}
+     */
+    public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) {
+        this.name = parent.getName() + "/" + name;
+        this.hierarchy = hierarchy;
+        this.parent = parent;
+        this.dir = parent.getDir() + "/" + name;
+        this.isRoot = false;
+    }
+
+    /**
+     * Creates the root cgroup of a hierarchy: empty name, no parent.
+     *
+     * @param hierarchy hierarchy the cgroup belongs to
+     * @param dir       directory of the root cgroup
+     */
+    public CgroupCommon(Hierarchy hierarchy, String dir) {
+        this.name = "";
+        this.hierarchy = hierarchy;
+        this.parent = null;
+        this.dir = dir;
+        this.isRoot = true;
+    }
+
+    /** Adds a task by writing its id to this cgroup's {@link #TASKS} file. */
+    @Override
+    public void addTask(int taskId) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, TASKS), String.valueOf(taskId));
+    }
+
+    /**
+     * Reads this cgroup's {@link #TASKS} file and parses every line as an integer task id.
+     *
+     * @throws NumberFormatException if a line is not an integer
+     */
+    @Override
+    public Set<Integer> getTasks() throws IOException {
+        List<String> stringTasks = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, TASKS));
+        Set<Integer> tasks = new HashSet<Integer>();
+        for (String task : stringTasks) {
+            tasks.add(Integer.valueOf(task));
+        }
+        return tasks;
+    }
+
+    /** Adds a process by writing its pid to this cgroup's {@link #CGROUP_PROCS} file. */
+    @Override
+    public void addProcs(int pid) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
+    }
+
+    /**
+     * Reads this cgroup's {@link #CGROUP_PROCS} file and parses every line as a long pid.
+     *
+     * @throws NumberFormatException if a line is not a number
+     */
+    @Override
+    public Set<Long> getPids() throws IOException {
+        List<String> stringPids = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS));
+        Set<Long> pids = new HashSet<>();
+        for (String pid : stringPids) {
+            pids.add(Long.valueOf(pid));
+        }
+        return pids;
+    }
+
+    /** Writes {@code "1"} or {@code "0"} to the {@link #NOTIFY_ON_RELEASE} file. */
+    @Override
+    public void setNotifyOnRelease(boolean flag) throws IOException {
+
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
+    }
+
+    /** Returns whether the first line of the {@link #NOTIFY_ON_RELEASE} file is {@code "1"}. */
+    @Override
+    public boolean getNotifyOnRelease() throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1");
+    }
+
+    /**
+     * Writes the release-agent command to the {@link #RELEASE_AGENT} file. Only the root cgroup
+     * may do this; for any other cgroup a warning is logged and nothing is written.
+     */
+    @Override
+    public void setReleaseAgent(String command) throws IOException {
+        if (!this.isRoot) {
+            // log the cgroup's name; the isRoot flag would always print "false" here
+            LOG.warn("Cannot set {} in {} since its not the root group", RELEASE_AGENT, this.name);
+            return;
+        }
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, RELEASE_AGENT), command);
+    }
+
+    /**
+     * Reads the first line of the {@link #RELEASE_AGENT} file.
+     *
+     * @return the command, or {@code null} (with a warning logged) when this is not the root
+     *         cgroup
+     */
+    @Override
+    public String getReleaseAgent() throws IOException {
+        if (!this.isRoot) {
+            // log the cgroup's name; the isRoot flag would always print "false" here
+            LOG.warn("Cannot get {} in {} since its not the root group", RELEASE_AGENT, this.name);
+            return null;
+        }
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, RELEASE_AGENT)).get(0);
+    }
+
+    /**
+     * Writes {@code "1"} or {@code "0"} to the {@link #CGROUP_CLONE_CHILDREN} file. Does nothing
+     * unless {@link #getCores()} contains an entry for {@code SubSystemType.cpuset}.
+     */
+    @Override
+    public void setCgroupCloneChildren(boolean flag) throws IOException {
+        if (!getCores().containsKey(SubSystemType.cpuset)) {
+            return;
+        }
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+    }
+
+    /** Returns whether the first line of the {@link #CGROUP_CLONE_CHILDREN} file is {@code "1"}. */
+    @Override
+    public boolean getCgroupCloneChildren() throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1");
+    }
+
+    /**
+     * Writes {@code eventFd}, {@code controlFd} and any extra {@code args}, separated by single
+     * spaces, to the {@link #CGROUP_EVENT_CONTROL} file.
+     */
+    @Override
+    public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        sb.append(eventFd);
+        sb.append(' ');
+        sb.append(controlFd);
+        for (String arg : args) {
+            sb.append(' ');
+            sb.append(arg);
+        }
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
+    }
+
+    public Hierarchy getHierarchy() {
+        return hierarchy;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDir() {
+        return dir;
+    }
+
+    public CgroupCommon getParent() {
+        return parent;
+    }
+
+    /**
+     * Lists the sub-directories of this cgroup's directory as child cgroups. A new set is built
+     * on every call.
+     *
+     * @return the children, or {@code null} when the directory cannot be listed
+     */
+    public Set<CgroupCommon> getChildren() {
+
+        File file = new File(this.dir);
+        File[] files = file.listFiles();
+        if (files == null) {
+            LOG.info("{} is not a directory", this.dir);
+            return null;
+        }
+        Set<CgroupCommon> children = new HashSet<CgroupCommon>();
+        for (File child : files) {
+            if (child.isDirectory()) {
+                children.add(new CgroupCommon(child.getName(), this.hierarchy, this));
+            }
+        }
+        return children;
+    }
+
+    public boolean isRoot() {
+        return isRoot;
+    }
+
+    /** Returns the cores built by {@link CgroupCoreFactory} for this hierarchy and directory. */
+    public Map<SubSystemType, CgroupCore> getCores() {
+        return CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir);
+    }
+
+    /**
+     * Deletes this cgroup and all of its descendants. For the root cgroup only the descendants
+     * are deleted.
+     */
+    public void delete() throws IOException {
+        // The children of a cgroup are re-read from disk on every getChildren() call, so there
+        // is no in-memory child set on the parent to update once the directory is gone.
+        this.free();
+    }
+
+    /**
+     * Frees the children first, then (unless this is the root) moves this cgroup's tasks to the
+     * parent and deletes this cgroup's directory.
+     */
+    private void free() throws IOException {
+        Set<CgroupCommon> children = getChildren();
+        // getChildren() returns null when the directory cannot be listed
+        if (children != null) {
+            for (CgroupCommon child : children) {
+                child.free();
+            }
+        }
+        if (this.isRoot) {
+            return;
+        }
+        Set<Integer> tasks = this.getTasks();
+        if (tasks != null) {
+            for (Integer task : tasks) {
+                this.parent.addTask(task);
+            }
+        }
+        CgroupUtils.deleteDir(this.dir);
+    }
+
+    /** Two cgroups are equal when their hierarchy, name and directory are all equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof CgroupCommon)) {
+            return false;
+        }
+        CgroupCommon other = (CgroupCommon) o;
+        return sameOrBothNull(other.hierarchy, this.hierarchy)
+                && sameOrBothNull(other.name, this.name)
+                && sameOrBothNull(other.dir, this.dir);
+    }
+
+    /** Null-safe equality: true when both are null, or both are non-null and {@code a.equals(b)}. */
+    private static boolean sameOrBothNull(Object a, Object b) {
+        return a == null ? b == null : (b != null && a.equals(b));
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (this.name != null ? this.name.hashCode() : 0);
+        result = prime * result + (this.hierarchy != null ? this.hierarchy.hashCode() : 0);
+        result = prime * result + (this.dir != null ? this.dir.hashCode() : 0);
+        return result;
+    }
+
+    /** Returns the cgroup's name. */
+    @Override
+    public String toString() {
+        return this.getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
new file mode 100755
index 0000000..eecba69
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Operations for managing the tasks, processes and basic settings
+ * (notify_on_release, release agent, cgroup.clone_children, event control) of a cgroup.
+ */
+public interface CgroupCommonOperation {
+
+    /**
+     * Adds a task to the cgroup.
+     *
+     * @param taskid task id of the task to add
+     * @throws IOException if an I/O error occurs
+     */
+    void addTask(int taskid) throws IOException;
+
+    /**
+     * Gets the ids of the tasks running in the cgroup.
+     *
+     * @return the task ids
+     * @throws IOException if an I/O error occurs
+     */
+    Set<Integer> getTasks() throws IOException;
+
+    /**
+     * Adds a process to the cgroup.
+     *
+     * @param pid the PID of the process to add
+     * @throws IOException if an I/O error occurs
+     */
+    void addProcs(int pid) throws IOException;
+
+    /**
+     * Gets the PIDs of the processes running in the cgroup.
+     *
+     * @return the PIDs
+     * @throws IOException if an I/O error occurs
+     */
+    Set<Long> getPids() throws IOException;
+
+    /**
+     * Sets the notify_on_release config of the cgroup.
+     *
+     * @param flag the value to set
+     * @throws IOException if an I/O error occurs
+     */
+    void setNotifyOnRelease(boolean flag) throws IOException;
+
+    /**
+     * Gets the notify_on_release config of the cgroup.
+     *
+     * @return the current value
+     * @throws IOException if an I/O error occurs
+     */
+    boolean getNotifyOnRelease() throws IOException;
+
+    /**
+     * Sets the command for the release agent to execute.
+     *
+     * @param command the command to set
+     * @throws IOException if an I/O error occurs
+     */
+    void setReleaseAgent(String command) throws IOException;
+
+    /**
+     * Gets the command for the release agent to execute.
+     *
+     * @return the command
+     * @throws IOException if an I/O error occurs
+     */
+    String getReleaseAgent() throws IOException;
+
+    /**
+     * Sets the cgroup.clone_children config.
+     *
+     * @param flag the value to set
+     * @throws IOException if an I/O error occurs
+     */
+    void setCgroupCloneChildren(boolean flag) throws IOException;
+
+    /**
+     * Gets the cgroup.clone_children config.
+     *
+     * @return the current value
+     * @throws IOException if an I/O error occurs
+     */
+    boolean getCgroupCloneChildren() throws IOException;
+
+    /**
+     * Sets the event control config.
+     *
+     * @param eventFd the event fd value to use
+     * @param controlFd the control fd value to use
+     * @param args any additional arguments
+     * @throws IOException if an I/O error occurs
+     */
+    void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
new file mode 100755
index 0000000..4d35c77
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.container.cgroup.core.BlkioCore;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.container.cgroup.core.CpuacctCore;
+import org.apache.storm.container.cgroup.core.CpusetCore;
+import org.apache.storm.container.cgroup.core.DevicesCore;
+import org.apache.storm.container.cgroup.core.FreezerCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.container.cgroup.core.NetClsCore;
+import org.apache.storm.container.cgroup.core.NetPrioCore;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Creates the {@link CgroupCore} implementation that corresponds to a {@link SubSystemType}.
+ */
+public class CgroupCoreFactory {
+
+    /**
+     * Creates the core for a single subsystem type.
+     *
+     * @param type the subsystem type to create a core for
+     * @param dir the directory handed to the core's constructor
+     * @return the new core, or {@code null} when no core class is mapped to {@code type}
+     */
+    public static CgroupCore getInstance(SubSystemType type, String dir) {
+        final CgroupCore core;
+        switch (type) {
+        case blkio:
+            core = new BlkioCore(dir);
+            break;
+        case cpuacct:
+            core = new CpuacctCore(dir);
+            break;
+        case cpuset:
+            core = new CpusetCore(dir);
+            break;
+        case cpu:
+            core = new CpuCore(dir);
+            break;
+        case devices:
+            core = new DevicesCore(dir);
+            break;
+        case freezer:
+            core = new FreezerCore(dir);
+            break;
+        case memory:
+            core = new MemoryCore(dir);
+            break;
+        case net_cls:
+            core = new NetClsCore(dir);
+            break;
+        case net_prio:
+            core = new NetPrioCore(dir);
+            break;
+        default:
+            // Any type not listed above has no core class here.
+            core = null;
+            break;
+        }
+        return core;
+    }
+
+    /**
+     * Creates cores for a set of subsystem types, all using the same directory.
+     *
+     * @param types the subsystem types to create cores for
+     * @param dir the directory handed to each core's constructor
+     * @return a map from type to core; types for which no core exists are left out
+     */
+    public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) {
+        Map<SubSystemType, CgroupCore> cores = new HashMap<SubSystemType, CgroupCore>();
+        for (SubSystemType subSystemType : types) {
+            CgroupCore core = getInstance(subSystemType, dir);
+            if (core == null) {
+                continue;
+            }
+            cores.put(subSystemType, core);
+        }
+        return cores;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
new file mode 100755
index 0000000..00ac9fd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Basic operations for managing cgroups: mounting and unmounting a hierarchy,
+ * creating and deleting cgroups, and looking up hierarchies and subsystems.
+ */
+public interface CgroupOperation {
+
+    /**
+     * Gets the list of hierarchies.
+     *
+     * @return the hierarchies
+     */
+    List<Hierarchy> getHierarchies();
+
+    /**
+     * Gets the available subsystems.
+     *
+     * @return the subsystems
+     */
+    Set<SubSystem> getSubSystems();
+
+    /**
+     * Checks whether a subsystem is enabled.
+     *
+     * @param subsystem the subsystem type to check
+     * @return {@code true} if the subsystem is enabled
+     */
+    boolean isSubSystemEnabled(SubSystemType subsystem);
+
+    /**
+     * Gets the first hierarchy that has the given subsystem mounted.
+     *
+     * @param subsystem the subsystem type to look for
+     * @return the matching hierarchy
+     */
+    Hierarchy getHierarchyWithSubSystem(SubSystemType subsystem);
+
+    /**
+     * Gets the first hierarchy that has the given list of subsystems mounted.
+     *
+     * @param subSystems the subsystem types to look for
+     * @return the matching hierarchy
+     */
+    Hierarchy getHierarchyWithSubSystems(List<SubSystemType> subSystems);
+
+    /**
+     * Checks whether a hierarchy is mounted.
+     *
+     * @param hierarchy the hierarchy to check
+     * @return {@code true} if the hierarchy is mounted
+     */
+    boolean isMounted(Hierarchy hierarchy);
+
+    /**
+     * Mounts a hierarchy.
+     *
+     * @param hierarchy the hierarchy to mount
+     * @throws IOException if an I/O error occurs
+     */
+    void mount(Hierarchy hierarchy) throws IOException;
+
+    /**
+     * Unmounts a hierarchy.
+     *
+     * @param hierarchy the hierarchy to unmount
+     * @throws IOException if an I/O error occurs
+     */
+    void umount(Hierarchy hierarchy) throws IOException;
+
+    /**
+     * Creates a cgroup.
+     *
+     * @param cgroup the cgroup to create
+     * @throws SecurityException declared by this method; the triggering condition is up to the implementation
+     */
+    void createCgroup(CgroupCommon cgroup) throws SecurityException;
+
+    /**
+     * Deletes a cgroup.
+     *
+     * @param cgroup the cgroup to delete
+     * @throws IOException if an I/O error occurs
+     */
+    void deleteCgroup(CgroupCommon cgroup) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
new file mode 100644
index 0000000..5a4744c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import com.google.common.io.Files;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class CgroupUtils {
+
+    public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
+    public static final String MOUNT_STATUS_FILE = "/proc/mounts";
+
+    private static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class);
+
+    public static void deleteDir(String dir) {
+        File d = new File(dir);
+        if (!d.exists()) {
+            LOG.warn("dir {} does not exist!", dir);
+            return;
+        }
+        if (!d.isDirectory()) {
+            throw new RuntimeException("dir " + dir + " is not a directory!");
+        }
+        if (!d.delete()) {
+            throw new RuntimeException("Cannot delete dir " + dir);
+        }
+    }
+
+    /**
+     * Get a set of SubSystemType objects from a comma delimited list of subsystem names
+     */
+    public static Set<SubSystemType> getSubSystemsFromString(String str) {
+        Set<SubSystemType> result = new HashSet<SubSystemType>();
+        String[] subSystems = str.split(",");
+        for (String subSystem : subSystems) {
+            //return null to mount options in string that is not part of cgroups
+            SubSystemType type = SubSystemType.getSubSystem(subSystem);
+            if (type != null) {
+                result.add(type);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get a string that is a comma delimited list of subsystems
+     */
+    public static String subSystemsToString(Set<SubSystemType> subSystems) {
+        StringBuilder sb = new StringBuilder();
+        if (subSystems.size() == 0) {
+            return sb.toString();
+        }
+        for (SubSystemType type : subSystems) {
+            sb.append(type.name()).append(",");
+        }
+        return sb.toString().substring(0, sb.length() - 1);
+    }
+
+    public static boolean enabled() {
+        return Utils.checkFileExists(CGROUP_STATUS_FILE);
+    }
+
+    public static List<String> readFileByLine(String filePath) throws IOException {
+        return Files.readLines(new File(filePath), Charset.defaultCharset());
+    }
+
+    public static void writeFileByLine(String filePath, List<String> linesToWrite) throws IOException {
+        LOG.debug("For CGroups - writing {} to {} ", linesToWrite, filePath);
+        File file = new File(filePath);
+        if (!file.exists()) {
+            LOG.error("{} does not exist", filePath);
+            return;
+        }
+        try (FileWriter writer = new FileWriter(file, true);
+             BufferedWriter bw = new BufferedWriter(writer)) {
+            for (String string : linesToWrite) {
+                bw.write(string);
+                bw.newLine();
+                bw.flush();
+            }
+        }
+    }
+
+    public static void writeFileByLine(String filePath, String lineToWrite) throws IOException {
+        writeFileByLine(filePath, Arrays.asList(lineToWrite));
+    }
+
+    public static String getDir(String dir, String constant) {
+        return dir + constant;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java
new file mode 100755
index 0000000..57eb8ff
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * A Linux device identified by its major and minor numbers.
+ */
+public class Device {
+
+    public final int major;
+    public final int minor;
+
+    /**
+     * Creates a device from its numbers.
+     *
+     * @param major the major number
+     * @param minor the minor number
+     */
+    public Device(int major, int minor) {
+        this.major = major;
+        this.minor = minor;
+    }
+
+    /**
+     * Creates a device from text of the form {@code major:minor}.
+     *
+     * @param str the text to parse; it is split on ":" and the first two parts are parsed as integers
+     */
+    public Device(String str) {
+        String[] parts = str.split(":");
+        this.major = Integer.parseInt(parts[0]);
+        this.minor = Integer.parseInt(parts[1]);
+    }
+
+    /**
+     * Formats the device as {@code major:minor}.
+     */
+    @Override
+    public String toString() {
+        return major + ":" + minor;
+    }
+
+    @Override
+    public int hashCode() {
+        // 31-based polynomial seeded with 1 over major, then minor.
+        return 31 * (31 + major) + minor;
+    }
+
+    /**
+     * Two devices are equal when they are of the same class and have the same major and minor numbers.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Device that = (Device) obj;
+        return major == that.major && minor == that.minor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
new file mode 100755
index 0000000..b2b245c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.util.Set;
+
+/**
+ * Describes a cgroup hierarchy: its name, directory, the subsystems attached to it
+ * and its root cgroup.
+ */
+public class Hierarchy {
+
+    private final String name;
+
+    private final Set<SubSystemType> subSystems;
+
+    private final String type;
+
+    private final String dir;
+
+    private final CgroupCommon rootCgroups;
+
+    /**
+     * Creates a hierarchy. The root cgroup is built from this hierarchy and {@code dir}, and the
+     * type string is derived from {@code subSystems} via {@link CgroupUtils#subSystemsToString(Set)}.
+     *
+     * @param name the name of the hierarchy
+     * @param subSystems the subsystems of the hierarchy
+     * @param dir the directory of the hierarchy
+     */
+    public Hierarchy(String name, Set<SubSystemType> subSystems, String dir) {
+        this.name = name;
+        this.subSystems = subSystems;
+        this.dir = dir;
+        this.rootCgroups = new CgroupCommon(this, dir);
+        this.type = CgroupUtils.subSystemsToString(subSystems);
+    }
+
+    /**
+     * Gets the subsystems of this hierarchy.
+     */
+    public Set<SubSystemType> getSubSystems() {
+        return subSystems;
+    }
+
+    /**
+     * Gets all subsystems in this hierarchy as a comma delimited list.
+     */
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public int hashCode() {
+        int dirHash = (dir == null) ? 0 : dir.hashCode();
+        int nameHash = (name == null) ? 0 : name.hashCode();
+        int typeHash = (type == null) ? 0 : type.hashCode();
+        // 31-based polynomial seeded with 1, folded in the order dir, name, type.
+        return 31 * (31 * (31 + dirHash) + nameHash) + typeHash;
+    }
+
+    /**
+     * Two hierarchies are equal when they are of the same class and their dir, name and
+     * type are pairwise equal (a {@code null} only matches {@code null}).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Hierarchy that = (Hierarchy) obj;
+        boolean sameDir = (dir == null) ? that.dir == null : dir.equals(that.dir);
+        if (!sameDir) {
+            return false;
+        }
+        boolean sameName = (name == null) ? that.name == null : name.equals(that.name);
+        if (!sameName) {
+            return false;
+        }
+        return (type == null) ? that.type == null : type.equals(that.type);
+    }
+
+    /**
+     * Gets the directory of this hierarchy.
+     */
+    public String getDir() {
+        return dir;
+    }
+
+    /**
+     * Gets the root cgroup created for this hierarchy.
+     */
+    public CgroupCommon getRootCgroups() {
+        return rootCgroups;
+    }
+
+    /**
+     * Gets the name of this hierarchy.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Checks whether the given subsystem type is one of this hierarchy's subsystems.
+     *
+     * @param subsystem the subsystem type to look for
+     * @return {@code true} if it is contained in the subsystem set
+     */
+    public boolean isSubSystemMounted(SubSystemType subsystem) {
+        boolean mounted = false;
+        for (SubSystemType candidate : this.subSystems) {
+            if (candidate == subsystem) {
+                mounted = true;
+                break;
+            }
+        }
+        return mounted;
+    }
+
+    /**
+     * Returns the directory of this hierarchy.
+     */
+    @Override
+    public String toString() {
+        return this.dir;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
new file mode 100755
index 0000000..e354fb0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * a class that implements operations that can be performed on a cgroup subsystem
+ */
+public class SubSystem {
+
+    private SubSystemType type;
+
+    private int hierarchyID;
+
+    private int cgroupsNum;
+
+    private boolean enable;
+
+    public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, boolean enable) {
+        this.type = type;
+        this.hierarchyID = hierarchyID;
+        this.cgroupsNum = cgroupNum;
+        this.enable = enable;
+    }
+
+    public SubSystemType getType() {
+        return type;
+    }
+
+    public void setType(SubSystemType type) {
+        this.type = type;
+    }
+
+    public int getHierarchyID() {
+        return hierarchyID;
+    }
+
+    public void setHierarchyID(int hierarchyID) {
+        this.hierarchyID = hierarchyID;
+    }
+
+    public int getCgroupsNum() {
+        return cgroupsNum;
+    }
+
+    public void setCgroupsNum(int cgroupsNum) {
+        this.cgroupsNum = cgroupsNum;
+    }
+
+    public boolean isEnable() {
+        return enable;
+    }
+
+    public void setEnable(boolean enable) {
+        this.enable = enable;
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        boolean ret = false;
+        if (object != null && object instanceof SubSystem) {
+            ret = ((this.type == ((SubSystem)object).getType())
+                    && (this.hierarchyID == ((SubSystem)object).getHierarchyID()));
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
new file mode 100755
index 0000000..914abcc
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * An enum describing the subsystems that can be used.
+ */
+public enum SubSystemType {
+
+    // net_cls,ns is not supported in ubuntu
+    blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
+
+
+    /**
+     * Looks up the constant whose name is exactly {@code str}.
+     *
+     * @param str the subsystem name to look up
+     * @return the matching constant, or {@code null} if there is none (including when {@code str} is {@code null})
+     */
+    public static SubSystemType getSubSystem(String str) {
+        for (SubSystemType candidate : SubSystemType.values()) {
+            if (candidate.name().equals(str)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
new file mode 100644
index 0000000..8ebd763
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.cgroup;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ * System operations used for cgroups. All of them run shell commands through {@code /bin/bash -c}.
+ */
+public class SystemOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
+
+    /**
+     * Checks whether the current process runs as root by evaluating {@code $EUID} in bash.
+     *
+     * @return {@code true} if the effective user id printed by the shell is {@code 0}
+     * @throws IOException if the shell command fails
+     */
+    public static boolean isRoot() throws IOException {
+        // Compare the whole trimmed output rather than its first character, so empty
+        // output yields false instead of an index error.
+        String euid = SystemOperation.exec("echo $EUID").trim();
+        return "0".equals(euid);
+    }
+
+    /**
+     * Runs {@code mount -t <type> -o <options> <name> <target>}.
+     * The arguments are placed into the shell command as-is, so callers must not pass untrusted values.
+     *
+     * @param name the name argument of the mount command
+     * @param target the target argument of the mount command
+     * @param type the value for {@code -t}
+     * @param options the value for {@code -o}
+     * @throws IOException if the command fails or writes to stderr
+     */
+    public static void mount(String name, String target, String type, String options) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        sb.append("mount -t ")
+                .append(type)
+                .append(" -o ")
+                .append(options)
+                .append(" ")
+                .append(name)
+                .append(" ")
+                .append(target);
+        SystemOperation.exec(sb.toString());
+    }
+
+    /**
+     * Runs {@code umount <pathToDir>}.
+     * The argument is placed into the shell command as-is, so callers must not pass untrusted values.
+     *
+     * @param pathToDir the path to unmount
+     * @throws IOException if the command fails or writes to stderr
+     */
+    public static void umount(String pathToDir) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        sb.append("umount ").append(pathToDir);
+        SystemOperation.exec(sb.toString());
+    }
+
+    /**
+     * Runs a command through {@code /bin/bash -c} and returns what it wrote to stdout.
+     *
+     * @param cmd the shell command to run
+     * @return the standard output of the command
+     * @throws IOException if the command wrote anything to stderr (the message is that output),
+     *     if reading its output fails, or if the calling thread is interrupted while waiting
+     */
+    public static String exec(String cmd) throws IOException {
+        LOG.debug("Shell cmd: {}", cmd);
+        Process process = new ProcessBuilder("/bin/bash", "-c", cmd).start();
+        try {
+            // Drain the pipes BEFORE waiting for exit: a child that fills its output pipe blocks
+            // until someone reads it, so waiting first can deadlock.
+            // NOTE: stdout is read to the end before stderr, so a command that writes more stderr
+            // than the pipe can hold while stdout is still open can still stall.
+            String output = IOUtils.toString(process.getInputStream());
+            String errorOutput = IOUtils.toString(process.getErrorStream());
+            process.waitFor();
+            LOG.debug("Shell Output: {}", output);
+            if (errorOutput.length() != 0) {
+                LOG.error("Shell Error Output: {}", errorOutput);
+                throw new IOException(errorOutput);
+            }
+            return output;
+        } catch (InterruptedException | ClosedByInterruptException ie) {
+            // Restore the interrupt status so callers further up can still see the interruption.
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
new file mode 100755
index 0000000..c426610
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads and writes the {@code blkio.*} control files located under one cgroup directory.
+ *
+ * <p>File paths are built with {@code CgroupUtils.getDir(dir, FILE)}; all I/O goes through
+ * {@code CgroupUtils.readFileByLine} and {@code CgroupUtils.writeFileByLine}.
+ */
+public class BlkioCore implements CgroupCore {
+
+    public static final String BLKIO_WEIGHT = "/blkio.weight";
+    public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device";
+    public static final String BLKIO_RESET_STATS = "/blkio.reset_stats";
+
+    public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device";
+    public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device";
+    public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device";
+    public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device";
+
+    public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced";
+    public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes";
+
+    public static final String BLKIO_TIME = "/blkio.time";
+    public static final String BLKIO_SECTORS = "/blkio.sectors";
+    public static final String BLKIO_IO_SERVICED = "/blkio.io_serviced";
+    public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes";
+    public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time";
+    public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time";
+    public static final String BLKIO_IO_MERGED = "/blkio.io_merged";
+    public static final String BLKIO_IO_QUEUED = "/blkio.io_queued";
+
+    private final String dir;
+
+    /**
+     * Creates an accessor rooted at {@code dir}.
+     *
+     * @param dir directory that the {@code blkio.*} file names are resolved against
+     */
+    public BlkioCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.blkio;
+    }
+
+    /**
+     * Writes {@code weight} to {@code blkio.weight}.
+     *
+     * @param weight the weight to store (the original author notes the range 100-1000)
+     */
+    public void setBlkioWeight(int weight) throws IOException {
+        writeLine(BLKIO_WEIGHT, String.valueOf(weight));
+    }
+
+    /** Returns the first line of {@code blkio.weight} parsed as an int. */
+    public int getBlkioWeight() throws IOException {
+        return Integer.parseInt(readLines(BLKIO_WEIGHT).get(0));
+    }
+
+    /** Writes {@code "<device> <weight>"} to {@code blkio.weight_device}. */
+    public void setBlkioWeightDevice(Device device, int weight) throws IOException {
+        writeLine(BLKIO_WEIGHT_DEVICE, makeContext(device, weight));
+    }
+
+    /**
+     * Parses {@code blkio.weight_device} into a device-to-weight map.
+     *
+     * <p>Lines that do not contain at least a device and a value are skipped.
+     */
+    public Map<Device, Integer> getBlkioWeightDevice() throws IOException {
+        Map<Device, Integer> weights = new HashMap<>();
+        for (String line : readLines(BLKIO_WEIGHT_DEVICE)) {
+            String[] fields = line.split(" ");
+            if (fields.length < 2) {
+                // Blank or truncated line: nothing to parse (mirrors analyseRecord's leniency).
+                continue;
+            }
+            weights.put(new Device(fields[0]), Integer.valueOf(fields[1]));
+        }
+        return weights;
+    }
+
+    /** Writes {@code "<device> <bps>"} to {@code blkio.throttle.read_bps_device}. */
+    public void setReadBps(Device device, long bps) throws IOException {
+        writeLine(BLKIO_THROTTLE_READ_BPS_DEVICE, makeContext(device, bps));
+    }
+
+    /** Returns the per-device values of {@code blkio.throttle.read_bps_device}. */
+    public Map<Device, Long> getReadBps() throws IOException {
+        return parseConfig(BLKIO_THROTTLE_READ_BPS_DEVICE);
+    }
+
+    /** Writes {@code "<device> <bps>"} to {@code blkio.throttle.write_bps_device}. */
+    public void setWriteBps(Device device, long bps) throws IOException {
+        writeLine(BLKIO_THROTTLE_WRITE_BPS_DEVICE, makeContext(device, bps));
+    }
+
+    /** Returns the per-device values of {@code blkio.throttle.write_bps_device}. */
+    public Map<Device, Long> getWriteBps() throws IOException {
+        return parseConfig(BLKIO_THROTTLE_WRITE_BPS_DEVICE);
+    }
+
+    /** Writes {@code "<device> <iops>"} to {@code blkio.throttle.read_iops_device}. */
+    public void setReadIOps(Device device, long iops) throws IOException {
+        writeLine(BLKIO_THROTTLE_READ_IOPS_DEVICE, makeContext(device, iops));
+    }
+
+    /** Returns the per-device values of {@code blkio.throttle.read_iops_device}. */
+    public Map<Device, Long> getReadIOps() throws IOException {
+        return parseConfig(BLKIO_THROTTLE_READ_IOPS_DEVICE);
+    }
+
+    /** Writes {@code "<device> <iops>"} to {@code blkio.throttle.write_iops_device}. */
+    public void setWriteIOps(Device device, long iops) throws IOException {
+        writeLine(BLKIO_THROTTLE_WRITE_IOPS_DEVICE, makeContext(device, iops));
+    }
+
+    /** Returns the per-device values of {@code blkio.throttle.write_iops_device}. */
+    public Map<Device, Long> getWriteIOps() throws IOException {
+        return parseConfig(BLKIO_THROTTLE_WRITE_IOPS_DEVICE);
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.throttle.io_serviced}. */
+    public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException {
+        return analyseRecord(readLines(BLKIO_THROTTLE_IO_SERVICED));
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.throttle.io_service_bytes}. */
+    public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException {
+        return analyseRecord(readLines(BLKIO_THROTTLE_IO_SERVICE_BYTES));
+    }
+
+    /** Returns the per-device values of {@code blkio.time}. */
+    public Map<Device, Long> getBlkioTime() throws IOException {
+        return parseConfig(BLKIO_TIME);
+    }
+
+    /** Returns the per-device values of {@code blkio.sectors}. */
+    public Map<Device, Long> getBlkioSectors() throws IOException {
+        return parseConfig(BLKIO_SECTORS);
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.io_serviced}. */
+    public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException {
+        return analyseRecord(readLines(BLKIO_IO_SERVICED));
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.io_service_bytes}. */
+    public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException {
+        return analyseRecord(readLines(BLKIO_IO_SERVICE_BYTES));
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.io_service_time}. */
+    public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException {
+        return analyseRecord(readLines(BLKIO_IO_SERVICE_TIME));
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.io_wait_time}. */
+    public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException {
+        return analyseRecord(readLines(BLKIO_IO_WAIT_TIME));
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.io_merged}. */
+    public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException {
+        return analyseRecord(readLines(BLKIO_IO_MERGED));
+    }
+
+    /** Returns the per-device, per-record-type values of {@code blkio.io_queued}. */
+    public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException {
+        return analyseRecord(readLines(BLKIO_IO_QUEUED));
+    }
+
+    /** Writes {@code "1"} to {@code blkio.reset_stats}. */
+    public void resetStats() throws IOException {
+        writeLine(BLKIO_RESET_STATS, "1");
+    }
+
+    /** Reads all lines of the control file {@code file} under this core's directory. */
+    private List<String> readLines(String file) throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, file));
+    }
+
+    /** Writes {@code content} to the control file {@code file} under this core's directory. */
+    private void writeLine(String file, String content) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, file), content);
+    }
+
+    /** Builds the {@code "<device> <data>"} payload used by the per-device control files. */
+    private String makeContext(Device device, Object data) {
+        return device.toString() + " " + data;
+    }
+
+    /**
+     * Parses a control file whose lines look like {@code "<device> <number>"}.
+     *
+     * <p>Lines with fewer than two space-separated fields are skipped; extra fields are ignored.
+     *
+     * @param config one of the file-name constants of this class
+     * @return the numeric value found for each device
+     */
+    private Map<Device, Long> parseConfig(String config) throws IOException {
+        Map<Device, Long> values = new HashMap<>();
+        for (String line : readLines(config)) {
+            String[] fields = line.split(" ");
+            if (fields.length < 2) {
+                // Blank or truncated line: skip it instead of failing on fields[1].
+                continue;
+            }
+            values.put(new Device(fields[0]), Long.valueOf(fields[1]));
+        }
+        return values;
+    }
+
+    /**
+     * Groups lines of the form {@code "<device> <record type> <number>"} by device.
+     *
+     * <p>Lines that do not have exactly three space-separated fields are ignored. A record type
+     * that {@link RecordType#getType(String)} does not recognise is stored under a {@code null} key.
+     */
+    private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) {
+        Map<Device, Map<RecordType, Long>> byDevice = new HashMap<>();
+        for (String line : strs) {
+            String[] fields = line.split(" ");
+            if (fields.length != 3) {
+                continue;
+            }
+            Device device = new Device(fields[0]);
+            RecordType type = RecordType.getType(fields[1]);
+            Long amount = Long.parseLong(fields[2]);
+            Map<RecordType, Long> perType = byDevice.get(device);
+            if (perType == null) {
+                perType = new HashMap<>();
+                byDevice.put(device, perType);
+            }
+            perType.put(type, amount);
+        }
+        return byDevice;
+    }
+
+    /** Record categories that appear as the second field of the per-device statistics files. */
+    public enum RecordType {
+        read, write, sync, async, total;
+
+        /**
+         * Resolves a record type by name, ignoring case.
+         *
+         * @param type the name as it appears in a statistics file, e.g. {@code "Read"}
+         * @return the matching constant, or {@code null} if {@code type} is {@code null} or unknown
+         */
+        public static RecordType getType(String type) {
+            if (type == null) {
+                return null;
+            }
+            try {
+                // Locale.ROOT keeps the mapping stable; a Turkish default locale would turn
+                // "WRITE" into "wr\u0131te" and the lookup would fail.
+                return RecordType.valueOf(type.toLowerCase(java.util.Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
new file mode 100755
index 0000000..a6b098e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.SubSystemType;
+
+/**
+ * Common contract of the per-subsystem cgroup accessors in this package.
+ */
+public interface CgroupCore {
+
+    /**
+     * Identifies the subsystem an implementation handles.
+     *
+     * @return the {@link SubSystemType} of this core
+     */
+    SubSystemType getType();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
new file mode 100755
index 0000000..1d21251
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Reads and writes the {@code cpu.*} control files located under one cgroup directory.
+ *
+ * <p>File paths are built with {@code CgroupUtils.getDir(dir, FILE)}; all I/O goes through
+ * {@code CgroupUtils.readFileByLine} and {@code CgroupUtils.writeFileByLine}.
+ */
+public class CpuCore implements CgroupCore {
+
+    public static final String CPU_SHARES = "/cpu.shares";
+    public static final String CPU_RT_RUNTIME_US = "/cpu.rt_runtime_us";
+    public static final String CPU_RT_PERIOD_US = "/cpu.rt_period_us";
+    public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
+    public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
+    public static final String CPU_STAT = "/cpu.stat";
+
+    private final String dir;
+
+    /**
+     * Creates an accessor rooted at {@code dir}.
+     *
+     * @param dir directory that the {@code cpu.*} file names are resolved against
+     */
+    public CpuCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpu;
+    }
+
+    /** Writes {@code weight} to {@code cpu.shares}. */
+    public void setCpuShares(int weight) throws IOException {
+        writeValue(CPU_SHARES, String.valueOf(weight));
+    }
+
+    /** Returns the first line of {@code cpu.shares} parsed as an int. */
+    public int getCpuShares() throws IOException {
+        return Integer.parseInt(readFirstLine(CPU_SHARES));
+    }
+
+    /** Writes {@code us} to {@code cpu.rt_runtime_us}. */
+    public void setCpuRtRuntimeUs(long us) throws IOException {
+        writeValue(CPU_RT_RUNTIME_US, String.valueOf(us));
+    }
+
+    /** Returns the first line of {@code cpu.rt_runtime_us} parsed as a long. */
+    public long getCpuRtRuntimeUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_RT_RUNTIME_US));
+    }
+
+    /** Writes {@code us} to {@code cpu.rt_period_us}. */
+    public void setCpuRtPeriodUs(long us) throws IOException {
+        writeValue(CPU_RT_PERIOD_US, String.valueOf(us));
+    }
+
+    /** Returns the first line of {@code cpu.rt_period_us} parsed as a long. */
+    public Long getCpuRtPeriodUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_RT_PERIOD_US));
+    }
+
+    /** Writes {@code us} to {@code cpu.cfs_period_us}. */
+    public void setCpuCfsPeriodUs(long us) throws IOException {
+        writeValue(CPU_CFS_PERIOD_US, String.valueOf(us));
+    }
+
+    /** Returns the first line of {@code cpu.cfs_period_us} parsed as a long. */
+    public Long getCpuCfsPeriodUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_CFS_PERIOD_US));
+    }
+
+    /** Writes {@code us} to {@code cpu.cfs_quota_us}. */
+    public void setCpuCfsQuotaUs(long us) throws IOException {
+        writeValue(CPU_CFS_QUOTA_US, String.valueOf(us));
+    }
+
+    /** Returns the first line of {@code cpu.cfs_quota_us} parsed as a long. */
+    public Long getCpuCfsQuotaUs() throws IOException {
+        return Long.parseLong(readFirstLine(CPU_CFS_QUOTA_US));
+    }
+
+    /** Reads {@code cpu.stat} and wraps its first three lines in a {@link Stat}. */
+    public Stat getCpuStat() throws IOException {
+        return new Stat(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_STAT)));
+    }
+
+    /** Returns the first line of the control file {@code file} under this core's directory. */
+    private String readFirstLine(String file) throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, file)).get(0);
+    }
+
+    /** Writes {@code value} to the control file {@code file} under this core's directory. */
+    private void writeValue(String file, String value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, file), value);
+    }
+
+    /**
+     * Immutable snapshot of the first three {@code "<name> <number>"} lines of {@code cpu.stat}.
+     *
+     * <p>The fields stay {@code int} for compatibility with existing callers; numbers that do not
+     * fit are clamped to the {@code int} range instead of failing the whole read.
+     */
+    public static class Stat {
+        public final int nrPeriods;
+        public final int nrThrottled;
+        public final int throttledTime;
+
+        /**
+         * Parses the second space-separated field of lines 0, 1 and 2.
+         *
+         * @param statStr the lines of {@code cpu.stat}; must contain at least three entries
+         */
+        public Stat(List<String> statStr) {
+            this.nrPeriods = parseCounter(statStr.get(0));
+            this.nrThrottled = parseCounter(statStr.get(1));
+            this.throttledTime = parseCounter(statStr.get(2));
+        }
+
+        /**
+         * Parses the number after the first space of {@code line}, saturating at the int bounds.
+         *
+         * <p>NOTE(review): the kernel documents throttled_time in nanoseconds, so it outgrows
+         * an int after roughly two seconds of throttling; widening the fields to long would be
+         * the cleaner fix if callers can be updated.
+         */
+        private static int parseCounter(String line) {
+            long value = Long.parseLong(line.split(" ")[1]);
+            if (value > Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
+            }
+            if (value < Integer.MIN_VALUE) {
+                return Integer.MIN_VALUE;
+            }
+            return (int) value;
+        }
+
+        @Override
+        public int hashCode() {
+            // Same 31-based fold as before, written as a single expression.
+            return 31 * (31 * (31 + nrPeriods) + nrThrottled) + throttledTime;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            Stat other = (Stat) obj;
+            return nrPeriods == other.nrPeriods
+                    && nrThrottled == other.nrThrottled
+                    && throttledTime == other.throttledTime;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
new file mode 100755
index 0000000..2e683f4
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CpuacctCore implements CgroupCore {
+
+    public static final String CPUACCT_USAGE = "/cpuacct.usage";
+    public static final String CPUACCT_STAT = "/cpuacct.stat";
+    public static final String CPUACCT_USAGE_PERCPU = "/cpuacct.usage_percpu";
+
+    private final String dir;
+
+    public CpuacctCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpuacct;
+    }
+
+    public Long getCpuUsage() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_USAGE)).get(0));
+    }
+
+    public Map<StatType, Long> getCpuStat() throws IOException {
+        List<String> strs = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_STAT));
+        Map<StatType, Long> result = new HashMap<StatType, Long>();
+        result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1]));
+        result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1]));
+        return result;
+    }
+
+    public Long[] getPerCpuUsage() throws IOException {
+        String str = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0);
+        String[] strArgs = str.split(" ");
+        Long[] result = new Long[strArgs.length];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = Long.parseLong(strArgs[i]);
+        }
+        return result;
+    }
+
+    public static enum StatType {
+        user, system;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
new file mode 100755
index 0000000..d089e95
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+/**
+ * Reads and writes the {@code cpuset.*} control files located under one cgroup directory.
+ *
+ * <p>File paths are built with {@code CgroupUtils.getDir(dir, FILE)}; all I/O goes through
+ * {@code CgroupUtils.readFileByLine} and {@code CgroupUtils.writeFileByLine}. Boolean settings
+ * are written as {@code 1}/{@code 0} and read back as "first line parsed as int is positive".
+ */
+public class CpusetCore implements CgroupCore {
+
+    public static final String CPUSET_CPUS = "/cpuset.cpus";
+    public static final String CPUSET_MEMS = "/cpuset.mems";
+    public static final String CPUSET_MEMORY_MIGRATE = "/cpuset.memory_migrate";
+    public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive";
+    public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive";
+    public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall";
+    public static final String CPUSET_MEMORY_PRESSURE = "/cpuset.memory_pressure";
+    public static final String CPUSET_MEMORY_PRESSURE_ENABLED = "/cpuset.memory_pressure_enabled";
+    public static final String CPUSET_MEMORY_SPREAD_PAGE = "/cpuset.memory_spread_page";
+    public static final String CPUSET_MEMORY_SPREAD_SLAB = "/cpuset.memory_spread_slab";
+    public static final String CPUSET_SCHED_LOAD_BALANCE = "/cpuset.sched_load_balance";
+    public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = "/cpuset.sched_relax_domain_level";
+
+    private final String dir;
+
+    /**
+     * Creates an accessor rooted at {@code dir}.
+     *
+     * @param dir directory that the {@code cpuset.*} file names are resolved against
+     */
+    public CpusetCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpuset;
+    }
+
+    /** Writes {@code nums} as a comma-separated list to {@code cpuset.cpus}. */
+    public void setCpus(int[] nums) throws IOException {
+        setConfigs(nums, CPUSET_CPUS);
+    }
+
+    /** Returns the first line of {@code cpuset.cpus} expanded by {@link #parseNums(String)}. */
+    public int[] getCpus() throws IOException {
+        return parseNums(readFirstLine(CPUSET_CPUS));
+    }
+
+    /** Writes {@code nums} as a comma-separated list to {@code cpuset.mems}. */
+    public void setMems(int[] nums) throws IOException {
+        setConfigs(nums, CPUSET_MEMS);
+    }
+
+    /**
+     * Joins {@code nums} with commas and writes the result to {@code config}.
+     *
+     * <p>An empty array writes an empty string instead of failing on the separator clean-up.
+     */
+    private void setConfigs(int[] nums, String config) throws IOException {
+        StringBuilder joined = new StringBuilder();
+        for (int i = 0; i < nums.length; i++) {
+            if (i > 0) {
+                joined.append(',');
+            }
+            joined.append(nums[i]);
+        }
+        writeValue(config, joined.toString());
+    }
+
+    /** Returns the first line of {@code cpuset.mems} expanded by {@link #parseNums(String)}. */
+    public int[] getMems() throws IOException {
+        return parseNums(readFirstLine(CPUSET_MEMS));
+    }
+
+    /** Writes the flag to {@code cpuset.memory_migrate}. */
+    public void setMemMigrate(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_MIGRATE, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.memory_migrate}. */
+    public boolean isMemMigrate() throws IOException {
+        return readFlag(CPUSET_MEMORY_MIGRATE);
+    }
+
+    /** Writes the flag to {@code cpuset.cpu_exclusive}. */
+    public void setCpuExclusive(boolean flag) throws IOException {
+        writeFlag(CPUSET_CPU_EXCLUSIVE, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.cpu_exclusive}. */
+    public boolean isCpuExclusive() throws IOException {
+        return readFlag(CPUSET_CPU_EXCLUSIVE);
+    }
+
+    /** Writes the flag to {@code cpuset.mem_exclusive}. */
+    public void setMemExclusive(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEM_EXCLUSIVE, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.mem_exclusive}. */
+    public boolean isMemExclusive() throws IOException {
+        return readFlag(CPUSET_MEM_EXCLUSIVE);
+    }
+
+    /** Writes the flag to {@code cpuset.mem_hardwall}. */
+    public void setMemHardwall(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEM_HARDWALL, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.mem_hardwall}. */
+    public boolean isMemHardwall() throws IOException {
+        return readFlag(CPUSET_MEM_HARDWALL);
+    }
+
+    /** Returns the first line of {@code cpuset.memory_pressure} parsed as an int. */
+    public int getMemPressure() throws IOException {
+        return Integer.parseInt(readFirstLine(CPUSET_MEMORY_PRESSURE));
+    }
+
+    /** Writes the flag to {@code cpuset.memory_pressure_enabled}. */
+    public void setMemPressureEnabled(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_PRESSURE_ENABLED, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.memory_pressure_enabled}. */
+    public boolean isMemPressureEnabled() throws IOException {
+        return readFlag(CPUSET_MEMORY_PRESSURE_ENABLED);
+    }
+
+    /** Writes the flag to {@code cpuset.memory_spread_page}. */
+    public void setMemSpreadPage(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_SPREAD_PAGE, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.memory_spread_page}. */
+    public boolean isMemSpreadPage() throws IOException {
+        return readFlag(CPUSET_MEMORY_SPREAD_PAGE);
+    }
+
+    /** Writes the flag to {@code cpuset.memory_spread_slab}. */
+    public void setMemSpreadSlab(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_SPREAD_SLAB, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.memory_spread_slab}. */
+    public boolean isMemSpreadSlab() throws IOException {
+        return readFlag(CPUSET_MEMORY_SPREAD_SLAB);
+    }
+
+    /** Writes the flag to {@code cpuset.sched_load_balance}. (Name kept as-is for callers.) */
+    public void setSchedLoadBlance(boolean flag) throws IOException {
+        writeFlag(CPUSET_SCHED_LOAD_BALANCE, flag);
+    }
+
+    /** Reads the flag stored in {@code cpuset.sched_load_balance}. (Name kept as-is for callers.) */
+    public boolean isSchedLoadBlance() throws IOException {
+        return readFlag(CPUSET_SCHED_LOAD_BALANCE);
+    }
+
+    /** Writes {@code value} to {@code cpuset.sched_relax_domain_level}. */
+    public void setSchedRelaxDomainLevel(int value) throws IOException {
+        writeValue(CPUSET_SCHED_RELAX_DOMAIN_LEVEL, String.valueOf(value));
+    }
+
+    /** Returns the first line of {@code cpuset.sched_relax_domain_level} parsed as an int. */
+    public int getSchedRelaxDomainLevel() throws IOException {
+        return Integer.parseInt(readFirstLine(CPUSET_SCHED_RELAX_DOMAIN_LEVEL));
+    }
+
+    /** Returns the first line of the control file {@code file} under this core's directory. */
+    private String readFirstLine(String file) throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, file)).get(0);
+    }
+
+    /** Writes {@code value} to the control file {@code file} under this core's directory. */
+    private void writeValue(String file, String value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, file), value);
+    }
+
+    /** Reads a flag file: {@code true} when its first line parses to a positive int. */
+    private boolean readFlag(String file) throws IOException {
+        return Integer.parseInt(readFirstLine(file)) > 0;
+    }
+
+    /** Writes a flag file: {@code "1"} for {@code true}, {@code "0"} for {@code false}. */
+    private void writeFlag(String file, boolean flag) throws IOException {
+        writeValue(file, String.valueOf(flag ? 1 : 0));
+    }
+
+    /**
+     * Expands a comma-separated list of numbers and inclusive ranges into individual values,
+     * e.g. {@code "0-2,7"} becomes {@code [0, 1, 2, 7]}.
+     *
+     * <p>Leading and trailing whitespace is ignored. A {@code null}, empty or blank string
+     * yields an empty array. A single trailing comma is tolerated.
+     *
+     * @param outputStr the list text, as found on the first line of a cpuset list file
+     * @return the expanded values in the order they appear
+     */
+    public static int[] parseNums(String outputStr) {
+        String text = (outputStr == null) ? "" : outputStr.trim();
+        if (text.isEmpty()) {
+            // Nothing to expand; avoids indexing the last character of an empty string.
+            return new int[0];
+        }
+
+        LinkedList<Integer> collected = new LinkedList<Integer>();
+        int current = 0;      // number being accumulated digit by digit
+        int rangeStart = 0;   // left bound once a '-' has been seen
+        boolean inRange = false;
+        for (char ch : text.toCharArray()) {
+            if (ch == ',') {
+                appendEntry(collected, inRange, rangeStart, current);
+                inRange = false;
+                current = 0;
+            } else if (ch == '-') {
+                inRange = true;
+                rangeStart = current;
+                current = 0;
+            } else {
+                current = current * 10 + (ch - '0');
+            }
+        }
+        // The last entry has no terminating comma unless the text ends with one.
+        if (text.charAt(text.length() - 1) != ',') {
+            appendEntry(collected, inRange, rangeStart, current);
+        }
+
+        int[] nums = new int[collected.size()];
+        int index = 0;
+        for (int num : collected) {
+            nums[index++] = num;
+        }
+        return nums;
+    }
+
+    /** Appends either the single value {@code last} or every value in {@code first..last}. */
+    private static void appendEntry(LinkedList<Integer> target, boolean isRange, int first, int last) {
+        if (!isRange) {
+            target.add(last);
+            return;
+        }
+        for (int n = first; n <= last; n++) {
+            target.add(n);
+        }
+    }
+}