You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:56 UTC
[18/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java
deleted file mode 100755
index 63a725a..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/Metric.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.callback.Callback;
-import com.alibaba.jstorm.common.metric.operator.Sampling;
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-import com.alibaba.jstorm.utils.IntervalCheck;
-/**
- * A metric that reports one value per time window plus an all-time value.
- * <p>
- * Incoming values are first folded into {@code unflushed} by the
- * {@link Updater}; {@link #flush()} then hands that batch to every
- * {@link RollingWindow} (one per entry of {@code windowSeconds}) and to the
- * {@link AllWindow}. {@link #getSnapshot()} converts each window's value from
- * {@code V} to {@code T} with the {@link Convertor}.
- * <p>
- * The default value, updater, merger and convertor must be set and
- * {@link #init()} called before the metric is used; {@code init()} is what
- * creates the windows and the interval check.
- */
-public class Metric<T, V> implements Sampling<Map<Integer, T>> {
- private static final long serialVersionUID = -1362345159511508074L;
- private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
-
- protected static boolean enable; // static, so shared by every Metric; update() returns immediately while false
-
- public static void setEnable(boolean e) {
- enable = e;
- }
-
- protected List<RollingWindow<V>> rollingWindows;
- protected AllWindow<V> allWindow;
-
- protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW,
- StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW };
- protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS; // used as a divisor: windowSize / bucketSize is the bucket interval
- protected V defaultValue;
- protected Updater<V> updater;
- protected Merger<V> merger;
- protected Convertor<V, T> convertor;
- protected Callback callback;
-
- protected int interval; // unit is second
- protected IntervalCheck intervalCheck;
- protected V unflushed; // values folded in since the last flush(); flush() resets it to null
-
- public Metric() {
- }
- /**
- * Derives the flush interval from the configured windows.
- * <p>
- * Each entry of {@code windowSeconds} is divided by {@code bucketSize}
- * (integer division). The result is the largest value greater than 1, not
- * exceeding the smallest quotient, that divides every quotient evenly, or
- * 1 when no such value exists. With the default windows and bucket size
- * the quotients are 30, 540 and 4320, so the result is 30.
- *
- * @return the computed interval, or {@code StatBuckets.NUM_STAT_BUCKETS}
- *         when {@code windowSeconds} is null or empty
- */
- public int getInterval() {
- if (windowSeconds == null || windowSeconds.length == 0) {
- return StatBuckets.NUM_STAT_BUCKETS;
- }
-
- int intervals[] = new int[windowSeconds.length]; // windowSeconds[i] / bucketSize for each window
- int smallest = Integer.MAX_VALUE;
- for (int i = 0; i < windowSeconds.length; i++) {
- int interval = windowSeconds[i] / bucketSize;
- intervals[i] = interval;
- if (interval < smallest) {
- smallest = interval;
- }
- }
-
- for (int goodInterval = smallest; goodInterval > 1; goodInterval--) { // search downward for a common divisor
- boolean good = true;
- for (int interval : intervals) {
- if (interval % goodInterval != 0) {
- good = false;
- break;
- }
- }
-
- if (good == true) {
- return goodInterval;
- }
- }
-
- return 1;
- }
- /**
- * Builds the rolling windows, the all-time window and the interval check
- * from the current configuration.
- * <p>
- * One {@link RollingWindow} is created per entry of {@code windowSeconds},
- * each with an interval of {@code windowSize / bucketSize}. The interval
- * check is configured with the value returned by {@link #getInterval()}.
- *
- * @throws IllegalArgumentException if the default value, updater, merger
- *         or convertor has not been set
- */
- public void init() {
- if (defaultValue == null || updater == null || merger == null
- || convertor == null) {
- throw new IllegalArgumentException("Invalid argements");
- }
-
- rollingWindows = new ArrayList<RollingWindow<V>>();
- if (windowSeconds != null) {
- rollingWindows.clear(); // the list was created just above, so it is already empty here
- for (int windowSize : windowSeconds) {
- RollingWindow<V> rollingWindow =
- new RollingWindow<V>(defaultValue, windowSize
- / bucketSize, windowSize, updater, merger);
-
- rollingWindows.add(rollingWindow);
- }
-
- }
- allWindow = new AllWindow<V>(defaultValue, updater, merger);
-
- this.interval = getInterval();
- this.intervalCheck = new IntervalCheck();
- this.intervalCheck.setInterval(interval);
- }
-
- /**
- * Records one value. In order to improve performance the value is only
- * folded into {@code unflushed} here; the windows themselves are updated
- * in batches by {@link #flush()}, which this method calls whenever
- * {@code intervalCheck.check()} returns true.
- * <p>
- * Does nothing while the static {@code enable} flag is false.
- *
- * @param obj the value to record
- */
- @Override
- public void update(Number obj) {
- if (enable == false) {
- return;
- }
-
- if (intervalCheck.check()) { // evaluated outside the lock; flush() itself is synchronized
- flush();
- }
- synchronized (this) {
- unflushed = updater.update(obj, unflushed);
- }
- }
- /**
- * Hands the pending batch to every rolling window and to the all-time
- * window via {@code updateBatch}, then resets the pending batch to null.
- * Returns immediately when nothing is pending.
- */
- public synchronized void flush() {
- if (unflushed == null) {
- return;
- }
- for (RollingWindow<V> rollingWindow : rollingWindows) {
- rollingWindow.updateBatch(unflushed);
- }
- allWindow.updateBatch(unflushed);
- unflushed = null;
- }
- /**
- * Flushes pending values and collects the current value of every window.
- * <p>
- * If a callback is set, it is executed with this metric after the values
- * have been collected.
- *
- * @return a sorted map whose keys are each rolling window's
- *         {@code getWindowSecond()} plus {@code StatBuckets.ALL_TIME_WINDOW}
- *         for the all-time window, and whose values are the converted
- *         window snapshots
- */
- @Override
- public Map<Integer, T> getSnapshot() {
- // fold any pending values into the windows before reading them
- flush();
-
- Map<Integer, T> ret = new TreeMap<Integer, T>();
- for (RollingWindow<V> rollingWindow : rollingWindows) {
- V value = rollingWindow.getSnapshot();
-
- ret.put(rollingWindow.getWindowSecond(), convertor.convert(value));
- }
-
- ret.put(StatBuckets.ALL_TIME_WINDOW,
- convertor.convert(allWindow.getSnapshot()));
-
- if (callback != null) {
- callback.execute(this);
- }
- return ret;
- }
- /**
- * Returns the converted snapshot of the all-time window. Unlike
- * {@link #getSnapshot()} this does not call {@link #flush()} first.
- */
- public T getAllTimeValue() {
- return convertor.convert(allWindow.getSnapshot());
- }
-
- public int[] getWindowSeconds() {
- return windowSeconds;
- }
-
- public void setWindowSeconds(int[] windowSeconds) {
- this.windowSeconds = windowSeconds; // stored as-is (no copy); read by getInterval() and init()
- }
-
- public int getBucketSize() {
- return bucketSize;
- }
-
- public void setBucketSize(int bucketSize) {
- this.bucketSize = bucketSize; // read by getInterval() and init()
- }
-
- public V getDefaultValue() {
- return defaultValue;
- }
-
- public void setDefaultValue(V defaultValue) {
- this.defaultValue = defaultValue;
- }
-
- public Updater<V> getUpdater() {
- return updater;
- }
-
- public void setUpdater(Updater<V> updater) {
- this.updater = updater;
- }
-
- public Merger<V> getMerger() {
- return merger;
- }
-
- public void setMerger(Merger<V> merger) {
- this.merger = merger;
- }
-
- public Convertor<V, T> getConvertor() {
- return convertor;
- }
-
- public void setConvertor(Convertor<V, T> convertor) {
- this.convertor = convertor;
- }
-
- public Callback getCallback() {
- return callback;
- }
-
- public void setCallback(Callback callback) {
- this.callback = callback;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java
deleted file mode 100755
index 54047a6..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/RollingWindow.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.common.metric.operator.Sampling;
-import com.alibaba.jstorm.common.metric.operator.StartTime;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-import com.alibaba.jstorm.utils.IntervalCheck;
-import com.alibaba.jstorm.utils.TimeUtils;
-/**
- * One rolling time window of a metric.
- * <p>
- * New values are folded into {@code unflushed}. {@link #rolling()} stores
- * that pending value in {@code buckets} under the current bucket time, which
- * is {@code TimeUtils.current_time_secs()} rounded down to a multiple of
- * {@code interval}. {@link #getSnapshot()} removes expired buckets and asks
- * the {@link Merger} to combine the remaining buckets with the pending value.
- */
-public class RollingWindow<V> implements Sampling<V>, StartTime {
- private static final long serialVersionUID = 3794478417380003279L;
- private static final Logger LOG = LoggerFactory
- .getLogger(RollingWindow.class);
-
- protected long startTime; // set from System.currentTimeMillis(); later reset to the oldest bucket key * 1000
- protected Integer currBucketTime;
- protected int interval; // unit is second
- protected int windowSecond;
- protected IntervalCheck intervalCheck;
-
- protected TreeMap<Integer, V> buckets;
- protected Integer bucketNum;
- protected V unflushed; // pending value not yet stored in buckets; rolling() resets it to null
- protected V defaultValue;
-
- protected Updater<V> updater;
- protected Merger<V> merger;
- /**
- * @param defaultValue returned by {@link #getSnapshot()} when the merger
- *        yields null
- * @param interval length of one bucket; also configures the interval check
- * @param windowSecond requested window length; the stored value is rounded
- *        down to a whole number of intervals
- * @param updater folds new values or batches into the pending value
- * @param merger combines the buckets and the pending value for a snapshot
- */
- RollingWindow(V defaultValue, int interval, int windowSecond,
- Updater<V> updater, Merger<V> merger) {
- this.startTime = System.currentTimeMillis();
- this.interval = interval;
- this.intervalCheck = new IntervalCheck();
- this.intervalCheck.setInterval(interval);
- this.currBucketTime = getCurrBucketTime(); // relies on this.interval, assigned above
-
- this.bucketNum = windowSecond / interval;
- this.windowSecond = (bucketNum) * interval;
-
- this.buckets = new TreeMap<Integer, V>();
-
- this.updater = updater;
- this.merger = merger;
-
- this.defaultValue = defaultValue;
-
- }
-
- /**
- * Folds one value into the pending value, first calling
- * {@link #rolling()} when {@code intervalCheck.check()} returns true.
- *
- * @param obj the value to record
- */
- @Override
- public void update(Number obj) {
- // the interval check runs outside the lock; rolling() takes the lock itself
-
- if (intervalCheck.check()) {
- rolling();
- }
- synchronized (this) {
- unflushed = updater.update(obj, unflushed);
-
- }
-
- }
-
- /**
- * In order to improve performance, callers may accumulate values
- * themselves and hand them over as one batch. The batch is folded into
- * the pending value with {@code updater.updateBatch}, after calling
- * {@link #rolling()} when {@code intervalCheck.check()} returns true.
- *
- * @param batch the pre-aggregated values to add
- */
- public void updateBatch(V batch) {
-
- if (intervalCheck.check()) {
- rolling();
- }
- synchronized (this) {
- unflushed = updater.updateBatch(batch, unflushed);
- }
-
- }
- /**
- * Returns the merged value of this window.
- * <p>
- * Calls {@link #rolling()} if the interval check fires, removes expired
- * buckets, then lets the merger combine the remaining bucket values with
- * the pending value.
- *
- * @return the merged value, or {@code defaultValue} when the merger
- *         returns null
- */
- @Override
- public V getSnapshot() {
- // store the pending value in its bucket first if the interval has elapsed
- if (intervalCheck.check()) {
- rolling();
- }
-
- cleanExpiredBuckets();
- // @@@ Testing
- //LOG.info("Raw Data:" + buckets + ",unflushed:" + unflushed);
-
- Collection<V> values = buckets.values(); // NOTE(review): buckets and unflushed are read here without synchronization, unlike in rolling()
-
- V ret = merger.merge(values, unflushed, this);
- if (ret == null) {
-
- // @@@ testing
- //LOG.warn("!!!!Exist null data !!!!!");
- return defaultValue;
- }
- return ret;
- }
-
- /*
- * Store the pending value (if any) under the current bucket time, then
- * move the "current bucket time" index to the present. Expired buckets
- * are not removed here; cleanExpiredBuckets() does that.
- */
- protected void rolling() {
- synchronized (this) {
- if (unflushed != null) {
- buckets.put(currBucketTime, unflushed);
- unflushed = null;
- }
-
- currBucketTime = getCurrBucketTime();
-
- return ;
- }
- }
- /**
- * Removes every bucket whose key is smaller than
- * {@code now - (interval - 1) - windowSecond} and, if any buckets remain,
- * sets {@code startTime} to the oldest remaining key multiplied by 1000.
- */
- protected void cleanExpiredBuckets() {
- int nowSec = TimeUtils.current_time_secs();
- int startRemove = nowSec - (interval - 1) - windowSecond;
-
- List<Integer> removeList = new ArrayList<Integer>(); // collected first so the map is not modified while iterating
-
- for (Integer keyTime : buckets.keySet()) {
- if (keyTime < startRemove) {
- removeList.add(keyTime);
- } else if (keyTime >= startRemove) { // TreeMap keys iterate in ascending order, so no later key can be expired
- break;
- }
- }
-
- for (Integer removeKey : removeList) {
- buckets.remove(removeKey);
- // @@@ Testing
- //LOG.info("Remove key:" + removeKey + ", diff:" + (nowSec - removeKey));
-
- }
-
- if (buckets.isEmpty() == false) {
- Integer first = buckets.firstKey();
- startTime = first.longValue() * 1000;
- }
- }
-
- public int getWindowSecond() {
- return windowSecond;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public int getInterval() {
- return interval;
- }
-
- public Integer getBucketNum() {
- return bucketNum;
- }
-
- public V getDefaultValue() {
- return defaultValue;
- }
- /**
- * Returns {@code TimeUtils.current_time_secs()} rounded down to a multiple
- * of {@code interval}; this is the key under which the pending value is
- * stored by {@link #rolling()}.
- */
- private Integer getCurrBucketTime() {
- return (TimeUtils.current_time_secs() / interval) * interval;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java
deleted file mode 100755
index 3e9b021..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/StatBuckets.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-/**
- * Constants describing the statistics time windows, plus helpers that map
- * between window keys and their display strings and format a number of
- * seconds as days, hours, minutes and seconds.
- */
-public class StatBuckets {
-
- public static final Integer NUM_STAT_BUCKETS = 20;
-
- public static final Integer MINUTE_WINDOW = 600; // 10 minutes in seconds, see MINUTE_WINDOW_STR
- public static final Integer HOUR_WINDOW = 10800; // 3 hours in seconds, see HOUR_WINDOW_STR
- public static final Integer DAY_WINDOW = 86400; // 1 day in seconds, see DAY_WINDOW_STR
- public static final Integer ALL_TIME_WINDOW = 0; // key used for the all-time window
- public static Set<Integer> TIME_WINDOWS = new TreeSet<Integer>(); // mutable and non-final; filled with the four keys below
- static {
- TIME_WINDOWS.add(ALL_TIME_WINDOW);
- TIME_WINDOWS.add(MINUTE_WINDOW);
- TIME_WINDOWS.add(HOUR_WINDOW);
- TIME_WINDOWS.add(DAY_WINDOW);
- }
-
- public static final String MINUTE_WINDOW_STR = "0d0h10m0s";
- public static final String HOUR_WINDOW_STR = "0d3h0m0s";
- public static final String DAY_WINDOW_STR = "1d0h0m0s";
- public static final String ALL_WINDOW_STR = "All-time";
-
- public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS, // each window divided by the bucket count: 30, 540, 4320
- HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS };
-
- private static final String[][] PRETTYSECDIVIDERS = { // { unit suffix, divisor that carries into the next unit }; a null divisor marks the last unit
- new String[] { "s", "60" }, new String[] { "m", "60" },
- new String[] { "h", "24" }, new String[] { "d", null } };
-
- /**
- * Converts a window key to the string used for it: the all-time label
- * for key 0, otherwise the key's decimal representation.
- *
- * @param key the window key; must not be null because it is unboxed for
- *        the comparison with 0
- * @return {@code ALL_WINDOW_STR} when {@code key} is 0, otherwise
- *         {@code String.valueOf(key)}
- */
- public static String parseTimeKey(Integer key) {
- if (key == 0) {
- return ALL_WINDOW_STR;
- } else {
- return String.valueOf(key);
- }
- }
-
- /**
- * Maps a window display string to its window key.
- * <p>
- * The default is {@code MINUTE_WINDOW}: it is returned for null and for
- * any string that is not one of the four known display strings.
- *
- * @param showKey one of the {@code *_WINDOW_STR} constants, or null
- * @return the matching window key, or {@code MINUTE_WINDOW} by default
- */
- public static Integer getTimeKey(String showKey) {
- Integer window = null;
- if (showKey == null) {
- window = (MINUTE_WINDOW);
- } else if (showKey.equals(MINUTE_WINDOW_STR)) {
- window = (MINUTE_WINDOW);
- } else if (showKey.equals(HOUR_WINDOW_STR)) {
- window = (HOUR_WINDOW);
- } else if (showKey.equals(DAY_WINDOW_STR)) {
- window = (DAY_WINDOW);
- } else if (showKey.equals(ALL_WINDOW_STR)) {
- window = ALL_TIME_WINDOW;
- } else {
- window = MINUTE_WINDOW;
- }
-
- return window;
- }
-
- /**
- * Maps a window key to its display string. The default is
- * {@code MINUTE_WINDOW_STR}: it is returned for null and for any key that
- * is not one of the four known window keys.
- *
- * @param time the window key, or null
- * @return the matching display string, or {@code MINUTE_WINDOW_STR} by
- *         default
- */
- public static String getShowTimeStr(Integer time) {
- if (time == null) {
- return MINUTE_WINDOW_STR;
- } else if (time.equals(MINUTE_WINDOW)) {
- return MINUTE_WINDOW_STR;
- } else if (time.equals(HOUR_WINDOW)) {
- return HOUR_WINDOW_STR;
- } else if (time.equals(DAY_WINDOW)) {
- return DAY_WINDOW_STR;
- } else if (time.equals(ALL_TIME_WINDOW)) {
- return ALL_WINDOW_STR;
- } else {
- return MINUTE_WINDOW_STR;
- }
-
- }
-
- /**
- * seconds to string like 1d20h30m40s
- * <p>
- * All four units are always written, largest first, so 0 becomes
- * {@code 0d0h0m0s} and 160240 becomes {@code 1d20h30m40s}.
- *
- * @param secs the number of seconds to format
- * @return the formatted string
- */
- public static String prettyUptimeStr(int secs) {
- int diversize = PRETTYSECDIVIDERS.length;
-
- List<String> tmp = new ArrayList<String>(); // parts in ascending unit order: seconds first, days last
- int div = secs;
- for (int i = 0; i < diversize; i++) {
- if (PRETTYSECDIVIDERS[i][1] != null) {
- Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]);
- tmp.add(div % d + PRETTYSECDIVIDERS[i][0]);
- div = div / d;
- } else {
- tmp.add(div + PRETTYSECDIVIDERS[i][0]); // last unit takes whatever is left, without a modulo
- }
- }
-
- String rtn = "";
- int tmpSzie = tmp.size();
- for (int j = tmpSzie - 1; j > -1; j--) { // concatenate in reverse so the largest unit comes first
- rtn += tmp.get(j);
- }
- return rtn;
- }
-
- /**
- * Entry point with an empty body; running it does nothing.
- *
- * @param args ignored
- */
- public static void main(String[] args) {
- // intentionally empty
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
deleted file mode 100755
index 2dbab6f..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.stats;
-/**
- * Enumerates the statistic categories by name. The constants use lower-case
- * names with underscores; presumably they are used directly as keys or
- * labels elsewhere, which is not visible here.
- */
-public enum StaticsType {
- emitted, send_tps, recv_tps, acked, failed, transferred, process_latencies;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
index d9148db..15b1cfa 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupCenter.java
@@ -100,9 +100,8 @@ public class CgroupCenter implements CgroupOperation {
SubSystemType type = SubSystemType.getSubSystem(split[0]);
if (type == null)
continue;
- subSystems.add(new SubSystem(type, Integer.valueOf(split[1]),
- Integer.valueOf(split[2]), Integer.valueOf(split[3])
- .intValue() == 1 ? true : false));
+ subSystems.add(new SubSystem(type, Integer.valueOf(split[1]), Integer.valueOf(split[2]), Integer.valueOf(split[3]).intValue() == 1 ? true
+ : false));
}
return subSystems;
} catch (Exception e) {
@@ -168,8 +167,7 @@ public class CgroupCenter implements CgroupOperation {
if (!CgroupUtils.dirExists(hierarchy.getDir()))
new File(hierarchy.getDir()).mkdirs();
String subSystems = CgroupUtils.reAnalyse(subsystems);
- SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup",
- subSystems);
+ SystemOperation.mount(subSystems, hierarchy.getDir(), "cgroup", subSystems);
}
@@ -217,8 +215,7 @@ public class CgroupCenter implements CgroupOperation {
}
public static void main(String args[]) {
- System.out.println(CgroupCenter.getInstance().getHierarchies().get(0)
- .getRootCgroups().getChildren().size());
+ System.out.println(CgroupCenter.getInstance().getHierarchies().get(0).getRootCgroups().getChildren().size());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
index 4de2d5a..0cc45cc 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/CgroupUtils.java
@@ -82,8 +82,7 @@ public class CgroupUtils {
return CgroupUtils.fileExists(Constants.CGROUP_STATUS_FILE);
}
- public static List<String> readFileByLine(String fileDir)
- throws IOException {
+ public static List<String> readFileByLine(String fileDir) throws IOException {
List<String> result = new ArrayList<String>();
FileReader fileReader = null;
BufferedReader reader = null;
@@ -101,8 +100,7 @@ public class CgroupUtils {
return result;
}
- public static void writeFileByLine(String fileDir, List<String> strings)
- throws IOException {
+ public static void writeFileByLine(String fileDir, List<String> strings) throws IOException {
FileWriter writer = null;
BufferedWriter bw = null;
try {
@@ -123,8 +121,7 @@ public class CgroupUtils {
}
}
- public static void writeFileByLine(String fileDir, String string)
- throws IOException {
+ public static void writeFileByLine(String fileDir, String string) throws IOException {
FileWriter writer = null;
BufferedWriter bw = null;
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
index 1655e49..20d4ec0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/SubSystem.java
@@ -27,8 +27,7 @@ public class SubSystem {
private boolean enable;
- public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum,
- boolean enable) {
+ public SubSystem(SubSystemType type, int hierarchyID, int cgroupNum, boolean enable) {
this.type = type;
this.hierarchyID = hierarchyID;
this.cgroupsNum = cgroupNum;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
index 0a772f6..224d05d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommon.java
@@ -59,9 +59,7 @@ public class CgroupCommon implements CgroupCommonOperation {
this.parent = parent;
this.dir = parent.getDir() + "/" + name;
this.init();
- cores =
- CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(),
- this.dir);
+ cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir);
this.isRoot = false;
}
@@ -74,23 +72,19 @@ public class CgroupCommon implements CgroupCommonOperation {
this.parent = null;
this.dir = dir;
this.init();
- cores =
- CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(),
- this.dir);
+ cores = CgroupCoreFactory.getInstance(this.hierarchy.getSubSystems(), this.dir);
this.isRoot = true;
}
@Override
public void addTask(int taskId) throws IOException {
// TODO Auto-generated method stub
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS),
- String.valueOf(taskId));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, TASKS), String.valueOf(taskId));
}
@Override
public Set<Integer> getTasks() throws IOException {
- List<String> stringTasks =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
+ List<String> stringTasks = CgroupUtils.readFileByLine(Constants.getDir(this.dir, TASKS));
Set<Integer> tasks = new HashSet<Integer>();
for (String task : stringTasks) {
tasks.add(Integer.valueOf(task));
@@ -101,16 +95,13 @@ public class CgroupCommon implements CgroupCommonOperation {
@Override
public void addProcs(int pid) throws IOException {
// TODO Auto-generated method stub
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS),
- String.valueOf(pid));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_PROCS), String.valueOf(pid));
}
@Override
public Set<Integer> getPids() throws IOException {
// TODO Auto-generated method stub
- List<String> stringPids =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- CGROUP_PROCS));
+ List<String> stringPids = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_PROCS));
Set<Integer> pids = new HashSet<Integer>();
for (String task : stringPids) {
pids.add(Integer.valueOf(task));
@@ -121,16 +112,12 @@ public class CgroupCommon implements CgroupCommonOperation {
@Override
public void setNotifyOnRelease(boolean flag) throws IOException {
// TODO Auto-generated method stub
- CgroupUtils
- .writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE),
- flag ? "1" : "0");
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE), flag ? "1" : "0");
}
@Override
public boolean getNotifyOnRelease() throws IOException {
- return CgroupUtils
- .readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE))
- .get(0).equals("1") ? true : false;
+ return CgroupUtils.readFileByLine(Constants.getDir(this.dir, NOTIFY_ON_RELEASE)).get(0).equals("1") ? true : false;
}
@Override
@@ -138,16 +125,14 @@ public class CgroupCommon implements CgroupCommonOperation {
// TODO Auto-generated method stub
if (!this.isRoot)
return;
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT),
- command);
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, RELEASE_AGENT), command);
}
@Override
public String getReleaseAgent() throws IOException {
if (!this.isRoot)
return null;
- return CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
+ return CgroupUtils.readFileByLine(Constants.getDir(this.dir, RELEASE_AGENT)).get(0);
}
@Override
@@ -155,21 +140,16 @@ public class CgroupCommon implements CgroupCommonOperation {
// TODO Auto-generated method stub
if (!this.cores.keySet().contains(SubSystemType.cpuset))
return;
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir,
- CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN), flag ? "1" : "0");
}
@Override
public boolean getCgroupCloneChildren() throws IOException {
- return CgroupUtils
- .readFileByLine(
- Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN))
- .get(0).equals("1") ? true : false;
+ return CgroupUtils.readFileByLine(Constants.getDir(this.dir, CGROUP_CLONE_CHILDREN)).get(0).equals("1") ? true : false;
}
@Override
- public void setEventControl(String eventFd, String controlFd,
- String... args) throws IOException {
+ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException {
// TODO Auto-generated method stub
StringBuilder sb = new StringBuilder();
sb.append(eventFd);
@@ -179,10 +159,7 @@ public class CgroupCommon implements CgroupCommonOperation {
sb.append(' ');
sb.append(arg);
}
- CgroupUtils
- .writeFileByLine(
- Constants.getDir(this.dir, CGROUP_EVENT_CONTROL),
- sb.toString());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CGROUP_EVENT_CONTROL), sb.toString());
}
public Hierarchy getHierarchy() {
@@ -240,8 +217,7 @@ public class CgroupCommon implements CgroupCommonOperation {
return;
for (File child : files) {
if (child.isDirectory()) {
- this.children.add(new CgroupCommon(child.getName(),
- this.hierarchy, this));
+ this.children.add(new CgroupCommon(child.getName(), this.hierarchy, this));
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
index 3f9090f..a76b09c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCommonOperation.java
@@ -42,7 +42,6 @@ public interface CgroupCommonOperation {
public boolean getCgroupCloneChildren() throws IOException;
- public void setEventControl(String eventFd, String controlFd,
- String... args) throws IOException;
+ public void setEventControl(String eventFd, String controlFd, String... args) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
index 279366a..2b3f3a8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/CgroupCoreFactory.java
@@ -35,10 +35,8 @@ import com.alibaba.jstorm.container.cgroup.core.NetPrioCore;
public class CgroupCoreFactory {
- public static Map<SubSystemType, CgroupCore> getInstance(
- Set<SubSystemType> types, String dir) {
- Map<SubSystemType, CgroupCore> result =
- new HashMap<SubSystemType, CgroupCore>();
+ public static Map<SubSystemType, CgroupCore> getInstance(Set<SubSystemType> types, String dir) {
+ Map<SubSystemType, CgroupCore> result = new HashMap<SubSystemType, CgroupCore>();
for (SubSystemType type : types) {
switch (type) {
case blkio:
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
index 5d487ec..9958114 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/BlkioCore.java
@@ -33,25 +33,18 @@ public class BlkioCore implements CgroupCore {
public static final String BLKIO_WEIGHT_DEVICE = "/blkio.weight_device";
public static final String BLKIO_RESET_STATS = "/blkio.reset_stats";
- public static final String BLKIO_THROTTLE_READ_BPS_DEVICE =
- "/blkio.throttle.read_bps_device";
- public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE =
- "/blkio.throttle.write_bps_device";
- public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE =
- "/blkio.throttle.read_iops_device";
- public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE =
- "/blkio.throttle.write_iops_device";
-
- public static final String BLKIO_THROTTLE_IO_SERVICED =
- "/blkio.throttle.io_serviced";
- public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES =
- "/blkio.throttle.io_service_bytes";
+ public static final String BLKIO_THROTTLE_READ_BPS_DEVICE = "/blkio.throttle.read_bps_device";
+ public static final String BLKIO_THROTTLE_WRITE_BPS_DEVICE = "/blkio.throttle.write_bps_device";
+ public static final String BLKIO_THROTTLE_READ_IOPS_DEVICE = "/blkio.throttle.read_iops_device";
+ public static final String BLKIO_THROTTLE_WRITE_IOPS_DEVICE = "/blkio.throttle.write_iops_device";
+
+ public static final String BLKIO_THROTTLE_IO_SERVICED = "/blkio.throttle.io_serviced";
+ public static final String BLKIO_THROTTLE_IO_SERVICE_BYTES = "/blkio.throttle.io_service_bytes";
public static final String BLKIO_TIME = "/blkio.time";
public static final String BLKIO_SECTORS = "/blkio.sectors";
public static final String BLKIO_IO_SERVICED = "/blkio.io_serviced";
- public static final String BLKIO_IO_SERVICE_BYTES =
- "/blkio.io_service_bytes";
+ public static final String BLKIO_IO_SERVICE_BYTES = "/blkio.io_service_bytes";
public static final String BLKIO_IO_SERVICE_TIME = "/blkio.io_service_time";
public static final String BLKIO_IO_WAIT_TIME = "/blkio.io_wait_time";
public static final String BLKIO_IO_MERGED = "/blkio.io_merged";
@@ -71,28 +64,19 @@ public class BlkioCore implements CgroupCore {
/* weight: 100-1000 */
public void setBlkioWeight(int weight) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT),
- String.valueOf(weight));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT), String.valueOf(weight));
}
public int getBlkioWeight() throws IOException {
- return Integer.valueOf(
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0))
- .intValue();
+ return Integer.valueOf(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT)).get(0)).intValue();
}
- public void setBlkioWeightDevice(Device device, int weight)
- throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE),
- makeContext(device, weight));
+ public void setBlkioWeightDevice(Device device, int weight) throws IOException {
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE), makeContext(device, weight));
}
public Map<Device, Integer> getBlkioWeightDevice() throws IOException {
- List<String> strings =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_WEIGHT_DEVICE));
+ List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_WEIGHT_DEVICE));
Map<Device, Integer> result = new HashMap<Device, Integer>();
for (String string : strings) {
String[] strArgs = string.split(" ");
@@ -104,15 +88,11 @@ public class BlkioCore implements CgroupCore {
}
public void setReadBps(Device device, long bps) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE),
- makeContext(device, bps));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE), makeContext(device, bps));
}
public Map<Device, Long> getReadBps() throws IOException {
- List<String> strings =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_THROTTLE_READ_BPS_DEVICE));
+ List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_BPS_DEVICE));
Map<Device, Long> result = new HashMap<Device, Long>();
for (String string : strings) {
String[] strArgs = string.split(" ");
@@ -124,15 +104,11 @@ public class BlkioCore implements CgroupCore {
}
public void setWriteBps(Device device, long bps) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE),
- makeContext(device, bps));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE), makeContext(device, bps));
}
public Map<Device, Long> getWriteBps() throws IOException {
- List<String> strings =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_THROTTLE_WRITE_BPS_DEVICE));
+ List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_BPS_DEVICE));
Map<Device, Long> result = new HashMap<Device, Long>();
for (String string : strings) {
String[] strArgs = string.split(" ");
@@ -144,15 +120,11 @@ public class BlkioCore implements CgroupCore {
}
public void setReadIOps(Device device, long iops) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE),
- makeContext(device, iops));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE), makeContext(device, iops));
}
public Map<Device, Long> getReadIOps() throws IOException {
- List<String> strings =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_THROTTLE_READ_IOPS_DEVICE));
+ List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_READ_IOPS_DEVICE));
Map<Device, Long> result = new HashMap<Device, Long>();
for (String string : strings) {
String[] strArgs = string.split(" ");
@@ -164,15 +136,11 @@ public class BlkioCore implements CgroupCore {
}
public void setWriteIOps(Device device, long iops) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE),
- makeContext(device, iops));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE), makeContext(device, iops));
}
public Map<Device, Long> getWriteIOps() throws IOException {
- List<String> strings =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_THROTTLE_WRITE_IOPS_DEVICE));
+ List<String> strings = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_WRITE_IOPS_DEVICE));
Map<Device, Long> result = new HashMap<Device, Long>();
for (String string : strings) {
String[] strArgs = string.split(" ");
@@ -183,23 +151,17 @@ public class BlkioCore implements CgroupCore {
return result;
}
- public Map<Device, Map<RecordType, Long>> getThrottleIOServiced()
- throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_THROTTLE_IO_SERVICED)));
+ public Map<Device, Map<RecordType, Long>> getThrottleIOServiced() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICED)));
}
- public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte()
- throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES)));
+ public Map<Device, Map<RecordType, Long>> getThrottleIOServiceByte() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_THROTTLE_IO_SERVICE_BYTES)));
}
public Map<Device, Long> getBlkioTime() throws IOException {
Map<Device, Long> result = new HashMap<Device, Long>();
- List<String> strs =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_TIME));
+ List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_TIME));
for (String str : strs) {
String[] strArgs = str.split(" ");
result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1]));
@@ -209,9 +171,7 @@ public class BlkioCore implements CgroupCore {
public Map<Device, Long> getBlkioSectors() throws IOException {
Map<Device, Long> result = new HashMap<Device, Long>();
- List<String> strs =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- BLKIO_SECTORS));
+ List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_SECTORS));
for (String str : strs) {
String[] strArgs = str.split(" ");
result.put(new Device(strArgs[0]), Long.parseLong(strArgs[1]));
@@ -219,43 +179,32 @@ public class BlkioCore implements CgroupCore {
return result;
}
- public Map<Device, Map<RecordType, Long>> getIOServiced()
- throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_IO_SERVICED)));
+ public Map<Device, Map<RecordType, Long>> getIOServiced() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICED)));
}
- public Map<Device, Map<RecordType, Long>> getIOServiceBytes()
- throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_IO_SERVICE_BYTES)));
+ public Map<Device, Map<RecordType, Long>> getIOServiceBytes() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_BYTES)));
}
- public Map<Device, Map<RecordType, Long>> getIOServiceTime()
- throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_IO_SERVICE_TIME)));
+ public Map<Device, Map<RecordType, Long>> getIOServiceTime() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_SERVICE_TIME)));
}
- public Map<Device, Map<RecordType, Long>> getIOWaitTime()
- throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_IO_WAIT_TIME)));
+ public Map<Device, Map<RecordType, Long>> getIOWaitTime() throws IOException {
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_WAIT_TIME)));
}
public Map<Device, Map<RecordType, Long>> getIOMerged() throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_IO_MERGED)));
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_MERGED)));
}
public Map<Device, Map<RecordType, Long>> getIOQueued() throws IOException {
- return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(
- this.dir, BLKIO_IO_QUEUED)));
+ return this.analyseRecord(CgroupUtils.readFileByLine(Constants.getDir(this.dir, BLKIO_IO_QUEUED)));
}
public void resetStats() throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, BLKIO_RESET_STATS), "1");
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, BLKIO_RESET_STATS), "1");
}
private String makeContext(Device device, Object data) {
@@ -265,8 +214,7 @@ public class BlkioCore implements CgroupCore {
}
private Map<Device, Map<RecordType, Long>> analyseRecord(List<String> strs) {
- Map<Device, Map<RecordType, Long>> result =
- new HashMap<Device, Map<RecordType, Long>>();
+ Map<Device, Map<RecordType, Long>> result = new HashMap<Device, Map<RecordType, Long>>();
for (String str : strs) {
String[] strArgs = str.split(" ");
if (strArgs.length != 3)
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
index 609898e..6a723a0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuCore.java
@@ -46,62 +46,47 @@ public class CpuCore implements CgroupCore {
}
public void setCpuShares(int weight) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES),
- String.valueOf(weight));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_SHARES), String.valueOf(weight));
}
public int getCpuShares() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPU_SHARES)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_SHARES)).get(0));
}
public void setCpuRtRuntimeUs(long us) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPU_RT_RUNTIME_US),
- String.valueOf(us));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US), String.valueOf(us));
}
public long getCpuRtRuntimeUs() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_RUNTIME_US)).get(0));
}
public void setCpuRtPeriodUs(long us) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPU_RT_PERIOD_US),
- String.valueOf(us));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US), String.valueOf(us));
}
public Long getCpuRtPeriodUs() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_RT_PERIOD_US)).get(0));
}
public void setCpuCfsPeriodUs(long us) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPU_CFS_PERIOD_US),
- String.valueOf(us));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US), String.valueOf(us));
}
public Long getCpuCfsPeriodUs(long us) throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_PERIOD_US)).get(0));
}
public void setCpuCfsQuotaUs(long us) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPU_CFS_QUOTA_US),
- String.valueOf(us));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US), String.valueOf(us));
}
public Long getCpuCfsQuotaUs(long us) throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_CFS_QUOTA_US)).get(0));
}
public Stat getCpuStat() throws IOException {
- return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- CPU_STAT)));
+ return new Stat(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPU_STAT)));
}
public static class Stat {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
index c54421b..8bec196 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpuacctCore.java
@@ -45,14 +45,11 @@ public class CpuacctCore implements CgroupCore {
}
public Long getCpuUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUACCT_USAGE)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE)).get(0));
}
public Map<StatType, Long> getCpuStat() throws IOException {
- List<String> strs =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- CPUACCT_STAT));
+ List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_STAT));
Map<StatType, Long> result = new HashMap<StatType, Long>();
result.put(StatType.user, Long.parseLong(strs.get(0).split(" ")[1]));
result.put(StatType.system, Long.parseLong(strs.get(1).split(" ")[1]));
@@ -60,10 +57,7 @@ public class CpuacctCore implements CgroupCore {
}
public Long[] getPerCpuUsage() throws IOException {
- String str =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU))
- .get(0);
+ String str = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUACCT_USAGE_PERCPU)).get(0);
String[] strArgs = str.split(" ");
Long[] result = new Long[strArgs.length];
for (int i = 0; i < result.length; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
index d693b6c..02bcace 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/CpusetCore.java
@@ -32,18 +32,12 @@ public class CpusetCore implements CgroupCore {
public static final String CPUSET_CPU_EXCLUSIVE = "/cpuset.cpu_exclusive";
public static final String CPUSET_MEM_EXCLUSIVE = "/cpuset.mem_exclusive";
public static final String CPUSET_MEM_HARDWALL = "/cpuset.mem_hardwall";
- public static final String CPUSET_MEMORY_PRESSURE =
- "/cpuset.memory_pressure";
- public static final String CPUSET_MEMORY_PRESSURE_ENABLED =
- "/cpuset.memory_pressure_enabled";
- public static final String CPUSET_MEMORY_SPREAD_PAGE =
- "/cpuset.memory_spread_page";
- public static final String CPUSET_MEMORY_SPREAD_SLAB =
- "/cpuset.memory_spread_slab";
- public static final String CPUSET_SCHED_LOAD_BALANCE =
- "/cpuset.sched_load_balance";
- public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL =
- "/cpuset.sched_relax_domain_level";
+ public static final String CPUSET_MEMORY_PRESSURE = "/cpuset.memory_pressure";
+ public static final String CPUSET_MEMORY_PRESSURE_ENABLED = "/cpuset.memory_pressure_enabled";
+ public static final String CPUSET_MEMORY_SPREAD_PAGE = "/cpuset.memory_spread_page";
+ public static final String CPUSET_MEMORY_SPREAD_SLAB = "/cpuset.memory_spread_slab";
+ public static final String CPUSET_SCHED_LOAD_BALANCE = "/cpuset.sched_load_balance";
+ public static final String CPUSET_SCHED_RELAX_DOMAIN_LEVEL = "/cpuset.sched_relax_domain_level";
private final String dir;
@@ -64,14 +58,11 @@ public class CpusetCore implements CgroupCore {
sb.append(',');
}
sb.deleteCharAt(sb.length() - 1);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS),
- sb.toString());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPUS), sb.toString());
}
public int[] getCpus() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_CPUS)).get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPUS)).get(0);
return parseNums(output);
}
@@ -82,147 +73,97 @@ public class CpusetCore implements CgroupCore {
sb.append(',');
}
sb.deleteCharAt(sb.length() - 1);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS),
- sb.toString());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMS), sb.toString());
}
public int[] getMems() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMS)).get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMS)).get(0);
return parseNums(output);
}
public void setMemMigrate(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE), String.valueOf(flag ? 1 : 0));
}
public boolean isMemMigrate() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(
- 0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_MIGRATE)).get(0));
return output > 0;
}
public void setCpuExclusive(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
}
public boolean isCpuExclusive() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE))
- .get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_CPU_EXCLUSIVE)).get(0));
return output > 0;
}
public void setMemExclusive(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE), String.valueOf(flag ? 1 : 0));
}
public boolean isMemExclusive() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE))
- .get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_EXCLUSIVE)).get(0));
return output > 0;
}
public void setMemHardwall(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_MEM_HARDWALL),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL), String.valueOf(flag ? 1 : 0));
}
public boolean isMemHardwall() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEM_HARDWALL)).get(0));
return output > 0;
}
public int getMemPressure() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE))
- .get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE)).get(0);
return Integer.parseInt(output);
}
public void setMemPressureEnabled(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED), String.valueOf(flag ? 1 : 0));
}
public boolean isMemPressureEnabled() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir,
- CPUSET_MEMORY_PRESSURE_ENABLED)).get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_PRESSURE_ENABLED)).get(0));
return output > 0;
}
public void setMemSpreadPage(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE), String.valueOf(flag ? 1 : 0));
}
public boolean isMemSpreadPage() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE))
- .get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_PAGE)).get(0));
return output > 0;
}
public void setMemSpreadSlab(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB), String.valueOf(flag ? 1 : 0));
}
public boolean isMemSpreadSlab() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB))
- .get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_MEMORY_SPREAD_SLAB)).get(0));
return output > 0;
}
public void setSchedLoadBlance(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE), String.valueOf(flag ? 1 : 0));
}
public boolean isSchedLoadBlance() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE))
- .get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_LOAD_BALANCE)).get(0));
return output > 0;
}
public void setSchedRelaxDomainLevel(int value) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL),
- String.valueOf(value));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL), String.valueOf(value));
}
public int getSchedRelaxDomainLevel() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir,
- CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, CPUSET_SCHED_RELAX_DOMAIN_LEVEL)).get(0);
return Integer.parseInt(output);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
index 1832668..491fc8f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/DevicesCore.java
@@ -110,8 +110,7 @@ public class DevicesCore implements CgroupCore {
final int prime = 31;
int result = 1;
result = prime * result + accesses;
- result =
- prime * result + ((device == null) ? 0 : device.hashCode());
+ result = prime * result + ((device == null) ? 0 : device.hashCode());
result = prime * result + type;
return result;
}
@@ -161,27 +160,21 @@ public class DevicesCore implements CgroupCore {
}
}
- private void setPermission(String prop, char type, Device device,
- int accesses) throws IOException {
+ private void setPermission(String prop, char type, Device device, int accesses) throws IOException {
Record record = new Record(type, device, accesses);
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop),
- record.toString());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, prop), record.toString());
}
- public void setAllow(char type, Device device, int accesses)
- throws IOException {
+ public void setAllow(char type, Device device, int accesses) throws IOException {
setPermission(DEVICES_ALLOW, type, device, accesses);
}
- public void setDeny(char type, Device device, int accesses)
- throws IOException {
+ public void setDeny(char type, Device device, int accesses) throws IOException {
setPermission(DEVICES_DENY, type, device, accesses);
}
public Record[] getList() throws IOException {
- List<String> output =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- DEVICES_LIST));
+ List<String> output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, DEVICES_LIST));
return Record.parseRecordList(output);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
index c601c3e..e0ad3da 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/FreezerCore.java
@@ -40,13 +40,11 @@ public class FreezerCore implements CgroupCore {
}
public void setState(State state) throws IOException {
- CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE),
- state.name().toUpperCase());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, FREEZER_STATE), state.name().toUpperCase());
}
public State getState() throws IOException {
- return State.getStateValue(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, FREEZER_STATE)).get(0));
+ return State.getStateValue(CgroupUtils.readFileByLine(Constants.getDir(this.dir, FREEZER_STATE)).get(0));
}
public enum State {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
index a2db78c..1b37bd3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/MemoryCore.java
@@ -27,15 +27,11 @@ public class MemoryCore implements CgroupCore {
public static final String MEMORY_STAT = "/memory.stat";
public static final String MEMORY_USAGE_IN_BYTES = "/memory.usage_in_bytes";
- public static final String MEMORY_MEMSW_USAGE_IN_BYTES =
- "/memory.memsw.usage_in_bytes";
- public static final String MEMORY_MAX_USAGE_IN_BYTES =
- "/memory.max_usage_in_bytes";
- public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES =
- "/memory.memsw.max_usage_in_bytes";
+ public static final String MEMORY_MEMSW_USAGE_IN_BYTES = "/memory.memsw.usage_in_bytes";
+ public static final String MEMORY_MAX_USAGE_IN_BYTES = "/memory.max_usage_in_bytes";
+ public static final String MEMORY_MEMSW_MAX_USAGE_IN_BYTES = "/memory.memsw.max_usage_in_bytes";
public static final String MEMORY_LIMIT_IN_BYTES = "/memory.limit_in_bytes";
- public static final String MEMORY_MEMSW_LIMIT_IN_BYTES =
- "/memory.memsw.limit_in_bytes";
+ public static final String MEMORY_MEMSW_LIMIT_IN_BYTES = "/memory.memsw.limit_in_bytes";
public static final String MEMORY_FAILCNT = "/memory.failcnt";
public static final String MEMORY_MEMSW_FAILCNT = "/memory.memsw.failcnt";
public static final String MEMORY_FORCE_EMPTY = "/memory.force_empty";
@@ -115,113 +111,78 @@ public class MemoryCore implements CgroupCore {
}
public Stat getStat() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_STAT)).get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_STAT)).get(0);
Stat stat = new Stat(output);
return stat;
}
public long getPhysicalUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USAGE_IN_BYTES)).get(0));
}
public long getWithSwapUsage() throws IOException {
- return Long
- .parseLong(CgroupUtils
- .readFileByLine(
- Constants.getDir(this.dir,
- MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_USAGE_IN_BYTES)).get(0));
}
public long getMaxPhysicalUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MAX_USAGE_IN_BYTES)).get(0));
}
public long getMaxWithSwapUsage() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES))
- .get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_MAX_USAGE_IN_BYTES)).get(0));
}
public void setPhysicalUsageLimit(long value) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES),
- String.valueOf(value));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES), String.valueOf(value));
}
public long getPhysicalUsageLimit() throws IOException {
- return Long.parseLong(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_LIMIT_IN_BYTES)).get(0));
}
public void setWithSwapUsageLimit(long value) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES),
- String.valueOf(value));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES), String.valueOf(value));
}
public long getWithSwapUsageLimit() throws IOException {
- return Long
- .parseLong(CgroupUtils
- .readFileByLine(
- Constants.getDir(this.dir,
- MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
+ return Long.parseLong(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_LIMIT_IN_BYTES)).get(0));
}
public int getPhysicalFailCount() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_FAILCNT)).get(0));
}
public int getWithSwapFailCount() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_MEMSW_FAILCNT)).get(0));
}
public void clearForceEmpty() throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, MEMORY_FORCE_EMPTY),
- String.valueOf(0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_FORCE_EMPTY), String.valueOf(0));
}
public void setSwappiness(int value) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, MEMORY_SWAPPINESS),
- String.valueOf(value));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS), String.valueOf(value));
}
public int getSwappiness() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_SWAPPINESS)).get(0));
}
public void setUseHierarchy(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, MEMORY_USE_HIERARCHY),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY), String.valueOf(flag ? 1 : 0));
}
public boolean isUseHierarchy() throws IOException {
- int output =
- Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_USE_HIERARCHY))
- .get(0));
+ int output = Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_USE_HIERARCHY)).get(0));
return output > 0;
}
public void setOomControl(boolean flag) throws IOException {
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, MEMORY_OOM_CONTROL),
- String.valueOf(flag ? 1 : 0));
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL), String.valueOf(flag ? 1 : 0));
}
public boolean isOomControl() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, MEMORY_OOM_CONTROL)).get(0);
output = output.split("\n")[0].split("[\\s]")[1];
int value = Integer.parseInt(output);
return value > 0;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
index dd80c0a..e7c376d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetClsCore.java
@@ -58,14 +58,11 @@ public class NetClsCore implements CgroupCore {
StringBuilder sb = new StringBuilder("0x");
sb.append(toHex(major));
sb.append(toHex(minor));
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID), sb.toString());
}
public Device getClassId() throws IOException {
- String output =
- CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0);
+ String output = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_CLS_CLASSID)).get(0);
output = Integer.toHexString(Integer.parseInt(output));
int major = Integer.parseInt(output.substring(0, output.length() - 4));
int minor = Integer.parseInt(output.substring(output.length() - 4));
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
index fd7e899..6b9b344 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/container/cgroup/core/NetPrioCore.java
@@ -44,8 +44,7 @@ public class NetPrioCore implements CgroupCore {
}
public int getPrioId() throws IOException {
- return Integer.parseInt(CgroupUtils.readFileByLine(
- Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0));
+ return Integer.parseInt(CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_PRIOIDX)).get(0));
}
public void setIfPrioMap(String iface, int priority) throws IOException {
@@ -53,15 +52,12 @@ public class NetPrioCore implements CgroupCore {
sb.append(iface);
sb.append(' ');
sb.append(priority);
- CgroupUtils.writeFileByLine(
- Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString());
+ CgroupUtils.writeFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP), sb.toString());
}
public Map<String, Integer> getIfPrioMap() throws IOException {
Map<String, Integer> result = new HashMap<String, Integer>();
- List<String> strs =
- CgroupUtils.readFileByLine(Constants.getDir(this.dir,
- NET_PRIO_IFPRIOMAP));
+ List<String> strs = CgroupUtils.readFileByLine(Constants.getDir(this.dir, NET_PRIO_IFPRIOMAP));
for (String str : strs) {
String[] strArgs = str.split(" ");
result.put(strArgs[0], Integer.valueOf(strArgs[1]));
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
index 6c2bd21..f788996 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/DefaultInimbus.java
@@ -37,9 +37,8 @@ public class DefaultInimbus implements INimbus {
}
@Override
- public Collection<WorkerSlot> allSlotsAvailableForScheduling(
- Collection<SupervisorDetails> existingSupervisors,
- Topologies topologies, Set<String> topologiesMissingAssignments) {
+ public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies,
+ Set<String> topologiesMissingAssignments) {
// TODO Auto-generated method stub
Collection<WorkerSlot> result = new HashSet<WorkerSlot>();
for (SupervisorDetails detail : existingSupervisors) {
@@ -50,15 +49,13 @@ public class DefaultInimbus implements INimbus {
}
@Override
- public void assignSlots(Topologies topologies,
- Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+ public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
// TODO Auto-generated method stub
}
@Override
- public String getHostName(
- Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+ public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
index 3858595..97dd079 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusCache.java
@@ -17,104 +17,87 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.cache.RocksDBCache;
import com.alibaba.jstorm.cache.TimeoutMemCache;
-import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.task.TaskInfo;
-import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.utils.OSInfo;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
-public class NimbusCache{
- private static final long serialVersionUID = 1685576554130463610L;
-
+public class NimbusCache {
private static final Logger LOG = LoggerFactory.getLogger(NimbusCache.class);
-
-
+
public static final String TIMEOUT_MEM_CACHE_CLASS = TimeoutMemCache.class.getName();
public static final String ROCKS_DB_CACHE_CLASS = RocksDBCache.class.getName();
-
+
protected JStormCache memCache;
protected JStormCache dbCache;
protected StormClusterState zkCluster;
-
+
public String getNimbusCacheClass(Map conf) {
boolean isLinux = OSInfo.isLinux();
boolean isMac = OSInfo.isMac();
boolean isLocal = StormConfig.local_mode(conf);
-
+
if (isLocal == true) {
return TIMEOUT_MEM_CACHE_CLASS;
}
-
+
if (isLinux == false && isMac == false) {
return TIMEOUT_MEM_CACHE_CLASS;
}
-
+
String nimbusCacheClass = ConfigExtension.getNimbusCacheClass(conf);
if (StringUtils.isBlank(nimbusCacheClass) == false) {
return nimbusCacheClass;
}
-
+
return ROCKS_DB_CACHE_CLASS;
-
+
}
-
+
public NimbusCache(Map conf, StormClusterState zkCluster) {
super();
-
+
String dbCacheClass = getNimbusCacheClass(conf);
LOG.info("NimbusCache db Cache will use {}", dbCacheClass);
-
+
try {
- dbCache = (JStormCache)Utils.newInstance(dbCacheClass);
-
+ dbCache = (JStormCache) Utils.newInstance(dbCacheClass);
+
String dbDir = StormConfig.masterDbDir(conf);
conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir);
-
+
conf.put(RocksDBCache.ROCKSDB_RESET, ConfigExtension.getNimbusCacheReset(conf));
-
+
dbCache.init(conf);
-
+
if (dbCache instanceof TimeoutMemCache) {
memCache = dbCache;
- }else {
+ } else {
memCache = new TimeoutMemCache();
memCache.init(conf);
}
- }catch(java.lang.UnsupportedClassVersionError e) {
-
- if (e.getMessage().indexOf("Unsupported major.minor version") >= 0) {
- LOG.error("!!!Please update jdk version to 7 or higher!!!");
-
- }
- LOG.error("Failed to create NimbusCache!", e);
+ } catch (UnsupportedClassVersionError e) {
+
+ if (e.getMessage().indexOf("Unsupported major.minor version") >= 0) {
+ LOG.error("!!!Please update jdk version to 7 or higher!!!");
+
+ }
+ LOG.error("Failed to create NimbusCache!", e);
throw new RuntimeException(e);
} catch (Exception e) {
LOG.error("Failed to create NimbusCache!", e);
throw new RuntimeException(e);
}
-
+
this.zkCluster = zkCluster;
}
@@ -128,19 +111,15 @@ public class NimbusCache{
public void cleanup() {
dbCache.cleanup();
-
+
}
-
-
+
/**
*
- * In the old design,
- * DBCache will cache all taskInfo/taskErrors, this will be useful for huge topology
+ * In the old design, DBCache will cache all taskInfo/taskErrors, this will be useful for huge topology
*
- * But the latest zk design, taskInfo is only one znode, taskErros has few znode
- * So remove them from DBCache
- * Skip timely refresh taskInfo/taskErrors
+ * But the latest zk design, taskInfo is only one znode, taskErros has few znode So remove them from DBCache Skip timely refresh taskInfo/taskErrors
*
*/
-
+
}