You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:32 UTC
[43/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
new file mode 100644
index 0000000..b255dad
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.ClientZookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ZooKeeper-backed {@link IStateStorage} that forwards every operation to {@link ClientZookeeper}.
+ *
+ * <p>Mutating calls go through {@code zkWriter}; reading calls go through {@code zkReader}. When the
+ * daemon type is {@link DaemonType#NIMBUS} the reader is a separate client, otherwise both fields
+ * reference the same client.
+ */
+public class ZKStateStorage implements IStateStorage {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);
+
+    /** Registered change callbacks, keyed by the id handed out by {@link #register}. */
+    private final ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<>();
+    private CuratorFramework zkWriter;
+    private CuratorFramework zkReader;
+    /** Cleared by {@link #close()}; while false, watcher events are dropped. */
+    private final AtomicBoolean active;
+
+    private final boolean isNimbus;
+    private final Map authConf;
+    private final Map<Object, Object> conf;
+
+    /** Watcher callback that fans every event whose type is not {@code None} out to the registered callbacks. */
+    private class ZkWatcherCallBack implements WatcherCallBack {
+        @Override
+        public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+            if (!active.get()) {
+                return;
+            }
+            if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                LOG.debug("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+            } else {
+                LOG.debug("Received event {} : {} : {}", state, type, path);
+            }
+
+            if (!type.equals(Watcher.Event.EventType.None)) {
+                for (ZKStateChangedCallback fn : callbacks.values()) {
+                    fn.changed(type, path);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the storage, making sure the configured root path exists first.
+     *
+     * @param conf     configuration; read for {@code STORM_ZOOKEEPER_ROOT}, {@code STORM_ZOOKEEPER_SERVERS}
+     *                 and {@code STORM_ZOOKEEPER_PORT}
+     * @param authConf auth configuration handed to {@link ClientZookeeper#mkClient}
+     * @param acls     ACLs used when creating the root path
+     * @param context  supplies the daemon type; Nimbus gets a dedicated reader client
+     * @throws Exception if a client cannot be created or the root path cannot be made
+     */
+    public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        this.conf = conf;
+        this.authConf = authConf;
+        this.isNimbus = context.getDaemonType().equals(DaemonType.NIMBUS);
+
+        // just mkdir STORM_ZOOKEEPER_ROOT dir; the temporary client is always closed, even on failure
+        CuratorFramework zkTemp = mkZk();
+        try {
+            String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+            ClientZookeeper.mkdirs(zkTemp, rootPath, acls);
+        } finally {
+            zkTemp.close();
+        }
+
+        active = new AtomicBoolean(true);
+        zkWriter = mkZk(new ZkWatcherCallBack());
+        if (isNimbus) {
+            try {
+                zkReader = mkZk(new ZkWatcherCallBack());
+            } catch (IOException | RuntimeException e) {
+                // do not leak the already-created writer when the reader cannot be built
+                zkWriter.close();
+                throw e;
+            }
+        } else {
+            zkReader = zkWriter;
+        }
+    }
+
+    /** Builds a client with an empty root string and a {@link DefaultWatcherCallBack}. */
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk() throws IOException {
+        return ClientZookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
+                new DefaultWatcherCallBack(), authConf);
+    }
+
+    /** Builds a client using the configured {@code STORM_ZOOKEEPER_ROOT} and the given watcher. */
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+        return ClientZookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
+                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+    }
+
+    /** Delegates to {@link ClientZookeeper#deleteNodeBlobstore} on the writer client. */
+    @Override
+    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+        ClientZookeeper.deleteNodeBlobstore(zkWriter, path, nimbusHostPortInfo);
+    }
+
+    /**
+     * Registers a callback for watcher events.
+     *
+     * @return a random UUID string that identifies the registration for {@link #unregister}
+     */
+    @Override
+    public String register(ZKStateChangedCallback callback) {
+        String id = UUID.randomUUID().toString();
+        this.callbacks.put(id, callback);
+        return id;
+    }
+
+    /** Removes the callback registered under {@code id}, if any. */
+    @Override
+    public void unregister(String id) {
+        this.callbacks.remove(id);
+    }
+
+    /** Creates a node in {@link CreateMode#EPHEMERAL_SEQUENTIAL} mode and returns what {@code createNode} returns. */
+    @Override
+    public String create_sequential(String path, byte[] data, List<ACL> acls) {
+        return ClientZookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
+    }
+
+    /** Delegates to {@link ClientZookeeper#mkdirs} on the writer client. */
+    @Override
+    public void mkdirs(String path, List<ACL> acls) {
+        ClientZookeeper.mkdirs(zkWriter, path, acls);
+    }
+
+    /** Delegates to {@link ClientZookeeper#deleteNode} on the writer client. */
+    @Override
+    public void delete_node(String path) {
+        ClientZookeeper.deleteNode(zkWriter, path);
+    }
+
+    /**
+     * Writes {@code data} to an ephemeral node, creating the parent path and the node as needed.
+     *
+     * <p>If the node exists it is updated; should that update fail because the node vanished in the
+     * meantime ({@link KeeperException.NoNodeException}), the node is created instead.
+     */
+    @Override
+    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+        ClientZookeeper.mkdirs(zkWriter, ClientZookeeper.parentPath(path), acls);
+        if (ClientZookeeper.exists(zkWriter, path, false)) {
+            try {
+                ClientZookeeper.setData(zkWriter, path, data);
+            } catch (RuntimeException e) {
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                    ClientZookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+                } else {
+                    throw e;
+                }
+            }
+
+        } else {
+            ClientZookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+        }
+    }
+
+    /** Delegates to {@link ClientZookeeper#getVersion} on the reader client. */
+    @Override
+    public Integer get_version(String path, boolean watch) throws Exception {
+        return ClientZookeeper.getVersion(zkReader, path, watch);
+    }
+
+    /** Delegates to {@link ClientZookeeper#existsNode} on the reader client. */
+    @Override
+    public boolean node_exists(String path, boolean watch) {
+        return ClientZookeeper.existsNode(zkReader, path, watch);
+    }
+
+    /** Delegates to {@link ClientZookeeper#getChildren} on the reader client. */
+    @Override
+    public List<String> get_children(String path, boolean watch) {
+        return ClientZookeeper.getChildren(zkReader, path, watch);
+    }
+
+    /**
+     * Marks the storage inactive and closes the clients. The dedicated Nimbus reader is closed even
+     * when closing the writer throws.
+     */
+    @Override
+    public void close() {
+        this.active.set(false);
+        try {
+            zkWriter.close();
+        } finally {
+            if (isNimbus) {
+                zkReader.close();
+            }
+        }
+    }
+
+    /**
+     * Writes {@code data} to a persistent node, creating the parent path and the node as needed.
+     *
+     * <p>If creation fails because the node appeared in the meantime
+     * ({@link KeeperException.NodeExistsException}), the data is set on the existing node instead.
+     */
+    @Override
+    public void set_data(String path, byte[] data, List<ACL> acls) {
+        if (ClientZookeeper.exists(zkWriter, path, false)) {
+            ClientZookeeper.setData(zkWriter, path, data);
+        } else {
+            ClientZookeeper.mkdirs(zkWriter, ClientZookeeper.parentPath(path), acls);
+            try {
+                ClientZookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
+            } catch (RuntimeException e) {
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
+                    ClientZookeeper.setData(zkWriter, path, data);
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /** Delegates to {@link ClientZookeeper#getData} on the reader client. */
+    @Override
+    public byte[] get_data(String path, boolean watch) {
+        return ClientZookeeper.getData(zkReader, path, watch);
+    }
+
+    /** Delegates to {@link ClientZookeeper#getDataWithVersion} on the reader client. */
+    @Override
+    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
+        return ClientZookeeper.getDataWithVersion(zkReader, path, watch);
+    }
+
+    /** Worker heartbeats are stored like any other data; see {@link #set_data}. */
+    @Override
+    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+        set_data(path, data, acls);
+    }
+
+    /** Reads a worker heartbeat through {@link ClientZookeeper#getData} on the reader client. */
+    @Override
+    public byte[] get_worker_hb(String path, boolean watch) {
+        return ClientZookeeper.getData(zkReader, path, watch);
+    }
+
+    /** Same as {@link #get_children}. */
+    @Override
+    public List<String> get_worker_hb_children(String path, boolean watch) {
+        return get_children(path, watch);
+    }
+
+    /** Same as {@link #delete_node}. */
+    @Override
+    public void delete_worker_hb(String path) {
+        delete_node(path);
+    }
+
+    /** Registers a connection-state listener on the reader client; events are forwarded unchanged to {@code listener}. */
+    @Override
+    public void add_listener(final ConnectionStateListener listener) {
+        ClientZookeeper.addListener(zkReader, new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                listener.stateChanged(curatorFramework, connectionState);
+            }
+        });
+    }
+
+    /** Delegates to {@link ClientZookeeper#syncPath} on the writer client. */
+    @Override
+    public void sync_path(String path) {
+        ClientZookeeper.syncPath(zkWriter, path);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
new file mode 100644
index 0000000..3715e48
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link StateStorageFactory} that produces {@link ZKStateStorage} instances.
+ */
+public class ZKStateStorageFactory implements StateStorageFactory {
+
+    /**
+     * Builds a {@link ZKStateStorage} from the given arguments; any exception thrown by its
+     * constructor is rethrown via {@link Utils#wrapInRuntime}.
+     */
+    @Override
+    public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
+        final IStateStorage storage;
+        try {
+            storage = new ZKStateStorage(config, auth_conf, acls, context);
+        } catch (Exception failure) {
+            throw Utils.wrapInRuntime(failure);
+        }
+        return storage;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
new file mode 100644
index 0000000..ba6417f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCenter.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupCenter implements CgroupOperation {
+
+ private static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class);
+
+ private static CgroupCenter instance;
+
+ private CgroupCenter() {
+
+ }
+
+ public synchronized static CgroupCenter getInstance() {
+ if (CgroupUtils.enabled()) {
+ instance = new CgroupCenter();
+ return instance;
+ }
+ return null;
+ }
+
+ @Override
+ public List<Hierarchy> getHierarchies() {
+ Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
+ try (FileReader reader = new FileReader(CgroupUtils.MOUNT_STATUS_FILE);
+ BufferedReader br = new BufferedReader(reader)) {
+ String str = null;
+ while ((str = br.readLine()) != null) {
+ String[] strSplit = str.split(" ");
+ if (!strSplit[2].equals("cgroup")) {
+ continue;
+ }
+ String name = strSplit[0];
+ String type = strSplit[3];
+ String dir = strSplit[1];
+ //Some mount options (i.e. rw and relatime) in type are not cgroups related
+ Hierarchy h = new Hierarchy(name, CgroupUtils.getSubSystemsFromString(type), dir);
+ hierarchies.put(type, h);
+ }
+ return new ArrayList<Hierarchy>(hierarchies.values());
+ } catch (Exception e) {
+ LOG.error("Get hierarchies error {}", e);
+ }
+ return null;
+ }
+
+ @Override
+ public Set<SubSystem> getSubSystems() {
+ Set<SubSystem> subSystems = new HashSet<SubSystem>();
+ try (FileReader reader = new FileReader(CgroupUtils.CGROUP_STATUS_FILE);
+ BufferedReader br = new BufferedReader(reader)){
+ String str = null;
+ while ((str = br.readLine()) != null) {
+ String[] split = str.split("\t");
+ SubSystemType type = SubSystemType.getSubSystem(split[0]);
+ if (type == null) {
+ continue;
+ }
+ int hierarchyID = Integer.valueOf(split[1]);
+ int cgroupNum = Integer.valueOf(split[2]);
+ boolean enable = Integer.valueOf(split[3]).intValue() == 1 ? true : false;
+ subSystems.add(new SubSystem(type, hierarchyID, cgroupNum, enable));
+ }
+ return subSystems;
+ } catch (Exception e) {
+ LOG.error("Get subSystems error {}", e);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isSubSystemEnabled(SubSystemType subSystemType) {
+ Set<SubSystem> subSystems = this.getSubSystems();
+ for (SubSystem subSystem : subSystems) {
+ if (subSystem.getType() == subSystemType) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Hierarchy getHierarchyWithSubSystem(SubSystemType subSystem) {
+ return getHierarchyWithSubSystems(Arrays.asList(subSystem));
+ }
+
+ @Override
+ public Hierarchy getHierarchyWithSubSystems(List<SubSystemType> subSystems) {
+ List<Hierarchy> hierarchies = this.getHierarchies();
+ for (Hierarchy hierarchy : hierarchies) {
+ Hierarchy ret = hierarchy;
+ for (SubSystemType subSystem : subSystems) {
+ if (!hierarchy.getSubSystems().contains(subSystem)) {
+ ret = null;
+ break;
+ }
+ }
+ if (ret != null) {
+ return ret;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isMounted(Hierarchy hierarchy) {
+ if (Utils.checkDirExists(hierarchy.getDir())) {
+ List<Hierarchy> hierarchies = this.getHierarchies();
+ for (Hierarchy h : hierarchies) {
+ if (h.equals(hierarchy)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void mount(Hierarchy hierarchy) throws IOException {
+ if (this.isMounted(hierarchy)) {
+ LOG.error("{} is already mounted", hierarchy.getDir());
+ return;
+ }
+ Set<SubSystemType> subSystems = hierarchy.getSubSystems();
+ for (SubSystemType type : subSystems) {
+ Hierarchy hierarchyWithSubSystem = this.getHierarchyWithSubSystem(type);
+ if (hierarchyWithSubSystem != null) {
+ LOG.error("subSystem: {} is already mounted on hierarchy: {}", type.name(), hierarchyWithSubSystem);
+ subSystems.remove(type);
+ }
+ }
+ if (subSystems.size() == 0) {
+ return;
+ }
+ if (!Utils.checkDirExists(hierarchy.getDir())) {
+ new File(hierarchy.getDir()).mkdirs();
+ }
+ String subSystemsName = CgroupUtils.subSystemsToString(subSystems);
+ SystemOperation.mount(subSystemsName, hierarchy.getDir(), "cgroup", subSystemsName);
+
+ }
+
+ @Override
+ public void umount(Hierarchy hierarchy) throws IOException {
+ if (this.isMounted(hierarchy)) {
+ hierarchy.getRootCgroups().delete();
+ SystemOperation.umount(hierarchy.getDir());
+ CgroupUtils.deleteDir(hierarchy.getDir());
+ } else {
+ LOG.error("{} is not mounted", hierarchy.getDir());
+ }
+ }
+
+ @Override
+ public void createCgroup(CgroupCommon cgroup) throws SecurityException {
+ if (cgroup.isRoot()) {
+ LOG.error("You can't create rootCgroup in this function");
+ throw new RuntimeException("You can't create rootCgroup in this function");
+ }
+ CgroupCommon parent = cgroup.getParent();
+ while (parent != null) {
+ if (!Utils.checkDirExists(parent.getDir())) {
+ throw new RuntimeException("Parent " + parent.getDir() + "does not exist");
+ }
+ parent = parent.getParent();
+ }
+ Hierarchy h = cgroup.getHierarchy();
+ if (!isMounted(h)) {
+ throw new RuntimeException("hierarchy " + h.getDir() + " is not mounted");
+ }
+ if (Utils.checkDirExists(cgroup.getDir())) {
+ throw new RuntimeException("cgroup {} already exists " + cgroup.getDir());
+ }
+
+ if (!(new File(cgroup.getDir())).mkdir()) {
+ throw new RuntimeException("Could not create cgroup dir at " + cgroup.getDir());
+ }
+ }
+
+ @Override
+ public void deleteCgroup(CgroupCommon cgroup) throws IOException {
+ cgroup.delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
new file mode 100755
index 0000000..c8bb304
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommon.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupCommon implements CgroupCommonOperation {
+
+ public static final String TASKS = "/tasks";
+ public static final String NOTIFY_ON_RELEASE = "/notify_on_release";
+ public static final String RELEASE_AGENT = "/release_agent";
+ public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children";
+ public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control";
+ public static final String CGROUP_PROCS = "/cgroup.procs";
+
+ private final Hierarchy hierarchy;
+
+ private final String name;
+
+ private final String dir;
+
+ private final CgroupCommon parent;
+
+ private final boolean isRoot;
+
+ private static final Logger LOG = LoggerFactory.getLogger(CgroupCommon.class);
+
+ public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) {
+ this.name = parent.getName() + "/" + name;
+ this.hierarchy = hierarchy;
+ this.parent = parent;
+ this.dir = parent.getDir() + "/" + name;
+ this.isRoot = false;
+ }
+
+ /**
+ * rootCgroup
+ */
+ public CgroupCommon(Hierarchy hierarchy, String dir) {
+ this.name = "";
+ this.hierarchy = hierarchy;
+ this.parent = null;
+ this.dir = dir;
+ this.isRoot = true;
+ }
+
+ @Override
+ public void addTask(int taskId) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, TASKS), String.valueOf(taskId));
+ }
+
+ @Override
+ public Set<Integer> getTasks() throws IOException {
+ List<String> stringTasks = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, TASKS));
+ Set<Integer> tasks = new HashSet<Integer>();
+ for (String task : stringTasks) {
+ tasks.add(Integer.valueOf(task));
+ }
+ return tasks;
+ }
+
+ @Override
+ public void addProcs(int pid) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
+ }
+
+ @Override
+ public Set<Long> getPids() throws IOException {
+ List<String> stringPids = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_PROCS));
+ Set<Long> pids = new HashSet<>();
+ for (String task : stringPids) {
+ pids.add(Long.valueOf(task));
+ }
+ return pids;
+ }
+
+ @Override
+ public void setNotifyOnRelease(boolean flag) throws IOException {
+
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
+ }
+
+ @Override
+ public boolean getNotifyOnRelease() throws IOException {
+ return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
+ }
+
+ @Override
+ public void setReleaseAgent(String command) throws IOException {
+ if (!this.isRoot) {
+ LOG.warn("Cannot set {} in {} since its not the root group", RELEASE_AGENT, this.isRoot);
+ return;
+ }
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, RELEASE_AGENT), command);
+ }
+
+ @Override
+ public String getReleaseAgent() throws IOException {
+ if (!this.isRoot) {
+ LOG.warn("Cannot get {} in {} since its not the root group", RELEASE_AGENT, this.isRoot);
+ return null;
+ }
+ return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, RELEASE_AGENT)).get(0);
+ }
+
+ @Override
+ public void setCgroupCloneChildren(boolean flag) throws IOException {
+ if (!getCores().keySet().contains(SubSystemType.cpuset)) {
+ return;
+ }
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+ }
+
+ @Override
+ public boolean getCgroupCloneChildren() throws IOException {
+ return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
+ }
+
+ @Override
+ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append(eventFd);
+ sb.append(' ');
+ sb.append(controlFd);
+ for (String arg : args) {
+ sb.append(' ');
+ sb.append(arg);
+ }
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
+ }
+
+ public Hierarchy getHierarchy() {
+ return hierarchy;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDir() {
+ return dir;
+ }
+
+ public CgroupCommon getParent() {
+ return parent;
+ }
+
+ public Set<CgroupCommon> getChildren() {
+
+ File file = new File(this.dir);
+ File[] files = file.listFiles();
+ if (files == null) {
+ LOG.info("{} is not a directory", this.dir);
+ return null;
+ }
+ Set<CgroupCommon> children = new HashSet<CgroupCommon>();
+ for (File child : files) {
+ if (child.isDirectory()) {
+ children.add(new CgroupCommon(child.getName(), this.hierarchy, this));
+ }
+ }
+ return children;
+ }
+
+ public boolean isRoot() {
+ return isRoot;
+ }
+
+ public Map<SubSystemType, CgroupCore> getCores() {
+ return CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir);
+ }
+
+ public void delete() throws IOException {
+ this.free();
+ if (!this.isRoot) {
+ this.parent.getChildren().remove(this);
+ }
+ }
+
+ private void free() throws IOException {
+ for (CgroupCommon child : getChildren()) {
+ child.free();
+ }
+ if (this.isRoot) {
+ return;
+ }
+ Set<Integer> tasks = this.getTasks();
+ if (tasks != null) {
+ for (Integer task : tasks) {
+ this.parent.addTask(task);
+ }
+ }
+ CgroupUtils.deleteDir(this.dir);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ boolean ret = false;
+ if (o != null && (o instanceof CgroupCommon)) {
+
+ boolean hierarchyFlag =false;
+ if (((CgroupCommon)o).hierarchy != null && this.hierarchy != null) {
+ hierarchyFlag = ((CgroupCommon)o).hierarchy.equals(this.hierarchy);
+ } else if (((CgroupCommon)o).hierarchy == null && this.hierarchy == null) {
+ hierarchyFlag = true;
+ } else {
+ hierarchyFlag = false;
+ }
+
+ boolean nameFlag = false;
+ if (((CgroupCommon)o).name != null && this.name != null) {
+ nameFlag = ((CgroupCommon)o).name.equals(this.name);
+ } else if (((CgroupCommon)o).name == null && this.name == null) {
+ nameFlag = true;
+ } else {
+ nameFlag = false;
+ }
+
+ boolean dirFlag = false;
+ if (((CgroupCommon)o).dir != null && this.dir != null) {
+ dirFlag = ((CgroupCommon)o).dir.equals(this.dir);
+ } else if (((CgroupCommon)o).dir == null && this.dir == null) {
+ dirFlag = true;
+ } else {
+ dirFlag = false;
+ }
+ ret = hierarchyFlag && nameFlag && dirFlag;
+ }
+ return ret;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (this.name != null ? this.name.hashCode() : 0);
+ result = prime * result + (this.hierarchy != null ? this.hierarchy.hashCode() : 0);
+ result = prime * result + (this.dir != null ? this.dir.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return this.getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
new file mode 100755
index 0000000..eecba69
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCommonOperation.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Operations available on a single cgroup: managing its tasks and processes and reading or
+ * writing its common settings.
+ */
+public interface CgroupCommonOperation {
+
+ /**
+ * Add a task to the cgroup.
+ * @param taskid task id of the task to add
+ * @throws IOException if the implementation fails to record the task
+ */
+ public void addTask(int taskid) throws IOException;
+
+ /**
+ * Get the ids of the tasks in the cgroup.
+ * @return the set of task ids
+ * @throws IOException if the implementation fails to read the tasks
+ */
+ public Set<Integer> getTasks() throws IOException;
+
+ /**
+ * Add a process to the cgroup.
+ * @param pid the PID of the process to add
+ * @throws IOException if the implementation fails to record the process
+ */
+ public void addProcs(int pid) throws IOException;
+
+ /**
+ * Get the PIDs of the processes in the cgroup.
+ * @return the set of PIDs
+ * @throws IOException if the implementation fails to read the processes
+ */
+ public Set<Long> getPids() throws IOException;
+
+ /**
+ * Set the notify_on_release config of the cgroup.
+ * @param flag the value to set
+ * @throws IOException if the implementation fails to write the setting
+ */
+ public void setNotifyOnRelease(boolean flag) throws IOException;
+
+ /**
+ * Get the notify_on_release config of the cgroup.
+ * @return the current value of the setting
+ * @throws IOException if the implementation fails to read the setting
+ */
+ public boolean getNotifyOnRelease() throws IOException;
+
+ /**
+ * Set the command for the release agent to execute.
+ * @param command the command to set
+ * @throws IOException if the implementation fails to write the setting
+ */
+ public void setReleaseAgent(String command) throws IOException;
+
+ /**
+ * Get the command for the release agent to execute.
+ * @return the configured command
+ * @throws IOException if the implementation fails to read the setting
+ */
+ public String getReleaseAgent() throws IOException;
+
+ /**
+ * Set the cgroup.clone_children config.
+ * @param flag the value to set
+ * @throws IOException if the implementation fails to write the setting
+ */
+ public void setCgroupCloneChildren(boolean flag) throws IOException;
+
+ /**
+ * Get the cgroup.clone_children config.
+ * @return the current value of the setting
+ * @throws IOException if the implementation fails to read the setting
+ */
+ public boolean getCgroupCloneChildren() throws IOException;
+
+ /**
+ * Set the event control config.
+ * @param eventFd the event file descriptor, as a string
+ * @param controlFd the control file descriptor, as a string
+ * @param args additional arguments passed along with the two descriptors
+ * @throws IOException if the implementation fails to write the setting
+ */
+ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
new file mode 100755
index 0000000..4d35c77
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupCoreFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import org.apache.storm.container.cgroup.core.BlkioCore;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.container.cgroup.core.CpuacctCore;
+import org.apache.storm.container.cgroup.core.CpusetCore;
+import org.apache.storm.container.cgroup.core.DevicesCore;
+import org.apache.storm.container.cgroup.core.FreezerCore;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.container.cgroup.core.NetClsCore;
+import org.apache.storm.container.cgroup.core.NetPrioCore;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Factory mapping {@link SubSystemType} values to their {@link CgroupCore} implementations.
+ */
+public class CgroupCoreFactory {
+
+    /**
+     * Creates the core for one subsystem type rooted at {@code dir}.
+     *
+     * @return the matching core, or {@code null} for a type that has no core implementation here
+     */
+    public static CgroupCore getInstance(SubSystemType type, String dir) {
+        final CgroupCore core;
+        switch (type) {
+            case blkio:
+                core = new BlkioCore(dir);
+                break;
+            case cpuacct:
+                core = new CpuacctCore(dir);
+                break;
+            case cpuset:
+                core = new CpusetCore(dir);
+                break;
+            case cpu:
+                core = new CpuCore(dir);
+                break;
+            case devices:
+                core = new DevicesCore(dir);
+                break;
+            case freezer:
+                core = new FreezerCore(dir);
+                break;
+            case memory:
+                core = new MemoryCore(dir);
+                break;
+            case net_cls:
+                core = new NetClsCore(dir);
+                break;
+            case net_prio:
+                core = new NetPrioCore(dir);
+                break;
+            default:
+                core = null;
+                break;
+        }
+        return core;
+    }
+
+    /**
+     * Creates the cores for a set of subsystem types rooted at {@code dir}. Types without a core
+     * implementation are left out of the returned map.
+     */
+    public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) {
+        Map<SubSystemType, CgroupCore> cores = new HashMap<SubSystemType, CgroupCore>();
+        for (SubSystemType subSystemType : types) {
+            CgroupCore core = getInstance(subSystemType, dir);
+            if (core == null) {
+                continue;
+            }
+            cores.put(subSystemType, core);
+        }
+        return cores;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
new file mode 100755
index 0000000..00ac9fd
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupOperation.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An interface declaring the basic operations for managing cgroups, such as mounting and unmounting a hierarchy
+ * and creating cgroups. Also contains functions to access basic information about cgroups.
+ */
+public interface CgroupOperation {
+
+    /**
+     * Returns the list of hierarchies.
+     */
+    List<Hierarchy> getHierarchies();
+
+    /**
+     * Returns the set of available subsystems.
+     */
+    Set<SubSystem> getSubSystems();
+
+    /**
+     * Tells whether the given subsystem is enabled.
+     */
+    boolean isSubSystemEnabled(SubSystemType subsystem);
+
+    /**
+     * Returns the first hierarchy that has the given subsystem mounted.
+     */
+    Hierarchy getHierarchyWithSubSystem(SubSystemType subsystem);
+
+    /**
+     * Returns the first hierarchy that has all of the given subsystems mounted.
+     */
+    Hierarchy getHierarchyWithSubSystems(List<SubSystemType> subSystems);
+
+    /**
+     * Tells whether the given hierarchy is mounted.
+     */
+    boolean isMounted(Hierarchy hierarchy);
+
+    /**
+     * Mounts a hierarchy.
+     */
+    void mount(Hierarchy hierarchy) throws IOException;
+
+    /**
+     * Unmounts a hierarchy.
+     */
+    void umount(Hierarchy hierarchy) throws IOException;
+
+    /**
+     * Creates a cgroup.
+     */
+    void createCgroup(CgroupCommon cgroup) throws SecurityException;
+
+    /**
+     * Deletes a cgroup.
+     */
+    void deleteCgroup(CgroupCommon cgroup) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
new file mode 100644
index 0000000..5a4744c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/CgroupUtils.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import com.google.common.io.Files;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class CgroupUtils {
+
+ public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
+ public static final String MOUNT_STATUS_FILE = "/proc/mounts";
+
+ private static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class);
+
+ public static void deleteDir(String dir) {
+ File d = new File(dir);
+ if (!d.exists()) {
+ LOG.warn("dir {} does not exist!", dir);
+ return;
+ }
+ if (!d.isDirectory()) {
+ throw new RuntimeException("dir " + dir + " is not a directory!");
+ }
+ if (!d.delete()) {
+ throw new RuntimeException("Cannot delete dir " + dir);
+ }
+ }
+
+ /**
+ * Get a set of SubSystemType objects from a comma delimited list of subsystem names
+ */
+ public static Set<SubSystemType> getSubSystemsFromString(String str) {
+ Set<SubSystemType> result = new HashSet<SubSystemType>();
+ String[] subSystems = str.split(",");
+ for (String subSystem : subSystems) {
+ //return null to mount options in string that is not part of cgroups
+ SubSystemType type = SubSystemType.getSubSystem(subSystem);
+ if (type != null) {
+ result.add(type);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get a string that is a comma delimited list of subsystems
+ */
+ public static String subSystemsToString(Set<SubSystemType> subSystems) {
+ StringBuilder sb = new StringBuilder();
+ if (subSystems.size() == 0) {
+ return sb.toString();
+ }
+ for (SubSystemType type : subSystems) {
+ sb.append(type.name()).append(",");
+ }
+ return sb.toString().substring(0, sb.length() - 1);
+ }
+
+ public static boolean enabled() {
+ return Utils.checkFileExists(CGROUP_STATUS_FILE);
+ }
+
+ public static List<String> readFileByLine(String filePath) throws IOException {
+ return Files.readLines(new File(filePath), Charset.defaultCharset());
+ }
+
+ public static void writeFileByLine(String filePath, List<String> linesToWrite) throws IOException {
+ LOG.debug("For CGroups - writing {} to {} ", linesToWrite, filePath);
+ File file = new File(filePath);
+ if (!file.exists()) {
+ LOG.error("{} does not exist", filePath);
+ return;
+ }
+ try (FileWriter writer = new FileWriter(file, true);
+ BufferedWriter bw = new BufferedWriter(writer)) {
+ for (String string : linesToWrite) {
+ bw.write(string);
+ bw.newLine();
+ bw.flush();
+ }
+ }
+ }
+
+ public static void writeFileByLine(String filePath, String lineToWrite) throws IOException {
+ writeFileByLine(filePath, Arrays.asList(lineToWrite));
+ }
+
+ public static String getDir(String dir, String constant) {
+ return dir + constant;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java
new file mode 100755
index 0000000..57eb8ff
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/Device.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * a class that represents a device in linux
+ */
+public class Device {
+
+    /** Major device number. */
+    public final int major;
+    /** Minor device number. */
+    public final int minor;
+
+    /**
+     * Creates a device from its numeric identifiers.
+     *
+     * @param major major device number
+     * @param minor minor device number
+     */
+    public Device(int major, int minor) {
+        this.major = major;
+        this.minor = minor;
+    }
+
+    /**
+     * Parses a device from its {@code "major:minor"} text form, the same form {@link #toString()} produces.
+     * Any fields after the second one are ignored.
+     *
+     * @param str text of the form {@code "major:minor"}
+     * @throws IllegalArgumentException if {@code str} does not contain two {@code ':'}-separated fields, or
+     *                                  (as {@link NumberFormatException}) if a field is not a valid integer
+     */
+    public Device(String str) {
+        String[] parts = str.split(":");
+        if (parts.length < 2) {
+            // fail with a readable message instead of an ArrayIndexOutOfBoundsException
+            throw new IllegalArgumentException("Expected a device in the form major:minor but got \"" + str + "\"");
+        }
+        this.major = Integer.parseInt(parts[0]);
+        this.minor = Integer.parseInt(parts[1]);
+    }
+
+    /**
+     * Returns the device as {@code "major:minor"}.
+     */
+    @Override
+    public String toString() {
+        return major + ":" + minor;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + major;
+        result = prime * result + minor;
+        return result;
+    }
+
+    /**
+     * Two devices are equal when they are of the same class and both numbers match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Device other = (Device) obj;
+        return major == other.major && minor == other.minor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
new file mode 100755
index 0000000..b2b245c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/Hierarchy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+import java.util.Set;
+
+/**
+ * A class that describes a cgroup hierarchy
+ */
+public class Hierarchy {
+
+    private final String name;
+
+    private final Set<SubSystemType> subSystems;
+
+    private final String type;
+
+    private final String dir;
+
+    private final CgroupCommon rootCgroups;
+
+    /**
+     * Creates a hierarchy description together with its root cgroup.
+     *
+     * @param name       name of the hierarchy
+     * @param subSystems subsystems that belong to this hierarchy; the set is stored as passed, not copied
+     * @param dir        directory of the hierarchy, also handed to the root {@link CgroupCommon}
+     */
+    public Hierarchy(String name, Set<SubSystemType> subSystems, String dir) {
+        this.name = name;
+        this.subSystems = subSystems;
+        this.dir = dir;
+        this.type = CgroupUtils.subSystemsToString(subSystems);
+        // Built last on purpose: the constructor receives this instance, so every other
+        // field must already be assigned in case it reads them.
+        this.rootCgroups = new CgroupCommon(this, dir);
+    }
+
+    /**
+     * Returns the subsystems of this hierarchy (the same set instance given to the constructor).
+     */
+    public Set<SubSystemType> getSubSystems() {
+        return subSystems;
+    }
+
+    /**
+     * Returns all subsystems of this hierarchy as a comma delimited list.
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * Hash over dir, name and type, the same fields {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(dir, name, type);
+    }
+
+    /**
+     * Two hierarchies are equal when they are of the same class and dir, name and type all match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Hierarchy other = (Hierarchy) obj;
+        return java.util.Objects.equals(dir, other.dir)
+            && java.util.Objects.equals(name, other.name)
+            && java.util.Objects.equals(type, other.type);
+    }
+
+    /**
+     * Returns the directory of this hierarchy.
+     */
+    public String getDir() {
+        return dir;
+    }
+
+    /**
+     * Returns the root cgroup created for this hierarchy.
+     */
+    public CgroupCommon getRootCgroups() {
+        return rootCgroups;
+    }
+
+    /**
+     * Returns the name of this hierarchy.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Tells whether the given subsystem is part of this hierarchy.
+     */
+    public boolean isSubSystemMounted(SubSystemType subsystem) {
+        for (SubSystemType mounted : this.subSystems) {
+            if (mounted == subsystem) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the directory of this hierarchy.
+     */
+    @Override
+    public String toString() {
+        return this.dir;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
new file mode 100755
index 0000000..e354fb0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystem.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * a class that implements operations that can be performed on a cgroup subsystem
+ */
+public class SubSystem {
+
+ private SubSystemType type;
+
+ private int hierarchyID;
+
+ private int cgroupsNum;
+
+ private boolean enable;
+
+ public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, boolean enable) {
+ this.type = type;
+ this.hierarchyID = hierarchyID;
+ this.cgroupsNum = cgroupNum;
+ this.enable = enable;
+ }
+
+ public SubSystemType getType() {
+ return type;
+ }
+
+ public void setType(SubSystemType type) {
+ this.type = type;
+ }
+
+ public int getHierarchyID() {
+ return hierarchyID;
+ }
+
+ public void setHierarchyID(int hierarchyID) {
+ this.hierarchyID = hierarchyID;
+ }
+
+ public int getCgroupsNum() {
+ return cgroupsNum;
+ }
+
+ public void setCgroupsNum(int cgroupsNum) {
+ this.cgroupsNum = cgroupsNum;
+ }
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ boolean ret = false;
+ if (object != null && object instanceof SubSystem) {
+ ret = ((this.type == ((SubSystem)object).getType())
+ && (this.hierarchyID == ((SubSystem)object).getHierarchyID()));
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
new file mode 100755
index 0000000..914abcc
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/SubSystemType.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup;
+
+/**
+ * An enum that describes the subsystems that can be used
+ */
+public enum SubSystemType {
+
+    // net_cls,ns is not supported in ubuntu
+    blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
+
+    /**
+     * Looks up a subsystem type by its exact, case-sensitive constant name.
+     *
+     * @param str the name to look up; may be {@code null}
+     * @return the matching constant, or {@code null} when no constant has that name
+     */
+    public static SubSystemType getSubSystem(String str) {
+        for (SubSystemType candidate : values()) {
+            if (candidate.name().equals(str)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
new file mode 100644
index 0000000..8ebd763
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/SystemOperation.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.cgroup;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ * A class that implements system operations for using cgroups
+ */
+public class SystemOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
+
+    /**
+     * Tells whether the current process runs with effective user id 0, by asking bash for {@code $EUID}.
+     *
+     * @return {@code true} when the reported effective user id is 0
+     * @throws IOException if the shell command fails
+     * @throws NumberFormatException if the shell output is not a number
+     */
+    public static boolean isRoot() throws IOException {
+        // Parse the whole (trimmed) id rather than only its first character.
+        String euid = SystemOperation.exec("echo $EUID").trim();
+        return Integer.parseInt(euid) == 0;
+    }
+
+    /**
+     * Runs {@code mount -t <type> -o <options> <name> <target>} through {@link #exec(String)}.
+     * NOTE(review): the arguments are concatenated into a shell command line unquoted, so values with
+     * spaces or shell metacharacters are not safe here - confirm callers only pass trusted values.
+     *
+     * @throws IOException if the command fails or writes to its error stream
+     */
+    public static void mount(String name, String target, String type, String options) throws IOException {
+        String cmd = "mount -t " + type + " -o " + options + " " + name + " " + target;
+        SystemOperation.exec(cmd);
+    }
+
+    /**
+     * Runs {@code umount <pathToDir>} through {@link #exec(String)}. The path is not quoted.
+     *
+     * @throws IOException if the command fails or writes to its error stream
+     */
+    public static void umount(String pathToDir) throws IOException {
+        SystemOperation.exec("umount " + pathToDir);
+    }
+
+    /**
+     * Runs a command line with {@code /bin/bash -c} and returns what it wrote to standard output.
+     *
+     * <p>Both output streams are drained while the process runs (standard error on a helper thread),
+     * because waiting for the process before reading can block forever once a pipe buffer fills up.
+     *
+     * @param cmd the command line handed to bash
+     * @return everything the command wrote to standard output
+     * @throws IOException if the process cannot be started or read, if the calling thread is
+     *                     interrupted, or if the command wrote anything to standard error (the
+     *                     exception message is that error output)
+     */
+    public static String exec(String cmd) throws IOException {
+        LOG.debug("Shell cmd: {}", cmd);
+        final Process process = new ProcessBuilder("/bin/bash", "-c", cmd).start();
+
+        // Single-element arrays let the anonymous Runnable hand its result back.
+        final String[] stderrText = new String[1];
+        final IOException[] stderrFailure = new IOException[1];
+        Thread stderrReader = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    stderrText[0] = IOUtils.toString(process.getErrorStream());
+                } catch (IOException e) {
+                    stderrFailure[0] = e;
+                }
+            }
+        }, "cgroup-exec-stderr");
+        stderrReader.setDaemon(true);
+        stderrReader.start();
+
+        try {
+            String output = IOUtils.toString(process.getInputStream());
+            process.waitFor();
+            stderrReader.join();
+            if (stderrFailure[0] != null) {
+                throw stderrFailure[0];
+            }
+            String errorOutput = stderrText[0];
+            LOG.debug("Shell Output: {}", output);
+            if (errorOutput.length() != 0) {
+                LOG.error("Shell Error Output: {}", errorOutput);
+                throw new IOException(errorOutput);
+            }
+            return output;
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        } catch (ClosedByInterruptException ie) {
+            throw new IOException(ie);
+        } finally {
+            // No effect on a process that already exited; otherwise stops the child so it is not
+            // left behind, which also ends the stderr reader.
+            process.destroy();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
new file mode 100755
index 0000000..c426610
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/BlkioCore.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.Device;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BlkioCore implements CgroupCore {
+
+ public static final String BLKIO_WEIGHT = "/blkio.weight";
+ public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device";
+ public static final String BLKIO_RESET_STATS = "/blkio.reset_stats";
+
+ public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device";
+ public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device";
+ public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device";
+ public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device";
+
+ public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced";
+ public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes";
+
+ public static final String BLKIO_TIME = "/blkio.time";
+ public static final String BLKIO_SECTORS = "/blkio.sectors";
+ public static final String BLKIO_IO_SERVICED = "/blkio.io_serviced";
+ public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes";
+ public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time";
+ public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time";
+ public static final String BLKIO_IO_MERGED = "/blkio.io_merged";
+ public static final String BLKIO_IO_QUEUED = "/blkio.io_queued";
+
+ private final String dir;
+
+ public BlkioCore(String dir) {
+ this.dir = dir;
+ }
+
+ @Override
+ public SubSystemType getType() {
+ return SubSystemType.blkio;
+ }
+
+ /* weight: 100-1000 */
+ public void setBlkioWeight(int weight) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight));
+ }
+
+ public int getBlkioWeight() throws IOException {
+ return Integer.valueOf(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue();
+ }
+
+ public void setBlkioWeightDevice(Device device, int weight) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight));
+ }
+
+ public Map<Device, Integer> getBlkioWeightDevice() throws IOException {
+ List<String> strings = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_WEIGHT_DEVICE));
+ Map<Device, Integer> result = new HashMap<Device, Integer>();
+ for (String string : strings) {
+ String[] strArgs = string.split(" ");
+ Device device = new Device(strArgs[0]);
+ Integer weight = Integer.valueOf(strArgs[1]);
+ result.put(device, weight);
+ }
+ return result;
+ }
+
+ public void setReadBps(Device device, long bps) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps));
+ }
+
+ public Map<Device, Long> getReadBps() throws IOException {
+ return parseConfig(BLKIO_THROTTLE_READ_BPS_DEVICE);
+ }
+
+ public void setWriteBps(Device device, long bps) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps));
+ }
+
+ public Map<Device, Long> getWriteBps() throws IOException {
+ return parseConfig(BLKIO_THROTTLE_WRITE_BPS_DEVICE);
+ }
+
+ public void setReadIOps(Device device, long iops) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops));
+ }
+
+ public Map<Device, Long> getReadIOps() throws IOException {
+ return parseConfig(BLKIO_THROTTLE_READ_IOPS_DEVICE);
+ }
+
+ public void setWriteIOps(Device device, long iops) throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops));
+ }
+
+ public Map<Device, Long> getWriteIOps() throws IOException {
+ return parseConfig(BLKIO_THROTTLE_WRITE_IOPS_DEVICE);
+ }
+
+ public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED)));
+ }
+
+ public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES)));
+ }
+
+ public Map<Device, Long> getBlkioTime() throws IOException {
+ return parseConfig(BLKIO_TIME);
+ }
+
+ public Map<Device, Long> getBlkioSectors() throws IOException {
+ return parseConfig(BLKIO_SECTORS);
+ }
+
+ public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_SERVICED)));
+ }
+
+ public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_SERVICE_BYTES)));
+ }
+
+ public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_SERVICE_TIME)));
+ }
+
+ public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_WAIT_TIME)));
+ }
+
+ public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_MERGED)));
+ }
+
+ public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, BLKIO_IO_QUEUED)));
+ }
+
+ public void resetStats() throws IOException {
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, BLKIO_RESET_STATS), "1");
+ }
+
+ private String makeContext(Device device, Object data) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(device.toString()).append(" ").append(data);
+ return sb.toString();
+ }
+
+ private Map<Device, Long> parseConfig(String config) throws IOException {
+ List<String> strings = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, config));
+ Map<Device, Long> result = new HashMap<Device, Long>();
+ for (String string : strings) {
+ String[] strArgs = string.split(" ");
+ Device device = new Device(strArgs[0]);
+ Long value = Long.valueOf(strArgs[1]);
+ result.put(device, value);
+ }
+ return result;
+ }
+
+ private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) {
+ Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>();
+ for (String str : strs) {
+ String[] strArgs = str.split(" ");
+ if (strArgs.length != 3) {
+ continue;
+ }
+ Device device = new Device(strArgs[0]);
+ RecordType key = RecordType.getType(strArgs[1]);
+ Long value = Long.parseLong(strArgs[2]);
+ Map<RecordType, Long> record = result.get(device);
+ if (record == null) {
+ record = new HashMap<RecordType, Long>();
+ result.put(device, record);
+ }
+ record.put(key, value);
+ }
+ return result;
+ }
+
+ public enum RecordType {
+ read, write, sync, async, total;
+
+ public static RecordType getType(String type) {
+ try {
+ return RecordType.valueOf(type.toLowerCase());
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
new file mode 100755
index 0000000..a6b098e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CgroupCore.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.SubSystemType;
+
+public interface CgroupCore {
+
+    /**
+     * Returns the subsystem type this core belongs to.
+     */
+    SubSystemType getType();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
new file mode 100755
index 0000000..1d21251
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Accessor for the control files of the {@code cpu} cgroup subsystem under one cgroup directory.
+ *
+ * <p>Scalar getters parse the first line of the corresponding control file and setters write a
+ * single value to it; all file access is delegated to {@link CgroupUtils}.
+ */
+public class CpuCore implements CgroupCore {
+
+    public static final String CPU_SHARES = "/cpu.shares";
+    public static final String CPU_RT_RUNTIME_US = "/cpu.rt_runtime_us";
+    public static final String CPU_RT_PERIOD_US = "/cpu.rt_period_us";
+    public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
+    public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
+    public static final String CPU_STAT = "/cpu.stat";
+
+    /** Directory of the cgroup whose control files are read and written. */
+    private final String dir;
+
+    /**
+     * Creates an accessor bound to a cgroup directory.
+     *
+     * @param dir directory that the control file names are resolved against
+     */
+    public CpuCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpu;
+    }
+
+    /**
+     * Writes {@code cpu.shares}.
+     *
+     * @param weight value to write
+     * @throws IOException if the control file cannot be written
+     */
+    public void setCpuShares(int weight) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_SHARES), String.valueOf(weight));
+    }
+
+    /**
+     * Reads {@code cpu.shares}.
+     *
+     * @return the value on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public int getCpuShares() throws IOException {
+        return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_SHARES)).get(0));
+    }
+
+    /**
+     * Writes {@code cpu.rt_runtime_us}.
+     *
+     * @param us value to write
+     * @throws IOException if the control file cannot be written
+     */
+    public void setCpuRtRuntimeUs(long us) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us));
+    }
+
+    /**
+     * Reads {@code cpu.rt_runtime_us}.
+     *
+     * @return the value on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public long getCpuRtRuntimeUs() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0));
+    }
+
+    /**
+     * Writes {@code cpu.rt_period_us}.
+     *
+     * @param us value to write
+     * @throws IOException if the control file cannot be written
+     */
+    public void setCpuRtPeriodUs(long us) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us));
+    }
+
+    /**
+     * Reads {@code cpu.rt_period_us}.
+     *
+     * @return the value on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public Long getCpuRtPeriodUs() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_RT_PERIOD_US)).get(0));
+    }
+
+    /**
+     * Writes {@code cpu.cfs_period_us}.
+     *
+     * @param us value to write
+     * @throws IOException if the control file cannot be written
+     */
+    public void setCpuCfsPeriodUs(long us) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us));
+    }
+
+    /**
+     * Reads {@code cpu.cfs_period_us}.
+     *
+     * @return the value on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public Long getCpuCfsPeriodUs() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0));
+    }
+
+    /**
+     * Writes {@code cpu.cfs_quota_us}.
+     *
+     * @param us value to write
+     * @throws IOException if the control file cannot be written
+     */
+    public void setCpuCfsQuotaUs(long us) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us));
+    }
+
+    /**
+     * Reads {@code cpu.cfs_quota_us}.
+     *
+     * @return the value on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public Long getCpuCfsQuotaUs() throws IOException {
+        return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0));
+    }
+
+    /**
+     * Reads and parses {@code cpu.stat}.
+     *
+     * @return the parsed statistics
+     * @throws IOException if the control file cannot be read
+     */
+    public Stat getCpuStat() throws IOException {
+        return new Stat(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPU_STAT)));
+    }
+
+    /**
+     * Immutable snapshot of the first three lines of {@code cpu.stat}.
+     *
+     * <p>The {@code int} fields are kept for source compatibility and saturate at the {@code int}
+     * range instead of failing; {@link #throttledTimeNanos} carries the unclamped throttled time.
+     */
+    public static class Stat {
+        public final int nrPeriods;
+        public final int nrThrottled;
+        /** Throttled time clamped to the {@code int} range; prefer {@link #throttledTimeNanos}. */
+        public final int throttledTime;
+        /** Throttled time exactly as read from the third line of {@code cpu.stat}. */
+        public final long throttledTimeNanos;
+
+        /**
+         * Parses the lines of {@code cpu.stat}.
+         *
+         * @param statStr lines of the file; the first three are read positionally and each must
+         *     have the form {@code "<name> <value>"}
+         * @throws NumberFormatException if a value is not a decimal number
+         * @throws IndexOutOfBoundsException if fewer than three lines are supplied
+         */
+        public Stat(List<String> statStr) {
+            this.nrPeriods = saturateToInt(parseValue(statStr.get(0)));
+            this.nrThrottled = saturateToInt(parseValue(statStr.get(1)));
+            // The kernel reports throttled_time in nanoseconds, so it exceeds Integer.MAX_VALUE
+            // after roughly 2.1 s of accumulated throttling; parse it as a long.
+            this.throttledTimeNanos = parseValue(statStr.get(2));
+            this.throttledTime = saturateToInt(this.throttledTimeNanos);
+        }
+
+        /** Extracts the numeric value that follows the first space of a stat line. */
+        private static long parseValue(String line) {
+            return Long.parseLong(line.split(" ")[1]);
+        }
+
+        /** Narrows a long to an int, clamping to the int range instead of wrapping. */
+        private static int saturateToInt(long value) {
+            if (value > Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
+            }
+            if (value < Integer.MIN_VALUE) {
+                return Integer.MIN_VALUE;
+            }
+            return (int) value;
+        }
+
+        @Override
+        public int hashCode() {
+            // Built from the int fields only, so hashes of in-range snapshots are unchanged; this
+            // stays consistent with equals because equal objects have equal int fields.
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + nrPeriods;
+            result = prime * result + nrThrottled;
+            result = prime * result + throttledTime;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            Stat other = (Stat) obj;
+            return nrPeriods == other.nrPeriods
+                    && nrThrottled == other.nrThrottled
+                    && throttledTime == other.throttledTime
+                    && throttledTimeNanos == other.throttledTimeNanos;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
new file mode 100755
index 0000000..2e683f4
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuacctCore.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CpuacctCore implements CgroupCore {
+
+ public static final String CPUACCT_USAGE = "/cpuacct.usage";
+ public static final String CPUACCT_STAT = "/cpuacct.stat";
+ public static final String CPUACCT_USAGE_PERCPU = "/cpuacct.usage_percpu";
+
+ private final String dir;
+
+ public CpuacctCore(String dir) {
+ this.dir = dir;
+ }
+
+ @Override
+ public SubSystemType getType() {
+ return SubSystemType.cpuacct;
+ }
+
+ public Long getCpuUsage() throws IOException {
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_USAGE)).get(0));
+ }
+
+ public Map<StatType, Long> getCpuStat() throws IOException {
+ List<String> strs = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_STAT));
+ Map<StatType, Long> result = new HashMap<StatType, Long>();
+ result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1]));
+ result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1]));
+ return result;
+ }
+
+ public Long[] getPerCpuUsage() throws IOException {
+ String str = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0);
+ String[] strArgs = str.split(" ");
+ Long[] result = new Long[strArgs.length];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = Long.parseLong(strArgs[i]);
+ }
+ return result;
+ }
+
+ public static enum StatType {
+ user, system;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
new file mode 100755
index 0000000..d089e95
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.container.cgroup.core;
+
+import org.apache.storm.container.cgroup.CgroupUtils;
+import org.apache.storm.container.cgroup.SubSystemType;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+/**
+ * Accessor for the control files of the {@code cpuset} cgroup subsystem under one cgroup
+ * directory. All file access is delegated to {@link CgroupUtils}.
+ *
+ * <p>Boolean flags are written as {@code 1}/{@code 0} and read back as {@code value > 0}.
+ */
+public class CpusetCore implements CgroupCore {
+
+    public static final String CPUSET_CPUS = "/cpuset.cpus";
+    public static final String CPUSET_MEMS = "/cpuset.mems";
+    public static final String CPUSET_MEMORY_MIGRATE = "/cpuset.memory_migrate";
+    public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive";
+    public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive";
+    public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall";
+    public static final String CPUSET_MEMORY_PRESSURE = "/cpuset.memory_pressure";
+    public static final String CPUSET_MEMORY_PRESSURE_ENABLED = "/cpuset.memory_pressure_enabled";
+    public static final String CPUSET_MEMORY_SPREAD_PAGE = "/cpuset.memory_spread_page";
+    public static final String CPUSET_MEMORY_SPREAD_SLAB = "/cpuset.memory_spread_slab";
+    public static final String CPUSET_SCHED_LOAD_BALANCE = "/cpuset.sched_load_balance";
+    public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = "/cpuset.sched_relax_domain_level";
+
+    /** Directory of the cgroup whose control files are read and written. */
+    private final String dir;
+
+    /**
+     * Creates an accessor bound to a cgroup directory.
+     *
+     * @param dir directory that the control file names are resolved against
+     */
+    public CpusetCore(String dir) {
+        this.dir = dir;
+    }
+
+    @Override
+    public SubSystemType getType() {
+        return SubSystemType.cpuset;
+    }
+
+    /**
+     * Writes {@code cpuset.cpus} as a comma-separated list.
+     *
+     * @param nums numbers to write; an empty array writes an empty string
+     * @throws IOException if the control file cannot be written
+     */
+    public void setCpus(int[] nums) throws IOException {
+        setConfigs(nums, CPUSET_CPUS);
+    }
+
+    /**
+     * Reads {@code cpuset.cpus}.
+     *
+     * @return the numbers listed on the first line, expanded by {@link #parseNums(String)}
+     * @throws IOException if the control file cannot be read
+     */
+    public int[] getCpus() throws IOException {
+        return parseNums(readFirstLine(CPUSET_CPUS));
+    }
+
+    /**
+     * Writes {@code cpuset.mems} as a comma-separated list.
+     *
+     * @param nums numbers to write; an empty array writes an empty string
+     * @throws IOException if the control file cannot be written
+     */
+    public void setMems(int[] nums) throws IOException {
+        setConfigs(nums, CPUSET_MEMS);
+    }
+
+    /** Joins the numbers with commas and writes the result to the given control file. */
+    private void setConfigs(int[] nums, String config) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < nums.length; i++) {
+            // Separator goes before every element but the first, so an empty array needs no
+            // trailing-comma removal (which previously threw on an empty builder).
+            if (i > 0) {
+                sb.append(',');
+            }
+            sb.append(nums[i]);
+        }
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, config), sb.toString());
+    }
+
+    /**
+     * Reads {@code cpuset.mems}.
+     *
+     * @return the numbers listed on the first line, expanded by {@link #parseNums(String)}
+     * @throws IOException if the control file cannot be read
+     */
+    public int[] getMems() throws IOException {
+        return parseNums(readFirstLine(CPUSET_MEMS));
+    }
+
+    public void setMemMigrate(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_MIGRATE, flag);
+    }
+
+    public boolean isMemMigrate() throws IOException {
+        return readFlag(CPUSET_MEMORY_MIGRATE);
+    }
+
+    public void setCpuExclusive(boolean flag) throws IOException {
+        writeFlag(CPUSET_CPU_EXCLUSIVE, flag);
+    }
+
+    public boolean isCpuExclusive() throws IOException {
+        return readFlag(CPUSET_CPU_EXCLUSIVE);
+    }
+
+    public void setMemExclusive(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEM_EXCLUSIVE, flag);
+    }
+
+    public boolean isMemExclusive() throws IOException {
+        return readFlag(CPUSET_MEM_EXCLUSIVE);
+    }
+
+    public void setMemHardwall(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEM_HARDWALL, flag);
+    }
+
+    public boolean isMemHardwall() throws IOException {
+        return readFlag(CPUSET_MEM_HARDWALL);
+    }
+
+    /**
+     * Reads {@code cpuset.memory_pressure}.
+     *
+     * @return the integer on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public int getMemPressure() throws IOException {
+        return Integer.parseInt(readFirstLine(CPUSET_MEMORY_PRESSURE));
+    }
+
+    public void setMemPressureEnabled(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_PRESSURE_ENABLED, flag);
+    }
+
+    public boolean isMemPressureEnabled() throws IOException {
+        return readFlag(CPUSET_MEMORY_PRESSURE_ENABLED);
+    }
+
+    public void setMemSpreadPage(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_SPREAD_PAGE, flag);
+    }
+
+    public boolean isMemSpreadPage() throws IOException {
+        return readFlag(CPUSET_MEMORY_SPREAD_PAGE);
+    }
+
+    public void setMemSpreadSlab(boolean flag) throws IOException {
+        writeFlag(CPUSET_MEMORY_SPREAD_SLAB, flag);
+    }
+
+    public boolean isMemSpreadSlab() throws IOException {
+        return readFlag(CPUSET_MEMORY_SPREAD_SLAB);
+    }
+
+    // The "Blance" spelling of the next two methods is kept so existing callers still compile.
+    public void setSchedLoadBlance(boolean flag) throws IOException {
+        writeFlag(CPUSET_SCHED_LOAD_BALANCE, flag);
+    }
+
+    public boolean isSchedLoadBlance() throws IOException {
+        return readFlag(CPUSET_SCHED_LOAD_BALANCE);
+    }
+
+    /**
+     * Writes {@code cpuset.sched_relax_domain_level}.
+     *
+     * @param value value to write
+     * @throws IOException if the control file cannot be written
+     */
+    public void setSchedRelaxDomainLevel(int value) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value));
+    }
+
+    /**
+     * Reads {@code cpuset.sched_relax_domain_level}.
+     *
+     * @return the integer on the first line of the control file
+     * @throws IOException if the control file cannot be read
+     */
+    public int getSchedRelaxDomainLevel() throws IOException {
+        return Integer.parseInt(readFirstLine(CPUSET_SCHED_RELAX_DOMAIN_LEVEL));
+    }
+
+    /** Returns the first line of the given control file. */
+    private String readFirstLine(String config) throws IOException {
+        return CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, config)).get(0);
+    }
+
+    /** Writes a boolean flag to the given control file as {@code 1} or {@code 0}. */
+    private void writeFlag(String config, boolean flag) throws IOException {
+        CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, config), String.valueOf(flag ? 1 : 0));
+    }
+
+    /** Reads an integer flag from the given control file and reports whether it is positive. */
+    private boolean readFlag(String config) throws IOException {
+        return Integer.parseInt(readFirstLine(config)) > 0;
+    }
+
+    /**
+     * Expands a comma-separated list of numbers and inclusive {@code a-b} ranges, e.g.
+     * {@code "0-2,5"} becomes {@code [0, 1, 2, 5]}.
+     *
+     * <p>Surrounding whitespace and empty items (such as a trailing comma) are ignored. A range
+     * whose end is smaller than its start contributes nothing.
+     *
+     * @param outputStr list text; {@code null} or blank text yields an empty array
+     * @return the listed numbers in order of appearance
+     * @throws NumberFormatException if an item is neither a number nor a {@code number-number} range
+     */
+    public static int[] parseNums(String outputStr) {
+        String spec = (outputStr == null) ? "" : outputStr.trim();
+        LinkedList<Integer> numList = new LinkedList<Integer>();
+        if (!spec.isEmpty()) {
+            for (String token : spec.split(",")) {
+                String item = token.trim();
+                if (item.isEmpty()) {
+                    continue;
+                }
+                int hyphen = item.indexOf('-');
+                if (hyphen < 0) {
+                    numList.add(Integer.parseInt(item));
+                } else {
+                    int first = Integer.parseInt(item.substring(0, hyphen).trim());
+                    int last = Integer.parseInt(item.substring(hyphen + 1).trim());
+                    for (int n = first; n <= last; n++) {
+                        numList.add(n);
+                    }
+                }
+            }
+        }
+
+        int[] nums = new int[numList.size()];
+        int index = 0;
+        for (int num : numList) {
+            nums[index] = num;
+            index++;
+        }
+        return nums;
+    }
+}