You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by je...@apache.org on 2016/02/18 06:02:56 UTC
[3/9] storm git commit: another round of changes
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
index fdb9996..d089e95 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/CpusetCore.java
@@ -18,7 +18,6 @@
package org.apache.storm.container.cgroup.core;
import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
import org.apache.storm.container.cgroup.SubSystemType;
import java.io.IOException;
@@ -51,118 +50,116 @@ public class CpusetCore implements CgroupCore {
}
public void setCpus(int[] nums) throws IOException {
- StringBuilder sb = new StringBuilder();
- for (int num : nums) {
- sb.append(num);
- sb.append(',');
- }
- sb.deleteCharAt(sb.length() - 1);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS), sb.toString());
+ setConfigs(nums, CPUSET_CPUS);
}
public int[] getCpus() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPUS)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_CPUS)).get(0);
return parseNums(output);
}
public void setMems(int[] nums) throws IOException {
+ setConfigs(nums, CPUSET_MEMS);
+ }
+
+ private void setConfigs(int[] nums, String config) throws IOException {
StringBuilder sb = new StringBuilder();
for (int num : nums) {
sb.append(num);
sb.append(',');
}
sb.deleteCharAt(sb.length() - 1);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS), sb.toString());
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, config), sb.toString());
}
public int[] getMems() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMS)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMS)).get(0);
return parseNums(output);
}
public void setMemMigrate(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_MIGRATE), String.valueOf(flag ? 1 : 0));
}
public boolean isMemMigrate() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(0));
return output > 0;
}
public void setCpuExclusive(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
}
public boolean isCpuExclusive() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)).get(0));
return output > 0;
}
public void setMemExclusive(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
}
public boolean isMemExclusive() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)).get(0));
return output > 0;
}
public void setMemHardwall(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEM_HARDWALL), String.valueOf(flag ? 1 : 0));
}
public boolean isMemHardwall() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0));
return output > 0;
}
public int getMemPressure() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_PRESSURE)).get(0);
return Integer.parseInt(output);
}
public void setMemPressureEnabled(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), String.valueOf(flag ? 1 : 0));
}
public boolean isMemPressureEnabled() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED)).get(0));
return output > 0;
}
public void setMemSpreadPage(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), String.valueOf(flag ? 1 : 0));
}
public boolean isMemSpreadPage() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)).get(0));
return output > 0;
}
public void setMemSpreadSlab(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), String.valueOf(flag ? 1 : 0));
}
public boolean isMemSpreadSlab() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)).get(0));
return output > 0;
}
public void setSchedLoadBlance(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), String.valueOf(flag ? 1 : 0));
}
public boolean isSchedLoadBlance() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)).get(0));
return output > 0;
}
public void setSchedRelaxDomainLevel(int value) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value));
}
public int getSchedRelaxDomainLevel() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0);
return Integer.parseInt(output);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
index a6896c5..c38f5fe 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/DevicesCore.java
@@ -18,9 +18,10 @@
package org.apache.storm.container.cgroup.core;
import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
import org.apache.storm.container.cgroup.SubSystemType;
import org.apache.storm.container.cgroup.Device;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -29,21 +30,23 @@ public class DevicesCore implements CgroupCore {
private final String dir;
- public static final String DEVICES_ALLOW = "/devices.allow";
- public static final String DEVICES_DENY = "/devices.deny";
- public static final String DEVICES_LIST = "/devices.list";
+ private static final String DEVICES_ALLOW = "/devices.allow";
+ private static final String DEVICES_DENY = "/devices.deny";
+ private static final String DEVICES_LIST = "/devices.list";
- public static final char TYPE_ALL = 'a';
- public static final char TYPE_BLOCK = 'b';
- public static final char TYPE_CHAR = 'c';
+ private static final char TYPE_ALL = 'a';
+ private static final char TYPE_BLOCK = 'b';
+ private static final char TYPE_CHAR = 'c';
- public static final int ACCESS_READ = 1;
- public static final int ACCESS_WRITE = 2;
- public static final int ACCESS_CREATE = 4;
+ private static final int ACCESS_READ = 1;
+ private static final int ACCESS_WRITE = 2;
+ private static final int ACCESS_CREATE = 4;
- public static final char ACCESS_READ_CH = 'r';
- public static final char ACCESS_WRITE_CH = 'w';
- public static final char ACCESS_CREATE_CH = 'm';
+ private static final char ACCESS_READ_CH = 'r';
+ private static final char ACCESS_WRITE_CH = 'w';
+ private static final char ACCESS_CREATE_CH = 'm';
+
+ private static final Logger LOG = LoggerFactory.getLogger(DevicesCore.class);
public DevicesCore(String dir) {
this.dir = dir;
@@ -67,9 +70,9 @@ public class DevicesCore implements CgroupCore {
public Record(String output) {
if (output.contains("*")) {
- System.out.println("Pre:" + output);
+ LOG.debug("Pre: {}", output);
output = output.replaceAll("\\*", "-1");
- System.out.println("After:" + output);
+ LOG.debug("After: {}",output);
}
String[] splits = output.split("[: ]");
type = splits[0].charAt(0);
@@ -168,7 +171,7 @@ public class DevicesCore implements CgroupCore {
private void setPermission(String prop, char type, Device device, int accesses) throws IOException {
Record record = new Record(type, device, accesses);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop), record.toString());
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, prop), record.toString());
}
public void setAllow(char type, Device device, int accesses) throws IOException {
@@ -180,7 +183,7 @@ public class DevicesCore implements CgroupCore {
}
public Record[] getList() throws IOException {
- List<String> output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, DEVICES_LIST));
+ List<String> output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, DEVICES_LIST));
return Record.parseRecordList(output);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
index 65b8989..89e13dd 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/FreezerCore.java
@@ -18,7 +18,6 @@
package org.apache.storm.container.cgroup.core;
import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
import org.apache.storm.container.cgroup.SubSystemType;
import java.io.IOException;
@@ -39,11 +38,11 @@ public class FreezerCore implements CgroupCore {
}
public void setState(State state) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase());
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase());
}
public State getState() throws IOException {
- return State.getStateValue(CgroupUtils.readFileByLine(Constants.getDir(this.dir, FREEZER_STATE)).get(0));
+ return State.getStateValue(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, FREEZER_STATE)).get(0));
}
public enum State {
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
index 98be198..9bd6a72 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/MemoryCore.java
@@ -18,7 +18,6 @@
package org.apache.storm.container.cgroup.core;
import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
import org.apache.storm.container.cgroup.SubSystemType;
import java.io.IOException;
@@ -110,78 +109,78 @@ public class MemoryCore implements CgroupCore {
}
public Stat getStat() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_STAT)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_STAT)).get(0);
Stat stat = new Stat(output);
return stat;
}
public long getPhysicalUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
}
public long getWithSwapUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
}
public long getMaxPhysicalUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
}
public long getMaxWithSwapUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0));
}
public void setPhysicalUsageLimit(long value) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value));
}
public long getPhysicalUsageLimit() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
}
public void setWithSwapUsageLimit(long value) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value));
}
public long getWithSwapUsageLimit() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
}
public int getPhysicalFailCount() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_FAILCNT)).get(0));
}
public int getWithSwapFailCount() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
}
public void clearForceEmpty() throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0));
}
public void setSwappiness(int value) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value));
}
public int getSwappiness() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
}
public void setUseHierarchy(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0));
}
public boolean isUseHierarchy() throws IOException {
- int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0));
return output > 0;
}
public void setOomControl(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0));
}
public boolean isOomControl() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
output = output.split("\n")[0].split("[\\s]")[1];
int value = Integer.parseInt(output);
return value > 0;
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
index 979eaad..d3dd5a7 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetClsCore.java
@@ -18,7 +18,6 @@
package org.apache.storm.container.cgroup.core;
import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
import org.apache.storm.container.cgroup.SubSystemType;
import org.apache.storm.container.cgroup.Device;
@@ -57,11 +56,11 @@ public class NetClsCore implements CgroupCore {
StringBuilder sb = new StringBuilder("0x");
sb.append(toHex(major));
sb.append(toHex(minor));
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
}
public Device getClassId() throws IOException {
- String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0);
+ String output = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_CLS_CLASSID)).get(0);
output = Integer.toHexString(Integer.parseInt(output));
int major = Integer.parseInt(output.substring(0, output.length() - 4));
int minor = Integer.parseInt(output.substring(output.length() - 4));
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
index 95c1a40..b83b81a 100755
--- a/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
+++ b/storm-core/src/jvm/org/apache/storm/container/cgroup/core/NetPrioCore.java
@@ -18,7 +18,6 @@
package org.apache.storm.container.cgroup.core;
import org.apache.storm.container.cgroup.CgroupUtils;
-import org.apache.storm.container.cgroup.Constants;
import org.apache.storm.container.cgroup.SubSystemType;
import java.io.IOException;
@@ -43,7 +42,7 @@ public class NetPrioCore implements CgroupCore {
}
public int getPrioId() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0));
}
public void setIfPrioMap(String iface, int priority) throws IOException {
@@ -51,12 +50,12 @@ public class NetPrioCore implements CgroupCore {
sb.append(iface);
sb.append(' ');
sb.append(priority);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString());
+ CgroupUtils.writeFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString());
}
public Map<String, Integer> getIfPrioMap() throws IOException {
Map<String, Integer> result = new HashMap<String, Integer>();
- List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP));
+ List<String> strs = CgroupUtils.readFileByLine(CgroupUtils.getDir(this.dir, NET_PRIO_IFPRIOMAP));
for (String str : strs) {
String[] strArgs = str.split(" ");
result.put(strArgs[0], Integer.valueOf(strArgs[1]));
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index adaafb6..f8a863c 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -590,7 +590,12 @@ public class Utils {
}
public static boolean checkFileExists(String dir, String file) {
- return Files.exists(new File(dir, file).toPath());
+ return checkFileExists(dir + "/" + file);
+ }
+
+ public static boolean CheckDirExists(String dir) {
+ File file = new File(dir);
+ return file.isDirectory();
}
public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException {
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 956abe8..a7c6b5a 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -400,7 +400,7 @@
(Matchers/any)
(Matchers/any)
(Matchers/any)))))))
-
+
(testing "testing topology.classpath is added to classpath"
(let [topo-cp (str Utils/FILE_PATH_SEPARATOR "any" Utils/FILE_PATH_SEPARATOR "path")
exp-args (exp-args-fn [] [] (Utils/addToClasspath mock-cp [topo-cp]))
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/test/jvm/org/apache/storm/TestCgroups.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestCgroups.java b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
index f19ffc2..0857ba9 100644
--- a/storm-core/test/jvm/org/apache/storm/TestCgroups.java
+++ b/storm-core/test/jvm/org/apache/storm/TestCgroups.java
@@ -23,12 +23,17 @@ import org.junit.Assume;
import org.apache.storm.container.cgroup.CgroupManager;
import org.apache.storm.utils.Utils;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -37,6 +42,8 @@ import java.util.UUID;
*/
public class TestCgroups {
+ private static final Logger LOG = LoggerFactory.getLogger(TestCgroups.class);
+
/**
* Test whether cgroups are setup up correctly for use. Also tests whether Cgroups produces the right command to
* start a worker and cleans up correctly after the worker is shutdown
@@ -46,7 +53,7 @@ public class TestCgroups {
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
//We don't want to run the test is CGroups are not setup
- Assume.assumeTrue("Check if CGroups are setup", ((boolean) config.get(Config.STORM_CGROUP_ENABLE)) == true);
+ Assume.assumeTrue("Check if CGroups are setup", ((boolean) config.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE)) == true);
Assert.assertTrue("Check if STORM_CGROUP_HIERARCHY_DIR exists", stormCgroupHierarchyExists(config));
Assert.assertTrue("Check if STORM_SUPERVISOR_CGROUP_ROOTDIR exists", stormCgroupSupervisorRootDirExists(config));
@@ -58,13 +65,18 @@ public class TestCgroups {
resourcesMap.put("cpu", 200);
resourcesMap.put("memory", 1024);
String workerId = UUID.randomUUID().toString();
- String command = manager.startNewWorker(workerId, resourcesMap);
+ manager.reserveResourcesForWorker(workerId, resourcesMap);
+ List<String> commandList = manager.getLaunchCommand(workerId, new ArrayList<String>());
+ StringBuilder command = new StringBuilder();
+ for (String entry : commandList) {
+ command.append(entry).append(" ");
+ }
String correctCommand1 = config.get(Config.STORM_CGROUP_CGEXEC_CMD) + " -g memory,cpu:/"
- + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId;
+ + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId + " ";
String correctCommand2 = config.get(Config.STORM_CGROUP_CGEXEC_CMD) + " -g cpu,memory:/"
- + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId;
- Assert.assertTrue("Check if cgroup launch command is correct", command.equals(correctCommand1) || command.equals(correctCommand2));
+ + config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId + " ";
+ Assert.assertTrue("Check if cgroup launch command is correct", command.toString().equals(correctCommand1) || command.toString().equals(correctCommand2));
String pathToWorkerCgroupDir = ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR))
+ "/" + ((String) config.get(Config.STORM_SUPERVISOR_CGROUP_ROOTDIR)) + "/" + workerId;
@@ -84,7 +96,7 @@ public class TestCgroups {
Assert.assertTrue("Check if memory.limit_in_bytes file exists", fileExists(pathTomemoryLimitInBytes));
Assert.assertEquals("Check if the correct value is written into memory.limit_in_bytes", String.valueOf(1024 * 1024 * 1024), readFileAll(pathTomemoryLimitInBytes));
- manager.shutDownWorker(workerId, true);
+ manager.releaseResourcesForWorker(workerId);
Assert.assertFalse("Make sure cgroup was removed properly", dirExists(pathToWorkerCgroupDir));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9421cd8/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index c4c1b3b..78c73a1 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -140,6 +140,9 @@ public class TestResourceAwareScheduler {
config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 128.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0);
config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();