You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:37 UTC
[27/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
deleted file mode 100644
index 69ec1ed..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import java.util.Set;
-
-public class Hierarchy {
-
- private final String name;
-
- private final Set<ResourceType> resourceTypes;
-
- private final String type;
-
- private final String dir;
-
- private final CgroupCommon rootCgroups;
-
- public Hierarchy(String name, Set<ResourceType> resourceTypes, String dir) {
- this.name = name;
- this.resourceTypes = resourceTypes;
- this.dir = dir;
- this.rootCgroups = new CgroupCommon(this, dir);
- this.type = CgroupUtils.reAnalyse(resourceTypes);
- }
-
- public Set<ResourceType> getResourceTypes() {
- return resourceTypes;
- }
-
- public String getType() {
- return type;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((dir == null) ? 0 : dir.hashCode());
- result = prime * result + ((name == null) ? 0 : name.hashCode());
- result = prime * result + ((type == null) ? 0 : type.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Hierarchy other = (Hierarchy) obj;
- if (dir == null) {
- if (other.dir != null)
- return false;
- } else if (!dir.equals(other.dir))
- return false;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- if (type == null) {
- if (other.type != null)
- return false;
- } else if (!type.equals(other.type))
- return false;
- return true;
- }
-
- public String getDir() {
- return dir;
- }
-
- public CgroupCommon getRootCgroups() {
- return rootCgroups;
- }
-
- public String getName() {
- return name;
- }
-
- public boolean subSystemMounted(ResourceType subsystem) {
- for (ResourceType type : this.resourceTypes) {
- if (type == subsystem)
- return true;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
deleted file mode 100644
index c2a1d42..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-public enum ResourceType {
-
- // net_cls,ns is not supposted in ubuntu
- blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
-
- public static ResourceType getResourceType(String str) {
- if (str.equals("cpu"))
- return cpu;
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
deleted file mode 100644
index 23e630c..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup.core;
-
-import io.gearpump.cluster.cgroup.ResourceType;
-
-public interface CgroupCore {
-
- public ResourceType getType();
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
deleted file mode 100644
index 3402d5a..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup.core;
-
-import io.gearpump.cluster.cgroup.CgroupUtils;
-import io.gearpump.cluster.cgroup.Constants;
-import io.gearpump.cluster.cgroup.ResourceType;
-
-import java.io.IOException;
-
-public class CpuCore implements CgroupCore {
-
- public static final String CPU_SHARES = "/cpu.shares";
- public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
- public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
-
- private final String dir;
-
- public CpuCore(String dir) {
- this.dir = dir;
- }
-
- @Override
- public ResourceType getType() {
- // TODO Auto-generated method stub
- return ResourceType.cpu;
- }
-
- public void setCpuShares(int weight) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight));
- }
-
- public int getCpuShares() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0));
- }
-
- public void setCpuCfsPeriodUs(long us) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us));
- }
-
- public void setCpuCfsQuotaUs(long us) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
deleted file mode 100644
index 0772133..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.utils;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class SystemOperation {
-
- public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
-
- public static void mount(String name, String target, String type, String data) throws IOException {
- StringBuilder sb = new StringBuilder();
- sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target);
- SystemOperation.exec(sb.toString());
- }
-
- public static void umount(String name) throws IOException {
- StringBuilder sb = new StringBuilder();
- sb.append("umount ").append(name);
- SystemOperation.exec(sb.toString());
- }
-
- public static String exec(String cmd) throws IOException {
- LOG.debug("Shell cmd: " + cmd);
- Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start();
- try {
- process.waitFor();
- String output = IOUtils.toString(process.getInputStream());
- String errorOutput = IOUtils.toString(process.getErrorStream());
- LOG.debug("Shell Output: " + output);
- if (errorOutput.length() != 0) {
- LOG.error("Shell Error Output: " + errorOutput);
- throw new IOException(errorOutput);
- }
- return output;
- } catch (InterruptedException ie) {
- throw new IOException(ie.toString());
- }
- }
-
- public static void main(String[] args) throws IOException {
- SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java
new file mode 100644
index 0000000..a616593
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+public class CGroupResource { // mutable record of one subsystem entry, built by CgroupCenter.getCGroupResources()
+
+ private ResourceType type; // subsystem this entry describes
+
+ private int hierarchyID; // parsed from the second tab-separated column of the status line
+
+ private int cgroupsNum; // parsed from the third tab-separated column of the status line
+
+ private boolean enable; // true when the fourth column of the status line equals 1
+
+ public CGroupResource(ResourceType type, int hierarchyID, int cgroupNum, boolean enable) { // cgroupNum is stored in cgroupsNum
+ this.type = type;
+ this.hierarchyID = hierarchyID;
+ this.cgroupsNum = cgroupNum;
+ this.enable = enable;
+ }
+
+ public ResourceType getType() {
+ return type;
+ }
+
+ public void setType(ResourceType type) {
+ this.type = type;
+ }
+
+ public int getHierarchyID() {
+ return hierarchyID;
+ }
+
+ public void setHierarchyID(int hierarchyID) {
+ this.hierarchyID = hierarchyID;
+ }
+
+ public int getCgroupsNum() {
+ return cgroupsNum;
+ }
+
+ public void setCgroupsNum(int cgroupsNum) {
+ this.cgroupsNum = cgroupsNum;
+ }
+
+ public boolean isEnable() {
+ return enable;
+ }
+
+ public void setEnable(boolean enable) {
+ this.enable = enable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java
new file mode 100644
index 0000000..fb2ba65
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.utils.SystemOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class CgroupCenter implements CgroupOperation {
+
+ public static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class);
+
+ private static CgroupCenter instance;
+
+ private CgroupCenter() {
+
+ }
+
+ /**
+ * Lazily creates the singleton; the synchronized modifier serializes concurrent first calls.
+ *
+ * @return the shared instance, or null when CgroupUtils.enabled() returns false
+ */
+ public synchronized static CgroupCenter getInstance() {
+ if (instance == null)
+ instance = new CgroupCenter();
+ return CgroupUtils.enabled() ? instance : null;
+ }
+
+ @Override
+ public List<Hierarchy> getHierarchies() {
+ // Builds one Hierarchy per "cgroup" line of Constants.MOUNT_STATUS_FILE, keyed by the line's fourth field; returns null (after logging) if anything fails.
+ Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
+ FileReader reader = null;
+ BufferedReader br = null;
+ try {
+ reader = new FileReader(Constants.MOUNT_STATUS_FILE);
+ br = new BufferedReader(reader);
+ String str = null;
+ while ((str = br.readLine()) != null) {
+ String[] strSplit = str.split(" ");
+ if (!strSplit[2].equals("cgroup")) // third space-separated field must be "cgroup"
+ continue;
+ String name = strSplit[0];
+ String type = strSplit[3];
+ String dir = strSplit[1];
+ Hierarchy h = hierarchies.get(type); // NOTE(review): this lookup is overwritten on the next line, so a later line with the same key replaces the earlier entry
+ h = new Hierarchy(name, CgroupUtils.analyse(type), dir);
+ hierarchies.put(type, h);
+ }
+ return new ArrayList<Hierarchy>(hierarchies.values());
+ } catch (Exception e) {
+ LOG.error("Get hierarchies error", e);
+ } finally {
+ CgroupUtils.close(reader, br);
+ }
+ return null;
+ }
+
+ @Override
+ public Set<CGroupResource> getCGroupResources() {
+ // Parses the tab-separated lines of Constants.CGROUP_STATUS_FILE into CGroupResource entries; returns null (after logging) if reading or parsing fails.
+ Set<CGroupResource> resources = new HashSet<CGroupResource>();
+ FileReader reader = null;
+ BufferedReader br = null;
+ try {
+ reader = new FileReader(Constants.CGROUP_STATUS_FILE);
+ br = new BufferedReader(reader);
+ String str = null;
+ while ((str = br.readLine()) != null) {
+ String[] split = str.split("\t");
+ ResourceType type = ResourceType.getResourceType(split[0]);
+ if (type == null)
+ continue; // getResourceType returned null for this first column, so the line is skipped
+ resources.add(new CGroupResource(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? true
+ : false));
+ }
+ return resources;
+ } catch (Exception e) {
+ LOG.error("Get subSystems error ", e);
+ } finally {
+ CgroupUtils.close(reader, br);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean enabled(ResourceType resourceType) {
+ // True when getCGroupResources() lists resourceType. NOTE(review): CGroupResource.isEnable() is not consulted, and a null result from getCGroupResources() would throw here - confirm intent.
+ Set<CGroupResource> resources = this.getCGroupResources();
+ for (CGroupResource resource : resources) {
+ if (resource.getType() == resourceType)
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Hierarchy busy(ResourceType resourceType) { // returns the first hierarchy from getHierarchies() that includes resourceType, or null when none does
+ List<Hierarchy> hierarchies = this.getHierarchies(); // NOTE(review): may be null when the mount status file could not be read
+ for (Hierarchy hierarchy : hierarchies) {
+ for (ResourceType type : hierarchy.getResourceTypes()) {
+ if (type == resourceType)
+ return hierarchy;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Hierarchy mounted(Hierarchy hierarchy) {
+ // Returns the entry of getHierarchies() that equals hierarchy, provided the hierarchy's directory exists; otherwise null.
+ List<Hierarchy> hierarchies = this.getHierarchies();
+ if (CgroupUtils.dirExists(hierarchy.getDir())) {
+ for (Hierarchy h : hierarchies) {
+ if (h.equals(hierarchy))
+ return h;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void mount(Hierarchy hierarchy) throws IOException {
+ // Mounts the non-busy subsystems of hierarchy at its directory (created if missing); does nothing when it is already mounted or every subsystem is busy.
+ if (this.mounted(hierarchy) != null) {
+ LOG.error(hierarchy.getDir() + " is mounted");
+ return;
+ }
+ Set<ResourceType> resourceTypes = hierarchy.getResourceTypes();
+ for (ResourceType type : new ArrayList<ResourceType>(resourceTypes)) { // iterate a snapshot: removing from the set being iterated can throw ConcurrentModificationException
+ if (this.busy(type) != null) {
+ LOG.error("subsystem: " + type.name() + " is busy");
+ resourceTypes.remove(type);
+ }
+ }
+ if (resourceTypes.size() == 0)
+ return;
+ if (!CgroupUtils.dirExists(hierarchy.getDir()))
+ new File(hierarchy.getDir()).mkdirs();
+ String subSystems = CgroupUtils.reAnalyse(resourceTypes);
+ SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems);
+ }
+
+ @Override
+ public void umount(Hierarchy hierarchy) throws IOException {
+ // When hierarchy is mounted: calls delete() on its root cgroup, unmounts its directory, then deletes that directory. Does nothing otherwise.
+ if (this.mounted(hierarchy) != null) {
+ hierarchy.getRootCgroups().delete();
+ SystemOperation.umount(hierarchy.getDir());
+ CgroupUtils.deleteDir(hierarchy.getDir());
+ }
+ }
+
+ @Override
+ public void create(CgroupCommon cgroup) throws SecurityException {
+ // Creates the directory of a non-root cgroup; refuses (with an error log) when an ancestor directory is missing, the hierarchy is not mounted, or the directory already exists.
+ if (cgroup.isRoot()) {
+ LOG.error("You can't create rootCgroup in this function");
+ return;
+ }
+ CgroupCommon parent = cgroup.getParent();
+ while (parent != null) {
+ if (!CgroupUtils.dirExists(parent.getDir())) {
+ LOG.error(parent.getDir() + " is not existed");
+ return;
+ }
+ parent = parent.getParent();
+ }
+ Hierarchy h = cgroup.getHierarchy();
+ if (mounted(h) == null) {
+ LOG.error(h.getDir() + " is not mounted");
+ return;
+ }
+ if (CgroupUtils.dirExists(cgroup.getDir())) {
+ LOG.error(cgroup.getDir() + " is existed");
+ return;
+ }
+ if (!(new File(cgroup.getDir())).mkdir()) LOG.error(cgroup.getDir() + " could not be created"); // mkdir() signals failure only through its return value
+ }
+
+ @Override
+ public void delete(CgroupCommon cgroup) throws IOException {
+ // Delegates to CgroupCommon.delete(), which frees the cgroup and, for a non-root cgroup, removes it from its parent's children.
+ cgroup.delete();
+ }
+
+ public static void main(String args[]) { // ad-hoc check: prints how many children the first hierarchy's root cgroup has
+ System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size()); // NOTE(review): fails if getInstance() or getHierarchies() returns null, or if the list is empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java
new file mode 100644
index 0000000..2bccfec
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.cgroup.core.CgroupCore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupCommon implements CgroupCommonOperation {
+
+ public static final String TASKS = "/tasks";
+ public static final String NOTIFY_ON_RELEASE = "/notify_on_release";
+ public static final String RELEASE_AGENT = "/release_agent";
+ public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children";
+ public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control";
+ public static final String CGROUP_PROCS = "/cgroup.procs";
+
+ private final Hierarchy hierarchy;
+
+ private final String name;
+
+ private final String dir;
+
+ private final CgroupCommon parent;
+
+ private final Map<ResourceType, CgroupCore> cores;
+
+ private final boolean isRoot;
+
+ private final Set<CgroupCommon> children = new HashSet<CgroupCommon>();
+
+ public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) { // child cgroup: name and dir are the parent's values plus "/" + name
+ this.name = parent.getName() + "/" + name;
+ this.hierarchy = hierarchy;
+ this.parent = parent;
+ this.dir = parent.getDir() + "/" + name;
+ this.init(); // registers existing sub-directories of dir as children; runs before cores is assigned
+ cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir);
+ this.isRoot = false;
+ }
+
+ /**
+ * Creates the root cgroup of a hierarchy: empty name, no parent, located at dir.
+ */
+ public CgroupCommon(Hierarchy hierarchy, String dir) {
+ this.name = "";
+ this.hierarchy = hierarchy;
+ this.parent = null;
+ this.dir = dir;
+ this.init(); // registers existing sub-directories of dir as children; runs before cores is assigned
+ cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir);
+ this.isRoot = true;
+ }
+
+ @Override
+ public void addTask(int taskId) throws IOException {
+ // Writes taskId, via CgroupUtils.writeFileByLine, to this cgroup's TASKS ("/tasks") file.
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId));
+ }
+
+ @Override
+ public Set<Integer> getTasks() throws IOException { // parses every entry read from the TASKS file into an Integer
+ List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
+ Set<Integer> tasks = new HashSet<Integer>();
+ for (String task : stringTasks) {
+ tasks.add(Integer.valueOf(task)); // throws NumberFormatException on a non-numeric entry
+ }
+ return tasks;
+ }
+
+ @Override
+ public void addProcs(int pid) throws IOException {
+ // Writes pid, via CgroupUtils.writeFileByLine, to this cgroup's CGROUP_PROCS ("/cgroup.procs") file.
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
+ }
+
+ @Override
+ public Set<Integer> getPids() throws IOException {
+ // Parses every entry read from the CGROUP_PROCS ("/cgroup.procs") file into an Integer.
+ List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
+ Set<Integer> pids = new HashSet<Integer>();
+ for (String task : stringPids) {
+ pids.add(Integer.valueOf(task));
+ }
+ return pids;
+ }
+
+ @Override
+ public void setNotifyOnRelease(boolean flag) throws IOException {
+ // Writes "1" when flag is true and "0" otherwise to the NOTIFY_ON_RELEASE ("/notify_on_release") file.
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
+ }
+
+ @Override
+ public boolean getNotifyOnRelease() throws IOException { // true only when the first entry read from NOTIFY_ON_RELEASE equals "1"
+ return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
+ }
+
+ @Override
+ public void setReleaseAgent(String command) throws IOException {
+ // Only the root cgroup writes RELEASE_AGENT ("/release_agent"); calls on non-root cgroups return without doing anything.
+ if (!this.isRoot)
+ return;
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command);
+ }
+
+ @Override
+ public String getReleaseAgent() throws IOException { // first entry of RELEASE_AGENT for the root cgroup
+ if (!this.isRoot)
+ return null; // non-root cgroups report no release agent
+ return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
+ }
+
+ @Override
+ public void setCgroupCloneChildren(boolean flag) throws IOException {
+ // Writes "1"/"0" to CGROUP_CLONE_CHILDREN only when cores has a cpuset entry. NOTE(review): the CgroupCoreFactory in this patch registers only cpu, so this looks like a permanent no-op - confirm.
+ if (!this.cores.keySet().contains(ResourceType.cpuset))
+ return;
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+ }
+
+ @Override
+ public boolean getCgroupCloneChildren() throws IOException { // true only when the first entry read from CGROUP_CLONE_CHILDREN equals "1"; unlike the setter there is no cpuset guard
+ return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
+ }
+
+ @Override
+ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
+ // Writes "eventFd controlFd [args...]", joined by single spaces, to the CGROUP_EVENT_CONTROL ("/cgroup.event_control") file.
+ StringBuilder sb = new StringBuilder();
+ sb.append(eventFd);
+ sb.append(' ');
+ sb.append(controlFd);
+ for (String arg : args) {
+ sb.append(' ');
+ sb.append(arg);
+ }
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
+ }
+
+ public Hierarchy getHierarchy() { // hierarchy passed to the constructor
+ return hierarchy;
+ }
+
+ public String getName() { // "" for the root, otherwise the parent's name + "/" + own name
+ return name;
+ }
+
+ public String getDir() { // directory backing this cgroup
+ return dir;
+ }
+
+ public CgroupCommon getParent() { // null for the root cgroup
+ return parent;
+ }
+
+ public Set<CgroupCommon> getChildren() { // returns the live internal set; delete() relies on this to detach a child
+ return children;
+ }
+
+ public boolean isRoot() {
+ return isRoot;
+ }
+
+ public Map<ResourceType, CgroupCore> getCores() { // map produced by CgroupCoreFactory.getInstance for this directory
+ return cores;
+ }
+
+ public void delete() throws IOException { // frees this cgroup and its descendants, then detaches it from its parent
+ this.free();
+ if (!this.isRoot)
+ this.parent.getChildren().remove(this); // the root has no parent to detach from
+ }
+
+ private void free() throws IOException { // depth-first: children are freed before this cgroup
+ for (CgroupCommon child : this.children)
+ child.free();
+ if (this.isRoot)
+ return; // the root keeps its tasks and its directory
+ Set<Integer> tasks = this.getTasks();
+ if (tasks != null) {
+ for (Integer task : tasks) {
+ this.parent.addTask(task); // hand each task to the parent before the directory is deleted
+ }
+ }
+ CgroupUtils.deleteDir(this.dir);
+ }
+
+ private void init() { // called from both constructors; recursively wraps each sub-directory of dir in a child CgroupCommon
+ File file = new File(this.dir);
+ File[] files = file.listFiles();
+ if (files == null)
+ return; // listFiles() returns null when dir is not a directory or cannot be read
+ for (File child : files) {
+ if (child.isDirectory()) {
+ this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java
new file mode 100644
index 0000000..9ae923b
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import java.io.IOException;
+import java.util.Set;
+
+public interface CgroupCommonOperation { // per-cgroup control-file operations; implemented by CgroupCommon
+
+ public void addTask(int taskid) throws IOException; // CgroupCommon writes the id to its "/tasks" file
+
+ public Set<Integer> getTasks() throws IOException; // CgroupCommon parses the "/tasks" file entries as integers
+
+ public void addProcs(int pid) throws IOException; // CgroupCommon writes the pid to its "/cgroup.procs" file
+
+ public Set<Integer> getPids() throws IOException; // CgroupCommon parses the "/cgroup.procs" file entries as integers
+
+ public void setNotifyOnRelease(boolean flag) throws IOException; // CgroupCommon writes "1" or "0" to "/notify_on_release"
+
+ public boolean getNotifyOnRelease() throws IOException; // CgroupCommon returns true when the file's first entry is "1"
+
+ public void setReleaseAgent(String command) throws IOException; // CgroupCommon applies this to the root cgroup only
+
+ public String getReleaseAgent() throws IOException; // CgroupCommon returns null for non-root cgroups
+
+ public void setCgroupCloneChildren(boolean flag) throws IOException; // CgroupCommon writes only when it has a cpuset core
+
+ public boolean getCgroupCloneChildren() throws IOException; // CgroupCommon returns true when the file's first entry is "1"
+
+ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException; // CgroupCommon writes the arguments space-separated to "/cgroup.event_control"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java
new file mode 100644
index 0000000..15ebcbc
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.cgroup.core.CgroupCore;
+import org.apache.gearpump.cluster.cgroup.core.CpuCore;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Creates the {@link CgroupCore} objects that back a set of resource types.
+ */
+public class CgroupCoreFactory {
+
+  /**
+   * Builds one core per supported resource type found in {@code types}. Only
+   * {@link ResourceType#cpu} has an implementation ({@link CpuCore}); every other type is
+   * skipped, so the returned map may be smaller than {@code types}.
+   *
+   * @param types resource types to create cores for
+   * @param dir directory handed to each created core
+   * @return a new mutable map from resource type to its core
+   */
+  public static Map<ResourceType, CgroupCore> getInstance(Set<ResourceType> types, String dir) {
+    Map<ResourceType, CgroupCore> cores = new HashMap<ResourceType, CgroupCore>();
+    for (ResourceType requested : types) {
+      switch (requested) {
+        case cpu:
+          cores.put(requested, new CpuCore(dir));
+          break;
+        default:
+          // no core implementation exists for this resource type
+          break;
+      }
+    }
+    return cores;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java
new file mode 100644
index 0000000..399f3bd
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * System-wide cgroup operations: inspecting hierarchies and subsystems, mounting and
+ * unmounting hierarchies, and creating and deleting cgroups.
+ *
+ * <p>NOTE(review): the method descriptions below are inferred from the method names only;
+ * confirm them against the implementing class.
+ */
+public interface CgroupOperation {
+
+  /** Presumably lists the hierarchies currently known to the system. */
+  List<Hierarchy> getHierarchies();
+
+  /** Presumably lists the cgroup resources (subsystems) reported by the system. */
+  Set<CGroupResource> getCGroupResources();
+
+  /** Presumably tells whether {@code subsystem} is enabled on this machine. */
+  boolean enabled(ResourceType subsystem);
+
+  /**
+   * Presumably returns the hierarchy that already has {@code subsystem} attached.
+   * NOTE(review): the value returned when there is none is not visible here.
+   */
+  Hierarchy busy(ResourceType subsystem);
+
+  /**
+   * Presumably returns the mounted counterpart of {@code hierarchy}.
+   * NOTE(review): the value returned when it is not mounted is not visible here.
+   */
+  Hierarchy mounted(Hierarchy hierarchy);
+
+  /** Presumably mounts {@code hierarchy}. */
+  void mount(Hierarchy hierarchy) throws IOException;
+
+  /** Presumably unmounts {@code hierarchy}. */
+  void umount(Hierarchy hierarchy) throws IOException;
+
+  /** Presumably creates {@code cgroup}. */
+  void create(CgroupCommon cgroup) throws SecurityException;
+
+  /** Presumably deletes {@code cgroup}. */
+  void delete(CgroupCommon cgroup) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java
new file mode 100644
index 0000000..8f199d9
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.utils.SystemOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class CgroupUtils {
+
+ public static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class);
+
+ public static void deleteDir(String dir) {
+ try {
+ String cmd = "rmdir " + dir;
+ SystemOperation.exec(cmd);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ LOG.error("rm " + dir + " fail!", e);
+ }
+ }
+
+ public static boolean fileExists(String dir) {
+ File file = new File(dir);
+ return file.exists();
+ }
+
+ public static boolean dirExists(String dir) {
+ File file = new File(dir);
+ return file.isDirectory();
+ }
+
+ public static Set<ResourceType> analyse(String str) {
+ Set<ResourceType> result = new HashSet<ResourceType>();
+ String[] subSystems = str.split(",");
+ for (String subSystem : subSystems) {
+ ResourceType type = ResourceType.getResourceType(subSystem);
+ if (type != null)
+ result.add(type);
+ }
+ return result;
+ }
+
+ public static String reAnalyse(Set<ResourceType> subSystems) {
+ StringBuilder sb = new StringBuilder();
+ if (subSystems.size() == 0)
+ return sb.toString();
+ for (ResourceType type : subSystems) {
+ sb.append(type.name()).append(",");
+ }
+ return sb.toString().substring(0, sb.length() - 1);
+ }
+
+ public static boolean enabled() {
+ return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
+ }
+
+ public static List<String> readFileByLine(String fileDir) throws IOException {
+ List<String> result = new ArrayList<String>();
+ FileReader fileReader = null;
+ BufferedReader reader = null;
+ try {
+ File file = new File(fileDir);
+ fileReader = new FileReader(file);
+ reader = new BufferedReader(fileReader);
+ String tempString = null;
+ while ((tempString = reader.readLine()) != null) {
+ result.add(tempString);
+ }
+ } finally {
+ CgroupUtils.close(fileReader, reader);
+ }
+ return result;
+ }
+
+ public static void writeFileByLine(String fileDir, List<String> strings) throws IOException {
+ FileWriter writer = null;
+ BufferedWriter bw = null;
+ try {
+ File file = new File(fileDir);
+ if (!file.exists()) {
+ LOG.error(fileDir + " is no existed");
+ return;
+ }
+ writer = new FileWriter(file, true);
+ bw = new BufferedWriter(writer);
+ for (String string : strings) {
+ bw.write(string);
+ bw.newLine();
+ bw.flush();
+ }
+ } finally {
+ CgroupUtils.close(writer, bw);
+ }
+ }
+
+ public static void writeFileByLine(String fileDir, String string) throws IOException {
+ FileWriter writer = null;
+ BufferedWriter bw = null;
+ try {
+ File file = new File(fileDir);
+ if (!file.exists()) {
+ LOG.error(fileDir + " is no existed");
+ return;
+ }
+ writer = new FileWriter(file, true);
+ bw = new BufferedWriter(writer);
+ bw.write(string);
+ bw.newLine();
+ bw.flush();
+ } finally {
+ CgroupUtils.close(writer, bw);
+ }
+ }
+
+ public static void close(FileReader reader, BufferedReader br) {
+ try {
+ if (reader != null)
+ reader.close();
+ if (br != null)
+ br.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+
+ }
+ }
+
+ public static void close(FileWriter writer, BufferedWriter bw) {
+ try {
+ if (writer != null)
+ writer.close();
+ if (bw != null)
+ bw.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java
new file mode 100644
index 0000000..fb905c7
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+/**
+ * Well-known paths used by the cgroup module plus a small path helper.
+ */
+public class Constants {
+
+  /** Path of the proc file listing cgroup subsystems. */
+  public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
+
+  /** Path of the proc file listing mounts. */
+  public static final String MOUNT_STATUS_FILE = "/proc/mounts";
+
+  /**
+   * Concatenates {@code dir} and {@code constant} as-is; no separator is inserted, so
+   * {@code constant} is expected to carry its own leading slash.
+   *
+   * @return {@code dir} immediately followed by {@code constant}
+   */
+  public static String getDir(String dir, String constant) {
+    final String path = dir + constant;
+    return path;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java
new file mode 100644
index 0000000..446802f
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import java.util.Set;
+
+/**
+ * Describes one cgroup hierarchy: its name, the resource types attached to it, its directory
+ * and its root cgroup. Equality and hashing are based on {@code dir}, {@code name} and
+ * {@code type}.
+ */
+public class Hierarchy {
+
+  private final String name;
+
+  private final Set<ResourceType> resourceTypes;
+
+  /** Comma separated form of {@link #resourceTypes}, as produced by CgroupUtils.reAnalyse. */
+  private final String type;
+
+  private final String dir;
+
+  private final CgroupCommon rootCgroups;
+
+  /**
+   * @param name hierarchy name
+   * @param resourceTypes resource types attached to the hierarchy; the set is stored as given,
+   *     not copied
+   * @param dir hierarchy directory, also used for the root cgroup
+   */
+  public Hierarchy(String name, Set<ResourceType> resourceTypes, String dir) {
+    this.name = name;
+    this.resourceTypes = resourceTypes;
+    this.dir = dir;
+    this.type = CgroupUtils.reAnalyse(resourceTypes);
+    // Keep this last: it hands out "this", so every other field must already be assigned.
+    this.rootCgroups = new CgroupCommon(this, dir);
+  }
+
+  /** Returns the set passed to the constructor (not a copy). */
+  public Set<ResourceType> getResourceTypes() {
+    return resourceTypes;
+  }
+
+  /** Returns the comma separated names of the attached resource types. */
+  public String getType() {
+    return type;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + hashOf(dir);
+    result = prime * result + hashOf(name);
+    result = prime * result + hashOf(type);
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Hierarchy other = (Hierarchy) obj;
+    return same(dir, other.dir) && same(name, other.name) && same(type, other.type);
+  }
+
+  public String getDir() {
+    return dir;
+  }
+
+  public CgroupCommon getRootCgroups() {
+    return rootCgroups;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  /** Returns whether {@code subsystem} is one of this hierarchy's resource types. */
+  public boolean subSystemMounted(ResourceType subsystem) {
+    for (ResourceType attached : this.resourceTypes) {
+      if (attached == subsystem) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Null-tolerant hash: 0 for {@code null}, otherwise the value's own hash code. */
+  private static int hashOf(String value) {
+    return value == null ? 0 : value.hashCode();
+  }
+
+  /** Null-tolerant equality of two strings. */
+  private static boolean same(String left, String right) {
+    return left == null ? right == null : left.equals(right);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java
new file mode 100644
index 0000000..c3360e6
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+/**
+ * Names of the cgroup resource types known to this module.
+ */
+public enum ResourceType {
+
+  // net_cls,ns is not supported in ubuntu
+  blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
+
+  /**
+   * Maps a name to its constant. Only {@code "cpu"} is recognised; every other input,
+   * including {@code null}, yields {@code null}.
+   *
+   * @param str resource type name, may be {@code null}
+   * @return {@link #cpu} for {@code "cpu"}, otherwise {@code null}
+   */
+  public static ResourceType getResourceType(String str) {
+    // Constant-first comparison: a null name is treated as unknown instead of throwing.
+    if ("cpu".equals(str)) {
+      return cpu;
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java
new file mode 100644
index 0000000..39c0999
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup.core;
+
+import org.apache.gearpump.cluster.cgroup.ResourceType;
+
+/**
+ * Common contract of the per-resource-type cgroup cores.
+ */
+public interface CgroupCore {
+
+  /** Returns the resource type this core is responsible for. */
+  ResourceType getType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java
new file mode 100644
index 0000000..4b35f7f
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup.core;
+
+import org.apache.gearpump.cluster.cgroup.CgroupUtils;
+import org.apache.gearpump.cluster.cgroup.Constants;
+import org.apache.gearpump.cluster.cgroup.ResourceType;
+
+import java.io.IOException;
+
+public class CpuCore implements CgroupCore {
+
+ public static final String CPU_SHARES = "/cpu.shares";
+ public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
+ public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
+
+ private final String dir;
+
+ public CpuCore(String dir) {
+ this.dir = dir;
+ }
+
+ @Override
+ public ResourceType getType() {
+ // TODO Auto-generated method stub
+ return ResourceType.cpu;
+ }
+
+ public void setCpuShares(int weight) throws IOException {
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight));
+ }
+
+ public int getCpuShares() throws IOException {
+ return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0));
+ }
+
+ public void setCpuCfsPeriodUs(long us) throws IOException {
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us));
+ }
+
+ public void setCpuCfsQuotaUs(long us) throws IOException {
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
new file mode 100644
index 0000000..5b2a890
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Thin wrappers that run shell commands (mount, umount, arbitrary commands) through
+ * {@code /bin/bash -c}.
+ */
+public class SystemOperation {
+
+  public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
+
+  /**
+   * Runs {@code mount -t <type> -o <data> <name> <target>}.
+   *
+   * <p>NOTE(review): the arguments are concatenated into the shell command unquoted.
+   *
+   * @throws IOException if the command cannot be run or writes to its error stream
+   */
+  public static void mount(String name, String target, String type, String data) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target);
+    SystemOperation.exec(sb.toString());
+  }
+
+  /**
+   * Runs {@code umount <name>}.
+   *
+   * <p>NOTE(review): {@code name} is concatenated into the shell command unquoted.
+   *
+   * @throws IOException if the command cannot be run or writes to its error stream
+   */
+  public static void umount(String name) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("umount ").append(name);
+    SystemOperation.exec(sb.toString());
+  }
+
+  /**
+   * Runs {@code cmd} with {@code /bin/bash -c} and waits for it to finish.
+   *
+   * <p>The command counts as failed when it writes anything to its error stream; the exit code
+   * is not inspected. Both output streams are drained while the process runs, so a command
+   * producing a lot of output cannot block on a full pipe.
+   *
+   * @param cmd shell command line
+   * @return everything the command wrote to standard output
+   * @throws IOException if the process cannot be started, its output cannot be read, it wrote
+   *     to its error stream (the message is that output), or the calling thread was interrupted
+   *     while waiting (the interrupt status is restored and the process is destroyed)
+   */
+  public static String exec(String cmd) throws IOException {
+    LOG.debug("Shell cmd: " + cmd);
+    Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start();
+    // stderr is drained on a helper thread while this thread drains stdout
+    StreamCollector stderr = new StreamCollector(process.getErrorStream());
+    stderr.start();
+    try {
+      String output = IOUtils.toString(process.getInputStream());
+      process.waitFor();
+      stderr.join();
+      String errorOutput = stderr.result();
+      LOG.debug("Shell Output: " + output);
+      if (errorOutput.length() != 0) {
+        LOG.error("Shell Error Output: " + errorOutput);
+        throw new IOException(errorOutput);
+      }
+      return output;
+    } catch (InterruptedException ie) {
+      process.destroy();
+      // preserve the interrupt for callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new IOException(ie.toString(), ie);
+    }
+  }
+
+  /**
+   * Manual smoke test: tries to mount a cpu cgroup hierarchy named "test" at /cgroup/cpu.
+   */
+  public static void main(String[] args) throws IOException {
+    SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu");
+  }
+
+  /** Reads a stream to its end on a separate thread and keeps the text or the failure. */
+  private static final class StreamCollector extends Thread {
+
+    private final java.io.InputStream source;
+    private String text = "";
+    private IOException failure;
+
+    StreamCollector(java.io.InputStream source) {
+      this.source = source;
+      // never keep the JVM alive just because a child process is still talking
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        text = IOUtils.toString(source);
+      } catch (IOException e) {
+        failure = e;
+      }
+    }
+
+    /** Returns the collected text; call only after {@code join()} has returned. */
+    String result() throws IOException {
+      if (failure != null) {
+        throw failure;
+      }
+      return text;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
deleted file mode 100644
index ae7fb42..0000000
--- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.worker
-
-import com.typesafe.config.Config
-import org.apache.commons.lang.SystemUtils
-import org.slf4j.{Logger, LoggerFactory}
-
-import io.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore}
-import io.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType}
-import io.gearpump.cluster.worker.CGroupManager._
-
-class CGroupManager(config: Config) {
- private val center = CgroupCenter.getInstance()
- private val rootDir = CGroupManager.getCgroupRootDir(config)
- private var hierarchy: Hierarchy = null
- private var rootCgroup: CgroupCommon = null
-
- prepareSubSystem()
-
- private def prepareSubSystem(): Unit = {
- if (rootDir == null) {
- throw new RuntimeException(s"Check configuration file. The $CGROUP_ROOT is missing.")
- }
- if (center == null) {
- throw new RuntimeException("Cgroup error, please check /proc/cgroups")
- }
- hierarchy = center.busy(ResourceType.cpu)
- if (hierarchy == null) {
- val types = new java.util.HashSet[ResourceType]
- types.add(ResourceType.cpu)
- hierarchy = new Hierarchy(GEARPUMP_HIERARCHY_NAME, types, GEARPUMP_CPU_HIERARCHY_DIR)
- }
- rootCgroup = new CgroupCommon(rootDir, hierarchy, hierarchy.getRootCgroups)
- }
-
- private def validateCpuUpperLimitValue(value: Int): Int = {
- if (value > 10) {
- 10
- } else if (value < 1 && value != -1) {
- 1
- } else {
- value
- }
- }
-
- private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = {
- val _cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit)
- if (_cpuCoreUpperLimit == -1) {
- cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit)
- }
- else {
- cpuCore.setCpuCfsPeriodUs(100000)
- cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit * 100000)
- }
- }
-
- def startNewExecutor(config: Config, cpuNum: Int, appId: Int, executorId: Int): List[String] = {
- val groupName = getGroupName(appId, executorId)
- val workerGroup: CgroupCommon = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
- this.center.create(workerGroup)
- val cpu: CgroupCore = workerGroup.getCores.get(ResourceType.cpu)
- val cpuCore: CpuCore = cpu.asInstanceOf[CpuCore]
- cpuCore.setCpuShares(cpuNum * CGroupManager.ONE_CPU_SLOT)
- setCpuUsageUpperLimit(cpuCore, CGroupManager.getWorkerCpuCoreUpperLimit(config))
-
- val sb: StringBuilder = new StringBuilder
- sb.append("cgexec -g cpu:").append(workerGroup.getName).toString().split(" ").toList
- }
-
- def shutDownExecutor(appId: Int, executorId: Int): Unit = {
- val groupName = getGroupName(appId, executorId)
- val workerGroup = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
- center.delete(workerGroup)
- }
-
- def close(): Unit = {
- center.delete(rootCgroup)
- }
-
- private def getGroupName(appId: Int, executorId: Int): String = {
- "app" + appId + "executor" + executorId
- }
-}
-
-object CGroupManager {
- private val LOG: Logger = LoggerFactory.getLogger(getClass)
- private val CGROUP_ROOT = "gearpump.cgroup.root"
- private val Executor_CPU_CORE_UPPER_LIMIT = "gearpump.cgroup.cpu-core-limit-per-executor"
- private val GEARPUMP_HIERARCHY_NAME = "gearpump_cpu"
- private val GEARPUMP_CPU_HIERARCHY_DIR = "/cgroup/cpu"
- private val ONE_CPU_SLOT = 1024
-
- def getCgroupRootDir(config: Config): String = {
- config.getString(CGROUP_ROOT)
- }
-
- def getWorkerCpuCoreUpperLimit(config: Config): Int = {
- config.getInt(Executor_CPU_CORE_UPPER_LIMIT)
- }
-
- def getInstance(config: Config): Option[CGroupManager] = {
- if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC_OSX) {
- LOG.error(s"CGroup is not supported on Windows OS, Mac OS X")
- None
- } else {
- Some(new CGroupManager(config))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
deleted file mode 100644
index eb57a18..0000000
--- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.worker
-
-import java.io.File
-import scala.sys.process.Process
-
-import com.typesafe.config.Config
-import org.slf4j.{Logger, LoggerFactory}
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.{ProcessLogRedirector, RichProcess}
-
-/**
- * CGroupProcessLauncher is used to launch a process for Executor with CGroup.
- * For more details, please refer http://gearpump.io
- */
-class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
- private val APP_MASTER = -1
- private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config)
- private val LOG: Logger = LoggerFactory.getLogger(getClass)
-
- override def cleanProcess(appId: Int, executorId: Int): Unit = {
- if (executorId != APP_MASTER) {
- cgroupManager.foreach(_.shutDownExecutor(appId, executorId))
- }
- }
-
- override def createProcess(
- appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String],
- classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
- val cgroupCommand = if (executorId != APP_MASTER) {
- cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,
- executorId)).getOrElse(List.empty)
- } else List.empty
- LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " +
- s"classpath: ${classPath.mkString(File.pathSeparator)}")
-
- val java = System.getProperty("java.home") + "/bin/java"
- val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath
- .mkString(File.pathSeparator), mainClass) ++ arguments
- LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " +
- s"options: ${options.mkString(" ")}")
- val logger = new ProcessLogRedirector()
- val process = Process(command).run(logger)
- new RichProcess(process, logger)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala
new file mode 100644
index 0000000..24c169c
--- /dev/null
+++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import com.typesafe.config.Config
+import org.apache.commons.lang.SystemUtils
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore}
+import org.apache.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType}
+import org.apache.gearpump.cluster.worker.CGroupManager._
+
+class CGroupManager(config: Config) {
+ private val center = CgroupCenter.getInstance()
+ private val rootDir = CGroupManager.getCgroupRootDir(config)
+ private var hierarchy: Hierarchy = null
+ private var rootCgroup: CgroupCommon = null
+
+ prepareSubSystem()
+
+ private def prepareSubSystem(): Unit = {
+ if (rootDir == null) {
+ throw new RuntimeException(s"Check configuration file. The $CGROUP_ROOT is missing.")
+ }
+ if (center == null) {
+ throw new RuntimeException("Cgroup error, please check /proc/cgroups")
+ }
+ hierarchy = center.busy(ResourceType.cpu)
+ if (hierarchy == null) {
+ val types = new java.util.HashSet[ResourceType]
+ types.add(ResourceType.cpu)
+ hierarchy = new Hierarchy(GEARPUMP_HIERARCHY_NAME, types, GEARPUMP_CPU_HIERARCHY_DIR)
+ }
+ rootCgroup = new CgroupCommon(rootDir, hierarchy, hierarchy.getRootCgroups)
+ }
+
+ private def validateCpuUpperLimitValue(value: Int): Int = {
+ if (value > 10) {
+ 10
+ } else if (value < 1 && value != -1) {
+ 1
+ } else {
+ value
+ }
+ }
+
+ private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = {
+ val _cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit)
+ if (_cpuCoreUpperLimit == -1) {
+ cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit)
+ }
+ else {
+ cpuCore.setCpuCfsPeriodUs(100000)
+ cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit * 100000)
+ }
+ }
+
+ def startNewExecutor(config: Config, cpuNum: Int, appId: Int, executorId: Int): List[String] = {
+ val groupName = getGroupName(appId, executorId)
+ val workerGroup: CgroupCommon = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
+ this.center.create(workerGroup)
+ val cpu: CgroupCore = workerGroup.getCores.get(ResourceType.cpu)
+ val cpuCore: CpuCore = cpu.asInstanceOf[CpuCore]
+ cpuCore.setCpuShares(cpuNum * CGroupManager.ONE_CPU_SLOT)
+ setCpuUsageUpperLimit(cpuCore, CGroupManager.getWorkerCpuCoreUpperLimit(config))
+
+ val sb: StringBuilder = new StringBuilder
+ sb.append("cgexec -g cpu:").append(workerGroup.getName).toString().split(" ").toList
+ }
+
+ def shutDownExecutor(appId: Int, executorId: Int): Unit = {
+ val groupName = getGroupName(appId, executorId)
+ val workerGroup = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
+ center.delete(workerGroup)
+ }
+
+ def close(): Unit = {
+ center.delete(rootCgroup)
+ }
+
+ private def getGroupName(appId: Int, executorId: Int): String = {
+ "app" + appId + "executor" + executorId
+ }
+}
+
+object CGroupManager {
+ private val LOG: Logger = LoggerFactory.getLogger(getClass)
+ private val CGROUP_ROOT = "gearpump.cgroup.root"
+ private val Executor_CPU_CORE_UPPER_LIMIT = "gearpump.cgroup.cpu-core-limit-per-executor"
+ private val GEARPUMP_HIERARCHY_NAME = "gearpump_cpu"
+ private val GEARPUMP_CPU_HIERARCHY_DIR = "/cgroup/cpu"
+ private val ONE_CPU_SLOT = 1024
+
+ def getCgroupRootDir(config: Config): String = {
+ config.getString(CGROUP_ROOT)
+ }
+
+ def getWorkerCpuCoreUpperLimit(config: Config): Int = {
+ config.getInt(Executor_CPU_CORE_UPPER_LIMIT)
+ }
+
+ def getInstance(config: Config): Option[CGroupManager] = {
+ if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC_OSX) {
+ LOG.error(s"CGroup is not supported on Windows OS, Mac OS X")
+ None
+ } else {
+ Some(new CGroupManager(config))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
new file mode 100644
index 0000000..dc2eabd
--- /dev/null
+++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import scala.sys.process.Process
+
+import com.typesafe.config.Config
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{ProcessLogRedirector, RichProcess}
+
+/**
+ * CGroupProcessLauncher is used to launch a process for Executor with CGroup.
+ * For more details, please refer to http://gearpump.io
+ */
+class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+ private val APP_MASTER = -1
+ private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config)
+ private val LOG: Logger = LoggerFactory.getLogger(getClass)
+
+ override def cleanProcess(appId: Int, executorId: Int): Unit = {
+ if (executorId != APP_MASTER) {
+ cgroupManager.foreach(_.shutDownExecutor(appId, executorId))
+ }
+ }
+
+ override def createProcess(
+ appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String],
+ classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+ val cgroupCommand = if (executorId != APP_MASTER) {
+ cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,
+ executorId)).getOrElse(List.empty)
+ } else List.empty
+ LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " +
+ s"classpath: ${classPath.mkString(File.pathSeparator)}")
+
+ val java = System.getProperty("java.home") + "/bin/java"
+ val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath
+ .mkString(File.pathSeparator), mainClass) ++ arguments
+ LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " +
+ s"options: ${options.mkString(" ")}")
+ val logger = new ProcessLogRedirector()
+ val process = Process(command).run(logger)
+ new RichProcess(process, logger)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
deleted file mode 100644
index 510258d..0000000
--- a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util;
-
-import backtype.storm.utils.TimeCacheMap;
-
-/**
- * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression.
- */
-@SuppressWarnings("deprecation")
-public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> {
-
- public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) {
- super(expirationSecs, new ExpiredCallback<K, V>() {
-
- @Override
- public void expire(K key, V val) {
- callback.expire(key, val);
- }
- });
- }
-
- public static interface Callback<K, V> {
- public void expire(K key, V val);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
new file mode 100644
index 0000000..923883c
--- /dev/null
+++ b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util;
+
+import backtype.storm.utils.TimeCacheMap;
+
+/**
+ * Wrapper class to suppress the "deprecation" warning, as Scala doesn't support suppressing it.
+ */
+@SuppressWarnings("deprecation")
+public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> {
+
+ public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) {
+ super(expirationSecs, new ExpiredCallback<K, V>() {
+
+ @Override
+ public void expire(K key, V val) {
+ callback.expire(key, val);
+ }
+ });
+ }
+
+ public static interface Callback<K, V> {
+ public void expire(K key, V val);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/resources/geardefault.conf b/experiments/storm/src/main/resources/geardefault.conf
index 54c478e..38ac9d3 100644
--- a/experiments/storm/src/main/resources/geardefault.conf
+++ b/experiments/storm/src/main/resources/geardefault.conf
@@ -1,5 +1,5 @@
gearpump {
storm {
- serialization-framework = "io.gearpump.experiments.storm.util.StormSerializationFramework"
+ serialization-framework = "org.apache.gearpump.experiments.storm.util.StormSerializationFramework"
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
deleted file mode 100644
index 19814e9..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import org.slf4j.Logger
-
-import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
-import io.gearpump.util.LogUtil
-
-object StormRunner {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
-
- private def usage(): Unit = {
- val keys = commands.keys.toList.sorted
- // scalastyle:off println
- Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
- // scalastyle:on println
- }
-
- private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
- if (!commands.contains(command)) {
- usage()
- } else {
- commands(command).main(commandArgs)
- }
- }
-
- def main(args: Array[String]): Unit = {
- if (args.length == 0) {
- usage()
- } else {
- val command = args(0)
- val commandArgs = args.drop(1)
- executeCommand(command, commandArgs)
- }
- }
-}