You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:37 UTC

[27/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
deleted file mode 100644
index 69ec1ed..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/Hierarchy.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-import java.util.Set;
-
-public class Hierarchy {
-
-  private final String name;
-
-  private final Set<ResourceType> resourceTypes;
-
-  private final String type;
-
-  private final String dir;
-
-  private final CgroupCommon rootCgroups;
-
-  public Hierarchy(String name, Set<ResourceType> resourceTypes, String dir) {
-    this.name = name;
-    this.resourceTypes = resourceTypes;
-    this.dir = dir;
-    this.rootCgroups = new CgroupCommon(this, dir);
-    this.type = CgroupUtils.reAnalyse(resourceTypes);
-  }
-
-  public Set<ResourceType> getResourceTypes() {
-    return resourceTypes;
-  }
-
-  public String getType() {
-    return type;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((dir == null) ? 0 : dir.hashCode());
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    result = prime * result + ((type == null) ? 0 : type.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Hierarchy other = (Hierarchy) obj;
-    if (dir == null) {
-      if (other.dir != null)
-        return false;
-    } else if (!dir.equals(other.dir))
-      return false;
-    if (name == null) {
-      if (other.name != null)
-        return false;
-    } else if (!name.equals(other.name))
-      return false;
-    if (type == null) {
-      if (other.type != null)
-        return false;
-    } else if (!type.equals(other.type))
-      return false;
-    return true;
-  }
-
-  public String getDir() {
-    return dir;
-  }
-
-  public CgroupCommon getRootCgroups() {
-    return rootCgroups;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public boolean subSystemMounted(ResourceType subsystem) {
-    for (ResourceType type : this.resourceTypes) {
-      if (type == subsystem)
-        return true;
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
deleted file mode 100644
index c2a1d42..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/ResourceType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup;
-
-public enum ResourceType {
-
-  // net_cls,ns is not supposted in ubuntu
-  blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
-
-  public static ResourceType getResourceType(String str) {
-    if (str.equals("cpu"))
-      return cpu;
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
deleted file mode 100644
index 23e630c..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CgroupCore.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup.core;
-
-import io.gearpump.cluster.cgroup.ResourceType;
-
-public interface CgroupCore {
-
-  public ResourceType getType();
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
deleted file mode 100644
index 3402d5a..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/cgroup/core/CpuCore.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.cgroup.core;
-
-import io.gearpump.cluster.cgroup.CgroupUtils;
-import io.gearpump.cluster.cgroup.Constants;
-import io.gearpump.cluster.cgroup.ResourceType;
-
-import java.io.IOException;
-
-public class CpuCore implements CgroupCore {
-
-  public static final String CPU_SHARES = "/cpu.shares";
-  public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
-  public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
-
-  private final String dir;
-
-  public CpuCore(String dir) {
-    this.dir = dir;
-  }
-
-  @Override
-  public ResourceType getType() {
-    // TODO Auto-generated method stub
-    return ResourceType.cpu;
-  }
-
-  public void setCpuShares(int weight) throws IOException {
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight));
-  }
-
-  public int getCpuShares() throws IOException {
-    return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0));
-  }
-
-  public void setCpuCfsPeriodUs(long us) throws IOException {
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us));
-  }
-
-  public void setCpuCfsQuotaUs(long us) throws IOException {
-    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
deleted file mode 100644
index 0772133..0000000
--- a/experiments/cgroup/src/main/java/io/gearpump/cluster/utils/SystemOperation.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.utils;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class SystemOperation {
-
-  public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
-
-  public static void mount(String name, String target, String type, String data) throws IOException {
-    StringBuilder sb = new StringBuilder();
-    sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target);
-    SystemOperation.exec(sb.toString());
-  }
-
-  public static void umount(String name) throws IOException {
-    StringBuilder sb = new StringBuilder();
-    sb.append("umount ").append(name);
-    SystemOperation.exec(sb.toString());
-  }
-
-  public static String exec(String cmd) throws IOException {
-    LOG.debug("Shell cmd: " + cmd);
-    Process process = new ProcessBuilder(new String[]{"/bin/bash", "-c", cmd}).start();
-    try {
-      process.waitFor();
-      String output = IOUtils.toString(process.getInputStream());
-      String errorOutput = IOUtils.toString(process.getErrorStream());
-      LOG.debug("Shell Output: " + output);
-      if (errorOutput.length() != 0) {
-        LOG.error("Shell Error Output: " + errorOutput);
-        throw new IOException(errorOutput);
-      }
-      return output;
-    } catch (InterruptedException ie) {
-      throw new IOException(ie.toString());
-    }
-  }
-
-  public static void main(String[] args) throws IOException {
-    SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java
new file mode 100644
index 0000000..a616593
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CGroupResource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+public class CGroupResource {
+  // One parsed row of Constants.CGROUP_STATUS_FILE, as built by CgroupCenter.getCGroupResources().
+  private ResourceType type;
+  // Parsed from the row's second tab-separated column.
+  private int hierarchyID;
+  // Parsed from the row's third tab-separated column (presumably a cgroup count — TODO confirm).
+  private int cgroupsNum;
+  // True when the row's fourth column equals 1.
+  private boolean enable;
+
+  public CGroupResource(ResourceType type, int hierarchyID, int cgroupNum, boolean enable) {
+    this.type = type;
+    this.hierarchyID = hierarchyID;
+    this.cgroupsNum = cgroupNum;
+    this.enable = enable;
+  }
+
+  public ResourceType getType() {
+    return type;
+  }
+
+  public void setType(ResourceType type) {
+    this.type = type;
+  }
+
+  public int getHierarchyID() {
+    return hierarchyID;
+  }
+
+  public void setHierarchyID(int hierarchyID) {
+    this.hierarchyID = hierarchyID;
+  }
+
+  public int getCgroupsNum() {
+    return cgroupsNum;
+  }
+
+  public void setCgroupsNum(int cgroupsNum) {
+    this.cgroupsNum = cgroupsNum;
+  }
+
+  public boolean isEnable() {
+    return enable;
+  }
+
+  public void setEnable(boolean enable) {
+    this.enable = enable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java
new file mode 100644
index 0000000..fb2ba65
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCenter.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.utils.SystemOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class CgroupCenter implements CgroupOperation {
+
+  public static Logger LOG = LoggerFactory.getLogger(CgroupCenter.class);
+
+  private static CgroupCenter instance;
+
+  private CgroupCenter() {
+
+  }
+
+  /**
+   * Lazily creates the singleton. The method is synchronized, so creation is safe across threads.
+   *
+   * @return the shared instance, or null when CgroupUtils.enabled() reports false
+   */
+  public synchronized static CgroupCenter getInstance() {
+    if (instance == null)
+      instance = new CgroupCenter();
+    return CgroupUtils.enabled() ? instance : null;
+  }
+
+  @Override
+  public List<Hierarchy> getHierarchies() {
+    // Scans the mount status file; fields used per line: [0]=name, [1]=dir, [2]=fs type, [3]=options.
+    Map<String, Hierarchy> hierarchies = new HashMap<String, Hierarchy>();
+    FileReader reader = null;
+    BufferedReader br = null;
+    try {
+      reader = new FileReader(Constants.MOUNT_STATUS_FILE);
+      br = new BufferedReader(reader);
+      String str = null;
+      while ((str = br.readLine()) != null) {
+        String[] strSplit = str.split(" ");
+        // A short or malformed line must not abort the scan and discard every mount parsed so far.
+        if (strSplit.length < 4 || !strSplit[2].equals("cgroup"))
+          continue;
+        String name = strSplit[0];
+        String type = strSplit[3];
+        String dir = strSplit[1];
+        // Keyed by the option string; a later line with the same options replaces the earlier one.
+        hierarchies.put(type, new Hierarchy(name, CgroupUtils.analyse(type), dir));
+      }
+      return new ArrayList<Hierarchy>(hierarchies.values());
+    } catch (Exception e) {
+      LOG.error("Get hierarchies error", e);
+    } finally {
+      CgroupUtils.close(reader, br);
+    }
+    return null;  // callers must handle null: it signals that the mount table could not be read
+  }
+
+  @Override
+  public Set<CGroupResource> getCGroupResources() {
+    // Reads one CGroupResource per tab-separated status line whose first column is a known subsystem.
+    Set<CGroupResource> resources = new HashSet<CGroupResource>();
+    FileReader reader = null;
+    BufferedReader br = null;
+    try {
+      reader = new FileReader(Constants.CGROUP_STATUS_FILE);
+      br = new BufferedReader(reader);
+      String str = null;
+      while ((str = br.readLine()) != null) {
+        String[] split = str.split("\t");
+        ResourceType type = ResourceType.getResourceType(split[0]);
+        if (type == null || split.length < 4)  // unknown subsystem or truncated row: skip, keep scanning
+          continue;
+        resources.add(new CGroupResource(type, Integer.parseInt(split[1]), Integer.parseInt(split[2]),
+          Integer.parseInt(split[3]) == 1));
+      }
+      return resources;
+    } catch (Exception e) {
+      LOG.error("Get subSystems error ", e);
+    } finally {
+      CgroupUtils.close(reader, br);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean enabled(ResourceType resourceType) {
+    // True when the status file lists resourceType; NOTE(review): the row's enable flag is not consulted.
+    Set<CGroupResource> resources = this.getCGroupResources();
+    if (resources == null) return false;  // status file unreadable: previously an NPE
+    for (CGroupResource resource : resources) {
+      if (resource.getType() == resourceType) return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Hierarchy busy(ResourceType resourceType) {
+    // Returns the mounted hierarchy that already has `resourceType` attached, or null if none does.
+    List<Hierarchy> hierarchies = this.getHierarchies();
+    if (hierarchies == null) return null;  // mount table unreadable (getHierarchies logged why)
+    for (Hierarchy hierarchy : hierarchies) {
+      for (ResourceType type : hierarchy.getResourceTypes())
+        if (type == resourceType) return hierarchy;
+    }
+    return null;
+  }
+
+  @Override
+  public Hierarchy mounted(Hierarchy hierarchy) {
+    // Returns the live mount equal to `hierarchy`, or null if its dir is missing or no mount matches.
+    List<Hierarchy> hierarchies = this.getHierarchies();
+    if (hierarchies != null && CgroupUtils.dirExists(hierarchy.getDir())) {
+      for (Hierarchy h : hierarchies) {
+        if (h.equals(hierarchy))
+          return h;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void mount(Hierarchy hierarchy) throws IOException {
+    // Mounts the hierarchy's subsystems at its dir, dropping any that are already attached elsewhere.
+    if (this.mounted(hierarchy) != null) {
+      LOG.error(hierarchy.getDir() + " is mounted");
+      return;
+    }
+    Set<ResourceType> resourceTypes = hierarchy.getResourceTypes();
+    // Iterate over a snapshot: removing from the set while for-each'ing it throws CME.
+    for (ResourceType type : new ArrayList<ResourceType>(resourceTypes)) {
+      if (this.busy(type) != null) {
+        LOG.error("subsystem: " + type.name() + " is busy");
+        resourceTypes.remove(type);
+      }
+    }
+    if (resourceTypes.isEmpty()) return;  // every requested subsystem was busy: nothing left to mount
+    if (!CgroupUtils.dirExists(hierarchy.getDir()))
+      new File(hierarchy.getDir()).mkdirs();
+    String subSystems = CgroupUtils.reAnalyse(resourceTypes);
+    SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems);
+  }
+
+  @Override
+  public void umount(Hierarchy hierarchy) throws IOException {
+    // Only acts when mounted(hierarchy) finds a match: frees the cgroup tree, unmounts, removes the dir.
+    if (this.mounted(hierarchy) != null) {
+      hierarchy.getRootCgroups().delete();
+      SystemOperation.umount(hierarchy.getDir());
+      CgroupUtils.deleteDir(hierarchy.getDir());
+    }
+  }
+
+  @Override
+  public void create(CgroupCommon cgroup) throws SecurityException {
+    // TODO Auto-generated method stub
+    if (cgroup.isRoot()) {
+      LOG.error("You can't create rootCgroup in this function");
+      return;
+    }
+    CgroupCommon parent = cgroup.getParent();
+    while (parent != null) {
+      if (!CgroupUtils.dirExists(parent.getDir())) {
+        LOG.error(parent.getDir() + "is not existed");
+        return;
+      }
+      parent = parent.getParent();
+    }
+    Hierarchy h = cgroup.getHierarchy();
+    if (mounted(h) == null) {
+      LOG.error(h.getDir() + " is not mounted");
+      return;
+    }
+    if (CgroupUtils.dirExists(cgroup.getDir())) {
+      LOG.error(cgroup.getDir() + " is existed");
+      return;
+    }
+    (new File(cgroup.getDir())).mkdir();
+  }
+
+  @Override
+  public void delete(CgroupCommon cgroup) throws IOException {
+    // Delegates to CgroupCommon.delete(), which frees the cgroup and detaches it from its parent.
+    cgroup.delete();
+  }
+
+  public static void main(String args[]) {  // manual check: prints the child count of the first hierarchy's root cgroup
+    System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java
new file mode 100644
index 0000000..2bccfec
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommon.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.cgroup.core.CgroupCore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CgroupCommon implements CgroupCommonOperation {
+
+  public static final String TASKS = "/tasks";
+  public static final String NOTIFY_ON_RELEASE = "/notify_on_release";
+  public static final String RELEASE_AGENT = "/release_agent";
+  public static final String CGROUP_CLONE_CHILDREN = "/cgroup.clone_children";
+  public static final String CGROUP_EVENT_CONTROL = "/cgroup.event_control";
+  public static final String CGROUP_PROCS = "/cgroup.procs";
+
+  private final Hierarchy hierarchy;
+
+  private final String name;
+
+  private final String dir;
+
+  private final CgroupCommon parent;
+
+  private final Map<ResourceType, CgroupCore> cores;
+
+  private final boolean isRoot;
+
+  private final Set<CgroupCommon> children = new HashSet<CgroupCommon>();
+
+  public CgroupCommon(String name, Hierarchy hierarchy, CgroupCommon parent) {  // child cgroup: name and dir extend the parent's
+    this.name = parent.getName() + "/" + name;
+    this.hierarchy = hierarchy;
+    this.parent = parent;
+    this.dir = parent.getDir() + "/" + name;
+    this.init();  // builds child objects from existing sub-directories; runs before cores and isRoot are set
+    cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir);
+    this.isRoot = false;
+  }
+
+  /**
+   * Builds the root cgroup of a hierarchy: empty name, no parent, rooted at the given dir.
+   */
+  public CgroupCommon(Hierarchy hierarchy, String dir) {
+    this.name = "";
+    this.hierarchy = hierarchy;
+    this.parent = null;
+    this.dir = dir;
+    this.init();
+    cores = CgroupCoreFactory.getInstance(this.hierarchy.getResourceTypes(), this.dir);
+    this.isRoot = true;
+  }
+
+  @Override
+  public void addTask(int taskId) throws IOException {
+    // Writes the task id to the "tasks" file under this cgroup's directory.
+    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId));
+  }
+
+  @Override
+  public Set<Integer> getTasks() throws IOException {
+    // Each line of the "tasks" file is parsed as one integer id.
+    List<String> lines = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
+    Set<Integer> ids = new HashSet<Integer>();
+    for (String line : lines)
+      ids.add(Integer.valueOf(line));
+    return ids;
+  }
+
+  @Override
+  public void addProcs(int pid) throws IOException {
+    // Writes the pid to the "cgroup.procs" file under this cgroup's directory.
+    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
+  }
+
+  @Override
+  public Set<Integer> getPids() throws IOException {
+    // Each line of "cgroup.procs" is parsed as one integer id.
+    List<String> lines = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
+    Set<Integer> ids = new HashSet<Integer>();
+    for (String line : lines) {
+      ids.add(Integer.valueOf(line));
+    }
+    return ids;
+  }
+
+  @Override
+  public void setNotifyOnRelease(boolean flag) throws IOException {
+    // Writes "1" for true and "0" for false to the "notify_on_release" file.
+    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
+  }
+
+  @Override
+  public boolean getNotifyOnRelease() throws IOException {
+    return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1");
+  }
+
+  @Override
+  public void setReleaseAgent(String command) throws IOException {
+    // Silently does nothing for non-root cgroups; otherwise writes command to "release_agent".
+    if (!this.isRoot)
+      return;
+    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command);
+  }
+
+  @Override
+  public String getReleaseAgent() throws IOException {
+    // Non-root cgroups answer null without reading the file.
+    if (!this.isRoot) return null;
+    return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
+  }
+
+  @Override
+  public void setCgroupCloneChildren(boolean flag) throws IOException {
+    // NOTE(review): cores comes from CgroupCoreFactory, which only registers cpu here, so this guard may never pass — confirm.
+    if (!this.cores.keySet().contains(ResourceType.cpuset))
+      return;
+    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+  }
+
+  @Override
+  public boolean getCgroupCloneChildren() throws IOException {
+    return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1");
+  }
+
+  @Override
+  public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
+    // Writes "<eventFd> <controlFd> [args...]", space-separated, to "cgroup.event_control".
+    StringBuilder sb = new StringBuilder();
+    sb.append(eventFd);
+    sb.append(' ');
+    sb.append(controlFd);
+    for (String arg : args) {
+      sb.append(' ');
+      sb.append(arg);
+    }
+    CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
+  }
+
+  public Hierarchy getHierarchy() {
+    return hierarchy;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDir() {
+    return dir;
+  }
+
+  public CgroupCommon getParent() {
+    return parent;
+  }
+
+  public Set<CgroupCommon> getChildren() {
+    return children;
+  }
+
+  public boolean isRoot() {
+    return isRoot;
+  }
+
+  public Map<ResourceType, CgroupCore> getCores() {
+    return cores;
+  }
+
+  public void delete() throws IOException {
+    this.free();  // tear down this cgroup's subtree first
+    if (!this.isRoot)
+      this.parent.getChildren().remove(this);  // then drop it from the parent's in-memory child set
+  }
+
+  private void free() throws IOException {  // depth-first teardown: children first, then this cgroup
+    for (CgroupCommon child : this.children)
+      child.free();
+    if (this.isRoot)
+      return;  // the root's own directory is left in place
+    Set<Integer> tasks = this.getTasks();
+    if (tasks != null) {
+      for (Integer task : tasks) {
+        this.parent.addTask(task);  // hand each task to the parent before the directory is removed
+      }
+    }
+    CgroupUtils.deleteDir(this.dir);
+  }
+
+  private void init() {  // mirrors the existing sub-directories of dir as child CgroupCommon objects
+    File file = new File(this.dir);
+    File[] files = file.listFiles();
+    if (files == null)  // listFiles() yields null when dir is not a directory or cannot be read
+      return;
+    for (File child : files) {
+      if (child.isDirectory()) {
+        this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this));  // child ctor recurses via init()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java
new file mode 100644
index 0000000..9ae923b
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCommonOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import java.io.IOException;
+import java.util.Set;
+
+public interface CgroupCommonOperation {
+  // Read/write operations on one cgroup's control files; CgroupCommon is the implementation.
+  public void addTask(int taskid) throws IOException;
+
+  public Set<Integer> getTasks() throws IOException;
+
+  public void addProcs(int pid) throws IOException;
+
+  public Set<Integer> getPids() throws IOException;
+
+  public void setNotifyOnRelease(boolean flag) throws IOException;
+
+  public boolean getNotifyOnRelease() throws IOException;
+
+  public void setReleaseAgent(String command) throws IOException;
+
+  public String getReleaseAgent() throws IOException;
+
+  public void setCgroupCloneChildren(boolean flag) throws IOException;
+
+  public boolean getCgroupCloneChildren() throws IOException;
+
+  public void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java
new file mode 100644
index 0000000..15ebcbc
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupCoreFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.cgroup.core.CgroupCore;
+import org.apache.gearpump.cluster.cgroup.core.CpuCore;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds the {@link CgroupCore} objects for a set of requested resource types.
+ */
+public class CgroupCoreFactory {
+
+  /**
+   * Creates one core per supported resource type found in {@code types}.
+   * Only {@link ResourceType#cpu} is handled; any other type is skipped and therefore
+   * has no entry in the returned map.
+   *
+   * @param types resource types to build cores for
+   * @param dir directory handed to every created core
+   * @return a new, mutable map from resource type to its core
+   */
+  public static Map<ResourceType, CgroupCore> getInstance(Set<ResourceType> types, String dir) {
+    final Map<ResourceType, CgroupCore> cores = new HashMap<ResourceType, CgroupCore>();
+    for (final ResourceType requested : types) {
+      switch (requested) {
+        case cpu:
+          cores.put(requested, new CpuCore(dir));
+          break;
+        default:
+          // No core implementation exists for this resource type.
+          break;
+      }
+    }
+    return cores;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java
new file mode 100644
index 0000000..399f3bd
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupOperation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Operations on the cgroup system as a whole: querying hierarchies and subsystems,
+ * mounting and unmounting hierarchies, and creating and deleting cgroups.
+ *
+ * NOTE(review): no implementation is visible next to this interface; the per-method
+ * notes below restate the signatures and should be confirmed against the implementing
+ * class.
+ */
+public interface CgroupOperation {
+
+  /** Returns the known hierarchies. */
+  List<Hierarchy> getHierarchies();
+
+  /** Returns the known cgroup resources. */
+  Set<CGroupResource> getCGroupResources();
+
+  /** Tells whether the given subsystem is enabled. */
+  boolean enabled(ResourceType subsystem);
+
+  /**
+   * Looks up the hierarchy associated with the given subsystem.
+   * NOTE(review): callers in this module treat a {@code null} result as "none" - confirm.
+   */
+  Hierarchy busy(ResourceType subsystem);
+
+  /** Looks up the mounted counterpart of the given hierarchy. */
+  Hierarchy mounted(Hierarchy hierarchy);
+
+  /** Mounts the given hierarchy. */
+  void mount(Hierarchy hierarchy) throws IOException;
+
+  /** Unmounts the given hierarchy. */
+  void umount(Hierarchy hierarchy) throws IOException;
+
+  /** Creates the given cgroup. */
+  void create(CgroupCommon cgroup) throws SecurityException;
+
+  /** Deletes the given cgroup. */
+  void delete(CgroupCommon cgroup) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java
new file mode 100644
index 0000000..8f199d9
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/CgroupUtils.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import org.apache.gearpump.cluster.utils.SystemOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Static helpers for the cgroup module: existence checks, conversion between
+ * comma-separated subsystem lists and {@link ResourceType} sets, and line-based
+ * reading and writing of files.
+ */
+public class CgroupUtils {
+
+  public static final Logger LOG = LoggerFactory.getLogger(CgroupUtils.class);
+
+  /**
+   * Removes a directory by running {@code rmdir} through {@link SystemOperation#exec}.
+   * A failure is logged and not propagated to the caller.
+   *
+   * NOTE(review): {@code dir} is concatenated into a shell command line without quoting,
+   * so a path containing whitespace or shell metacharacters is not treated as a single
+   * argument. Only pass trusted, simple paths.
+   *
+   * @param dir path of the directory to remove
+   */
+  public static void deleteDir(String dir) {
+    try {
+      String cmd = "rmdir " + dir;
+      SystemOperation.exec(cmd);
+    } catch (IOException e) {
+      LOG.error("rm " + dir + " fail!", e);
+    }
+  }
+
+  /**
+   * @param dir path to test
+   * @return {@code true} if something (file or directory) exists at {@code dir}
+   */
+  public static boolean fileExists(String dir) {
+    File file = new File(dir);
+    return file.exists();
+  }
+
+  /**
+   * @param dir path to test
+   * @return {@code true} if {@code dir} exists and is a directory
+   */
+  public static boolean dirExists(String dir) {
+    File file = new File(dir);
+    return file.isDirectory();
+  }
+
+  /**
+   * Parses a comma-separated list of subsystem names.
+   * Names for which {@link ResourceType#getResourceType(String)} returns {@code null}
+   * are dropped.
+   *
+   * @param str comma-separated subsystem names
+   * @return the recognised resource types; never {@code null}
+   */
+  public static Set<ResourceType> analyse(String str) {
+    Set<ResourceType> result = new HashSet<ResourceType>();
+    String[] subSystems = str.split(",");
+    for (String subSystem : subSystems) {
+      ResourceType type = ResourceType.getResourceType(subSystem);
+      if (type != null)
+        result.add(type);
+    }
+    return result;
+  }
+
+  /**
+   * Inverse of {@link #analyse(String)}: joins the names of the given resource types
+   * with commas, in the iteration order of the set.
+   *
+   * @param subSystems resource types to join
+   * @return the joined names, or an empty string for an empty set
+   */
+  public static String reAnalyse(Set<ResourceType> subSystems) {
+    StringBuilder sb = new StringBuilder();
+    for (ResourceType type : subSystems) {
+      // Enum names are never empty, so a non-empty builder means "not the first element".
+      if (sb.length() > 0)
+        sb.append(",");
+      sb.append(type.name());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @return {@code true} if {@link Constants#CGROUP_STATUS_FILE} exists
+   */
+  public static boolean enabled() {
+    return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
+  }
+
+  /**
+   * Reads a whole file into memory, one list element per line (line terminators removed).
+   *
+   * @param fileDir path of the file to read
+   * @return the lines of the file, in order
+   * @throws IOException if the file cannot be opened or read
+   */
+  public static List<String> readFileByLine(String fileDir) throws IOException {
+    List<String> result = new ArrayList<String>();
+    FileReader fileReader = null;
+    BufferedReader reader = null;
+    try {
+      File file = new File(fileDir);
+      fileReader = new FileReader(file);
+      reader = new BufferedReader(fileReader);
+      String tempString = null;
+      while ((tempString = reader.readLine()) != null) {
+        result.add(tempString);
+      }
+    } finally {
+      CgroupUtils.close(fileReader, reader);
+    }
+    return result;
+  }
+
+  /**
+   * Appends the given strings to an existing file, one per line, flushing after each line
+   * so that a write error surfaces as an {@link IOException} for the line that caused it.
+   * If the file does not exist, an error is logged and nothing is written or thrown.
+   *
+   * @param fileDir path of the file to append to
+   * @param strings lines to append
+   * @throws IOException if opening, writing or flushing fails
+   */
+  public static void writeFileByLine(String fileDir, List<String> strings) throws IOException {
+    FileWriter writer = null;
+    BufferedWriter bw = null;
+    try {
+      File file = new File(fileDir);
+      if (!file.exists()) {
+        LOG.error(fileDir + " is no existed");
+        return;
+      }
+      writer = new FileWriter(file, true);
+      bw = new BufferedWriter(writer);
+      for (String string : strings) {
+        bw.write(string);
+        bw.newLine();
+        bw.flush();
+      }
+    } finally {
+      CgroupUtils.close(writer, bw);
+    }
+  }
+
+  /**
+   * Appends a single line to an existing file. Behaves exactly like
+   * {@link #writeFileByLine(String, List)} called with a one-element list.
+   *
+   * @param fileDir path of the file to append to
+   * @param string line to append
+   * @throws IOException if opening, writing or flushing fails
+   */
+  public static void writeFileByLine(String fileDir, String string) throws IOException {
+    List<String> single = new ArrayList<String>(1);
+    single.add(string);
+    writeFileByLine(fileDir, single);
+  }
+
+  /**
+   * Closes a reader pair without throwing. Either argument may be {@code null}.
+   * The buffered wrapper is closed first; each resource is closed independently so a
+   * failure on one does not prevent closing the other.
+   *
+   * @param reader underlying file reader
+   * @param br buffered reader wrapping {@code reader}
+   */
+  public static void close(FileReader reader, BufferedReader br) {
+    closeQuietly(br);
+    closeQuietly(reader);
+  }
+
+  /**
+   * Closes a writer pair without throwing. Either argument may be {@code null}.
+   * The buffered wrapper is closed first so that any data still held in its buffer is
+   * flushed before the underlying writer goes away; each resource is closed
+   * independently so a failure on one does not prevent closing the other.
+   *
+   * @param writer underlying file writer
+   * @param bw buffered writer wrapping {@code writer}
+   */
+  public static void close(FileWriter writer, BufferedWriter bw) {
+    closeQuietly(bw);
+    closeQuietly(writer);
+  }
+
+  /** Closes one resource, logging instead of propagating a failure. */
+  private static void closeQuietly(Closeable resource) {
+    if (resource == null)
+      return;
+    try {
+      resource.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close " + resource, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java
new file mode 100644
index 0000000..fb905c7
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Constants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+/**
+ * Well-known file locations used by the cgroup module, plus a small path helper.
+ */
+public class Constants {
+
+  /** File whose existence is used to detect cgroup support. */
+  public static final String CGROUP_STATUS_FILE = "/proc/cgroups";
+
+  /** File listing the current mounts. */
+  public static final String MOUNT_STATUS_FILE = "/proc/mounts";
+
+  /**
+   * Concatenates {@code dir} and {@code constant} as-is; no separator is inserted, so
+   * {@code constant} is expected to carry its own leading slash.
+   *
+   * @param dir directory prefix
+   * @param constant suffix to append
+   * @return {@code dir} immediately followed by {@code constant}
+   */
+  public static String getDir(String dir, String constant) {
+    final StringBuilder path = new StringBuilder();
+    path.append(dir);
+    path.append(constant);
+    return path.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java
new file mode 100644
index 0000000..446802f
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/Hierarchy.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+import java.util.Set;
+
+/**
+ * A cgroup hierarchy: a name, the set of resource types attached to it, the directory
+ * it lives in and its root cgroup. Equality and hashing are based on directory, name
+ * and the comma-joined type string.
+ */
+public class Hierarchy {
+
+  private final String name;
+
+  private final Set<ResourceType> resourceTypes;
+
+  /** Comma-joined names of {@link #resourceTypes}, as built by CgroupUtils.reAnalyse. */
+  private final String type;
+
+  private final String dir;
+
+  private final CgroupCommon rootCgroups;
+
+  /**
+   * @param name name of the hierarchy
+   * @param resourceTypes resource types attached to the hierarchy (stored, not copied)
+   * @param dir directory of the hierarchy
+   */
+  public Hierarchy(String name, Set<ResourceType> resourceTypes, String dir) {
+    this.name = name;
+    this.resourceTypes = resourceTypes;
+    this.dir = dir;
+    // NOTE(review): `this` is handed to CgroupCommon before `type` is assigned below, so
+    // getType() would return null if the CgroupCommon constructor called it - confirm it
+    // does not.
+    this.rootCgroups = new CgroupCommon(this, dir);
+    this.type = CgroupUtils.reAnalyse(resourceTypes);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Set<ResourceType> getResourceTypes() {
+    return resourceTypes;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getDir() {
+    return dir;
+  }
+
+  public CgroupCommon getRootCgroups() {
+    return rootCgroups;
+  }
+
+  /**
+   * @param subsystem resource type to look for
+   * @return {@code true} if {@code subsystem} is one of this hierarchy's resource types
+   */
+  public boolean subSystemMounted(ResourceType subsystem) {
+    for (ResourceType attached : this.resourceTypes) {
+      if (attached == subsystem) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 1;
+    // Same order and multiplier as before: dir, then name, then type.
+    for (String field : new String[] {dir, name, type}) {
+      hash = 31 * hash + (field == null ? 0 : field.hashCode());
+    }
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    final Hierarchy that = (Hierarchy) obj;
+    return sameText(dir, that.dir) && sameText(name, that.name) && sameText(type, that.type);
+  }
+
+  /** Null-tolerant string equality: two nulls are equal, null never equals non-null. */
+  private static boolean sameText(String left, String right) {
+    return left == null ? right == null : left.equals(right);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java
new file mode 100644
index 0000000..c3360e6
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/ResourceType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup;
+
+/**
+ * Cgroup subsystem names known to this module.
+ */
+public enum ResourceType {
+
+  // net_cls, ns are not supported in Ubuntu
+  blkio, cpu, cpuacct, cpuset, devices, freezer, memory, perf_event, net_cls, net_prio;
+
+  /**
+   * Maps a subsystem name to its constant. Only {@code "cpu"} is recognised; every other
+   * name, including the names of the other constants and {@code null}, yields
+   * {@code null}.
+   *
+   * @param str subsystem name, may be {@code null}
+   * @return {@link #cpu} for {@code "cpu"}, otherwise {@code null}
+   */
+  public static ResourceType getResourceType(String str) {
+    // Constant-first comparison so a null argument returns null instead of throwing.
+    if ("cpu".equals(str))
+      return cpu;
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java
new file mode 100644
index 0000000..39c0999
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CgroupCore.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup.core;
+
+import org.apache.gearpump.cluster.cgroup.ResourceType;
+
+/**
+ * Common contract of the per-subsystem cgroup handlers.
+ */
+public interface CgroupCore {
+
+  /**
+   * @return the resource type this core is responsible for
+   */
+  ResourceType getType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java
new file mode 100644
index 0000000..4b35f7f
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/cgroup/core/CpuCore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.cgroup.core;
+
+import org.apache.gearpump.cluster.cgroup.CgroupUtils;
+import org.apache.gearpump.cluster.cgroup.Constants;
+import org.apache.gearpump.cluster.cgroup.ResourceType;
+
+import java.io.IOException;
+
+/**
+ * {@link CgroupCore} for the cpu subsystem. Each setter appends its value as one line to
+ * the matching control file under the directory given at construction time; the writes
+ * go through CgroupUtils.writeFileByLine, which logs and returns without throwing when
+ * the target file does not exist.
+ */
+public class CpuCore implements CgroupCore {
+
+  public static final String CPU_SHARES = "/cpu.shares";
+  public static final String CPU_CFS_PERIOD_US = "/cpu.cfs_period_us";
+  public static final String CPU_CFS_QUOTA_US = "/cpu.cfs_quota_us";
+
+  private final String dir;
+
+  /**
+   * @param dir directory that contains the cpu control files
+   */
+  public CpuCore(String dir) {
+    this.dir = dir;
+  }
+
+  @Override
+  public ResourceType getType() {
+    return ResourceType.cpu;
+  }
+
+  /** Writes {@code weight} to the {@link #CPU_SHARES} file. */
+  public void setCpuShares(int weight) throws IOException {
+    writeValue(CPU_SHARES, String.valueOf(weight));
+  }
+
+  /** Parses the first line of the {@link #CPU_SHARES} file as an integer. */
+  public int getCpuShares() throws IOException {
+    final String firstLine = CgroupUtils.readFileByLine(pathOf(CPU_SHARES)).get(0);
+    return Integer.parseInt(firstLine);
+  }
+
+  /** Writes {@code us} to the {@link #CPU_CFS_PERIOD_US} file. */
+  public void setCpuCfsPeriodUs(long us) throws IOException {
+    writeValue(CPU_CFS_PERIOD_US, String.valueOf(us));
+  }
+
+  /** Writes {@code us} to the {@link #CPU_CFS_QUOTA_US} file. */
+  public void setCpuCfsQuotaUs(long us) throws IOException {
+    writeValue(CPU_CFS_QUOTA_US, String.valueOf(us));
+  }
+
+  /** Full path of a control file inside this core's directory. */
+  private String pathOf(String controlFile) {
+    return Constants.getDir(this.dir, controlFile);
+  }
+
+  /** Appends {@code value} as one line to the given control file. */
+  private void writeValue(String controlFile, String value) throws IOException {
+    CgroupUtils.writeFileByLine(pathOf(controlFile), value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
new file mode 100644
index 0000000..5b2a890
--- /dev/null
+++ b/experiments/cgroup/src/main/java/org/apache/gearpump/cluster/utils/SystemOperation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Thin wrappers around shell commands ({@code mount}, {@code umount} and arbitrary
+ * command lines) executed through {@code /bin/bash -c}.
+ *
+ * NOTE(review): arguments are concatenated into the command line without quoting, so
+ * values containing whitespace or shell metacharacters are interpreted by the shell.
+ * Only pass trusted values.
+ */
+public class SystemOperation {
+
+  public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
+
+  /**
+   * Runs {@code mount -t <type> -o <data> <name> <target>}.
+   *
+   * @param name device/name argument of the mount command
+   * @param target mount point
+   * @param type file system type passed to {@code -t}
+   * @param data options passed to {@code -o}
+   * @throws IOException if the command cannot be run or writes to its error stream
+   */
+  public static void mount(String name, String target, String type, String data) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target);
+    SystemOperation.exec(sb.toString());
+  }
+
+  /**
+   * Runs {@code umount <name>}.
+   *
+   * @param name argument of the umount command
+   * @throws IOException if the command cannot be run or writes to its error stream
+   */
+  public static void umount(String name) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("umount ").append(name);
+    SystemOperation.exec(sb.toString());
+  }
+
+  /**
+   * Runs a command line through {@code /bin/bash -c} and waits for it to finish.
+   * The exit code is not inspected: the call fails if and only if the command wrote
+   * anything to its error stream.
+   *
+   * @param cmd command line handed to the shell
+   * @return everything the command wrote to its standard output
+   * @throws IOException if the process cannot be started, its output cannot be read,
+   *     its error output is non-empty (the message is that output), or the calling
+   *     thread is interrupted while waiting
+   */
+  public static String exec(String cmd) throws IOException {
+    LOG.debug("Shell cmd: " + cmd);
+    final Process process = new ProcessBuilder("/bin/bash", "-c", cmd).start();
+
+    // Both pipes must be drained while the child runs: a child that fills either pipe
+    // buffer blocks until somebody reads it, so waiting first can hang forever. Stderr is
+    // drained on a helper thread so it cannot stall while stdout is being read here.
+    final String[] stderrText = new String[1];
+    final IOException[] stderrFailure = new IOException[1];
+    Thread stderrDrainer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          stderrText[0] = IOUtils.toString(process.getErrorStream());
+        } catch (IOException e) {
+          stderrFailure[0] = e;
+        }
+      }
+    }, "shell-stderr-drainer");
+    stderrDrainer.setDaemon(true);
+    stderrDrainer.start();
+
+    try {
+      String output = IOUtils.toString(process.getInputStream());
+      // join() also makes the drainer's writes to the holder arrays visible here.
+      stderrDrainer.join();
+      process.waitFor();
+      if (stderrFailure[0] != null) {
+        throw stderrFailure[0];
+      }
+      String errorOutput = stderrText[0];
+      LOG.debug("Shell Output: " + output);
+      if (errorOutput.length() != 0) {
+        LOG.error("Shell Error Output: " + errorOutput);
+        throw new IOException(errorOutput);
+      }
+      return output;
+    } catch (InterruptedException ie) {
+      // Restore the flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new IOException(ie.toString(), ie);
+    }
+  }
+
+  /**
+   * Manual smoke test: attempts to mount the cpu subsystem at {@code /cgroup/cpu}.
+   */
+  public static void main(String[] args) throws IOException {
+    SystemOperation.mount("test", "/cgroup/cpu", "cgroup", "cpu");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
deleted file mode 100644
index ae7fb42..0000000
--- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupManager.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.worker
-
-import com.typesafe.config.Config
-import org.apache.commons.lang.SystemUtils
-import org.slf4j.{Logger, LoggerFactory}
-
-import io.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore}
-import io.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType}
-import io.gearpump.cluster.worker.CGroupManager._
-
-class CGroupManager(config: Config) {
-  private val center = CgroupCenter.getInstance()
-  private val rootDir = CGroupManager.getCgroupRootDir(config)
-  private var hierarchy: Hierarchy = null
-  private var rootCgroup: CgroupCommon = null
-
-  prepareSubSystem()
-
-  private def prepareSubSystem(): Unit = {
-    if (rootDir == null) {
-      throw new RuntimeException(s"Check configuration file. The $CGROUP_ROOT is missing.")
-    }
-    if (center == null) {
-      throw new RuntimeException("Cgroup error, please check /proc/cgroups")
-    }
-    hierarchy = center.busy(ResourceType.cpu)
-    if (hierarchy == null) {
-      val types = new java.util.HashSet[ResourceType]
-      types.add(ResourceType.cpu)
-      hierarchy = new Hierarchy(GEARPUMP_HIERARCHY_NAME, types, GEARPUMP_CPU_HIERARCHY_DIR)
-    }
-    rootCgroup = new CgroupCommon(rootDir, hierarchy, hierarchy.getRootCgroups)
-  }
-
-  private def validateCpuUpperLimitValue(value: Int): Int = {
-    if (value > 10) {
-      10
-    } else if (value < 1 && value != -1) {
-      1
-    } else {
-      value
-    }
-  }
-
-  private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = {
-    val _cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit)
-    if (_cpuCoreUpperLimit == -1) {
-      cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit)
-    }
-    else {
-      cpuCore.setCpuCfsPeriodUs(100000)
-      cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit * 100000)
-    }
-  }
-
-  def startNewExecutor(config: Config, cpuNum: Int, appId: Int, executorId: Int): List[String] = {
-    val groupName = getGroupName(appId, executorId)
-    val workerGroup: CgroupCommon = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
-    this.center.create(workerGroup)
-    val cpu: CgroupCore = workerGroup.getCores.get(ResourceType.cpu)
-    val cpuCore: CpuCore = cpu.asInstanceOf[CpuCore]
-    cpuCore.setCpuShares(cpuNum * CGroupManager.ONE_CPU_SLOT)
-    setCpuUsageUpperLimit(cpuCore, CGroupManager.getWorkerCpuCoreUpperLimit(config))
-
-    val sb: StringBuilder = new StringBuilder
-    sb.append("cgexec -g cpu:").append(workerGroup.getName).toString().split(" ").toList
-  }
-
-  def shutDownExecutor(appId: Int, executorId: Int): Unit = {
-    val groupName = getGroupName(appId, executorId)
-    val workerGroup = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
-    center.delete(workerGroup)
-  }
-
-  def close(): Unit = {
-    center.delete(rootCgroup)
-  }
-
-  private def getGroupName(appId: Int, executorId: Int): String = {
-    "app" + appId + "executor" + executorId
-  }
-}
-
-object CGroupManager {
-  private val LOG: Logger = LoggerFactory.getLogger(getClass)
-  private val CGROUP_ROOT = "gearpump.cgroup.root"
-  private val Executor_CPU_CORE_UPPER_LIMIT = "gearpump.cgroup.cpu-core-limit-per-executor"
-  private val GEARPUMP_HIERARCHY_NAME = "gearpump_cpu"
-  private val GEARPUMP_CPU_HIERARCHY_DIR = "/cgroup/cpu"
-  private val ONE_CPU_SLOT = 1024
-
-  def getCgroupRootDir(config: Config): String = {
-    config.getString(CGROUP_ROOT)
-  }
-
-  def getWorkerCpuCoreUpperLimit(config: Config): Int = {
-    config.getInt(Executor_CPU_CORE_UPPER_LIMIT)
-  }
-
-  def getInstance(config: Config): Option[CGroupManager] = {
-    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC_OSX) {
-      LOG.error(s"CGroup is not supported on Windows OS, Mac OS X")
-      None
-    } else {
-      Some(new CGroupManager(config))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
deleted file mode 100644
index eb57a18..0000000
--- a/experiments/cgroup/src/main/scala/io/gearpump/cluster/worker/CGroupProcessLauncher.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster.worker
-
-import java.io.File
-import scala.sys.process.Process
-
-import com.typesafe.config.Config
-import org.slf4j.{Logger, LoggerFactory}
-
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.util.{ProcessLogRedirector, RichProcess}
-
-/**
- * CGroupProcessLauncher is used to launch a process for Executor with CGroup.
- * For more details, please refer http://gearpump.io
- */
-class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
-  private val APP_MASTER = -1
-  private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config)
-  private val LOG: Logger = LoggerFactory.getLogger(getClass)
-
-  override def cleanProcess(appId: Int, executorId: Int): Unit = {
-    if (executorId != APP_MASTER) {
-      cgroupManager.foreach(_.shutDownExecutor(appId, executorId))
-    }
-  }
-
-  override def createProcess(
-    appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String],
-    classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
-    val cgroupCommand = if (executorId != APP_MASTER) {
-      cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,
-        executorId)).getOrElse(List.empty)
-    } else List.empty
-    LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " +
-      s"classpath: ${classPath.mkString(File.pathSeparator)}")
-
-    val java = System.getProperty("java.home") + "/bin/java"
-    val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath
-      .mkString(File.pathSeparator), mainClass) ++ arguments
-    LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " +
-      s"options: ${options.mkString(" ")}")
-    val logger = new ProcessLogRedirector()
-    val process = Process(command).run(logger)
-    new RichProcess(process, logger)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala
new file mode 100644
index 0000000..24c169c
--- /dev/null
+++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupManager.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import com.typesafe.config.Config
+import org.apache.commons.lang.SystemUtils
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.gearpump.cluster.cgroup.core.{CgroupCore, CpuCore}
+import org.apache.gearpump.cluster.cgroup.{CgroupCenter, CgroupCommon, Hierarchy, ResourceType}
+import org.apache.gearpump.cluster.worker.CGroupManager._
+
+class CGroupManager(config: Config) {
+  private val center = CgroupCenter.getInstance()
+  private val rootDir = CGroupManager.getCgroupRootDir(config)
+  private var hierarchy: Hierarchy = null
+  private var rootCgroup: CgroupCommon = null
+
+  prepareSubSystem()
+
+  private def prepareSubSystem(): Unit = {
+    if (rootDir == null) {
+      throw new RuntimeException(s"Check configuration file. The $CGROUP_ROOT is missing.")
+    }
+    if (center == null) {
+      throw new RuntimeException("Cgroup error, please check /proc/cgroups")
+    }
+    hierarchy = center.busy(ResourceType.cpu)
+    if (hierarchy == null) {
+      val types = new java.util.HashSet[ResourceType]
+      types.add(ResourceType.cpu)
+      hierarchy = new Hierarchy(GEARPUMP_HIERARCHY_NAME, types, GEARPUMP_CPU_HIERARCHY_DIR)
+    }
+    rootCgroup = new CgroupCommon(rootDir, hierarchy, hierarchy.getRootCgroups)
+  }
+
+  private def validateCpuUpperLimitValue(value: Int): Int = {
+    if (value > 10) {
+      10
+    } else if (value < 1 && value != -1) {
+      1
+    } else {
+      value
+    }
+  }
+
+  private def setCpuUsageUpperLimit(cpuCore: CpuCore, cpuCoreUpperLimit: Int): Unit = {
+    val _cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit)
+    if (_cpuCoreUpperLimit == -1) {
+      cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit)
+    }
+    else {
+      cpuCore.setCpuCfsPeriodUs(100000)
+      cpuCore.setCpuCfsQuotaUs(_cpuCoreUpperLimit * 100000)
+    }
+  }
+
+  def startNewExecutor(config: Config, cpuNum: Int, appId: Int, executorId: Int): List[String] = {
+    val groupName = getGroupName(appId, executorId)
+    val workerGroup: CgroupCommon = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
+    this.center.create(workerGroup)
+    val cpu: CgroupCore = workerGroup.getCores.get(ResourceType.cpu)
+    val cpuCore: CpuCore = cpu.asInstanceOf[CpuCore]
+    cpuCore.setCpuShares(cpuNum * CGroupManager.ONE_CPU_SLOT)
+    setCpuUsageUpperLimit(cpuCore, CGroupManager.getWorkerCpuCoreUpperLimit(config))
+
+    val sb: StringBuilder = new StringBuilder
+    sb.append("cgexec -g cpu:").append(workerGroup.getName).toString().split(" ").toList
+  }
+
+  def shutDownExecutor(appId: Int, executorId: Int): Unit = {
+    val groupName = getGroupName(appId, executorId)
+    val workerGroup = new CgroupCommon(groupName, hierarchy, this.rootCgroup)
+    center.delete(workerGroup)
+  }
+
+  def close(): Unit = {
+    center.delete(rootCgroup)
+  }
+
+  private def getGroupName(appId: Int, executorId: Int): String = {
+    "app" + appId + "executor" + executorId
+  }
+}
+
+object CGroupManager {
+  private val LOG: Logger = LoggerFactory.getLogger(getClass)
+  private val CGROUP_ROOT = "gearpump.cgroup.root"
+  private val Executor_CPU_CORE_UPPER_LIMIT = "gearpump.cgroup.cpu-core-limit-per-executor"
+  private val GEARPUMP_HIERARCHY_NAME = "gearpump_cpu"
+  private val GEARPUMP_CPU_HIERARCHY_DIR = "/cgroup/cpu"
+  private val ONE_CPU_SLOT = 1024
+
+  def getCgroupRootDir(config: Config): String = {
+    config.getString(CGROUP_ROOT)
+  }
+
+  def getWorkerCpuCoreUpperLimit(config: Config): Int = {
+    config.getInt(Executor_CPU_CORE_UPPER_LIMIT)
+  }
+
+  def getInstance(config: Config): Option[CGroupManager] = {
+    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC_OSX) {
+      LOG.error(s"CGroup is not supported on Windows OS, Mac OS X")
+      None
+    } else {
+      Some(new CGroupManager(config))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
new file mode 100644
index 0000000..dc2eabd
--- /dev/null
+++ b/experiments/cgroup/src/main/scala/org/apache/gearpump/cluster/worker/CGroupProcessLauncher.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import scala.sys.process.Process
+
+import com.typesafe.config.Config
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{ProcessLogRedirector, RichProcess}
+
+/**
+ * CGroupProcessLauncher is used to launch a process for Executor with CGroup.
+ * For more details, please refer to http://gearpump.io
+ */
+class CGroupProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+  private val APP_MASTER = -1
+  private val cgroupManager: Option[CGroupManager] = CGroupManager.getInstance(config)
+  private val LOG: Logger = LoggerFactory.getLogger(getClass)
+
+  override def cleanProcess(appId: Int, executorId: Int): Unit = {
+    if (executorId != APP_MASTER) {
+      cgroupManager.foreach(_.shutDownExecutor(appId, executorId))
+    }
+  }
+
+  override def createProcess(
+    appId: Int, executorId: Int, resource: Resource, appConfig: Config, options: Array[String],
+    classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+    val cgroupCommand = if (executorId != APP_MASTER) {
+      cgroupManager.map(_.startNewExecutor(appConfig, resource.slots, appId,
+        executorId)).getOrElse(List.empty)
+    } else List.empty
+    LOG.info(s"Launch executor with CGroup ${cgroupCommand.mkString(" ")}, " +
+      s"classpath: ${classPath.mkString(File.pathSeparator)}")
+
+    val java = System.getProperty("java.home") + "/bin/java"
+    val command = cgroupCommand ++ List(java) ++ options ++ List("-cp", classPath
+      .mkString(File.pathSeparator), mainClass) ++ arguments
+    LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; " +
+      s"options: ${options.mkString(" ")}")
+    val logger = new ProcessLogRedirector()
+    val process = Process(command).run(logger)
+    new RichProcess(process, logger)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
deleted file mode 100644
index 510258d..0000000
--- a/experiments/storm/src/main/java/io/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.util;
-
-import backtype.storm.utils.TimeCacheMap;
-
-/**
- * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression.
- */
-@SuppressWarnings("deprecation")
-public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> {
-
-  public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) {
-    super(expirationSecs, new ExpiredCallback<K, V>() {
-
-      @Override
-      public void expire(K key, V val) {
-        callback.expire(key, val);
-      }
-    });
-  }
-
-  public static interface Callback<K, V> {
-    public void expire(K key, V val);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
new file mode 100644
index 0000000..923883c
--- /dev/null
+++ b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.storm.util;
+
+import backtype.storm.utils.TimeCacheMap;
+
+/**
+ * Wrapper class to suppress the "deprecation" warning, as Scala doesn't support the suppression.
+ */
+@SuppressWarnings("deprecation")
+public class TimeCacheMapWrapper<K, V> extends TimeCacheMap<K, V> {
+
+  public TimeCacheMapWrapper (int expirationSecs, Callback<K, V> callback) {
+    super(expirationSecs, new ExpiredCallback<K, V>() {
+
+      @Override
+      public void expire(K key, V val) {
+        callback.expire(key, val);
+      }
+    });
+  }
+
+  public static interface Callback<K, V> {
+    public void expire(K key, V val);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/resources/geardefault.conf b/experiments/storm/src/main/resources/geardefault.conf
index 54c478e..38ac9d3 100644
--- a/experiments/storm/src/main/resources/geardefault.conf
+++ b/experiments/storm/src/main/resources/geardefault.conf
@@ -1,5 +1,5 @@
 gearpump {
   storm {
-    serialization-framework = "io.gearpump.experiments.storm.util.StormSerializationFramework"
+    serialization-framework = "org.apache.gearpump.experiments.storm.util.StormSerializationFramework"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
deleted file mode 100644
index 19814e9..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm
-
-import org.slf4j.Logger
-
-import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
-import io.gearpump.util.LogUtil
-
-object StormRunner {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
-
-  private def usage(): Unit = {
-    val keys = commands.keys.toList.sorted
-    // scalastyle:off println
-    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
-    // scalastyle:on println
-  }
-
-  private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
-    if (!commands.contains(command)) {
-      usage()
-    } else {
-      commands(command).main(commandArgs)
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    if (args.length == 0) {
-      usage()
-    } else {
-      val command = args(0)
-      val commandArgs = args.drop(1)
-      executeCommand(command, commandArgs)
-    }
-  }
-}