You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:40 UTC
[02/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
index 6000688..59e14d9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java
@@ -43,12 +43,10 @@ import com.alibaba.jstorm.cluster.StormConfig;
*/
public class JStormServerUtils {
- private static final Logger LOG = LoggerFactory
- .getLogger(JStormServerUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JStormServerUtils.class);
- public static void downloadCodeFromMaster(Map conf, String localRoot,
- String masterCodeDir, String topologyId, boolean isSupervisor)
- throws IOException, TException {
+ public static void downloadCodeFromMaster(Map conf, String localRoot, String masterCodeDir, String topologyId, boolean isSupervisor) throws IOException,
+ TException {
FileUtils.forceMkdir(new File(localRoot));
FileUtils.forceMkdir(new File(StormConfig.stormlib_path(localRoot)));
@@ -64,25 +62,18 @@ public class JStormServerUtils {
String masterStormConfPath = StormConfig.stormconf_path(masterCodeDir);
Utils.downloadFromMaster(conf, masterStormConfPath, localStormConfPath);
- Map stormConf =
- (Map) StormConfig.readLocalObject(topologyId,
- localStormConfPath);
+ Map stormConf = (Map) StormConfig.readLocalObject(topologyId, localStormConfPath);
if (stormConf == null)
throw new IOException("Get topology conf error: " + topologyId);
- List<String> libs =
- (List<String>) stormConf
- .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+ List<String> libs = (List<String>) stormConf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
if (libs == null)
return;
for (String libName : libs) {
- String localStromLibPath =
- StormConfig.stormlib_path(localRoot, libName);
- String masterStormLibPath =
- StormConfig.stormlib_path(masterCodeDir, libName);
- Utils.downloadFromMaster(conf, masterStormLibPath,
- localStromLibPath);
+ String localStromLibPath = StormConfig.stormlib_path(localRoot, libName);
+ String masterStormLibPath = StormConfig.stormlib_path(masterCodeDir, libName);
+ Utils.downloadFromMaster(conf, masterStormLibPath, localStromLibPath);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
index 983f579..ad56815 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
@@ -53,6 +53,7 @@ import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TFieldIdEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,11 +68,9 @@ import com.alibaba.jstorm.client.ConfigExtension;
* JStorm utility
*
* @author yannian/Longda/Xin.Zhou/Xin.Li
- *
*/
public class JStormUtils {
- private static final Logger LOG = LoggerFactory
- .getLogger(JStormUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JStormUtils.class);
public static long SIZE_1_K = 1024;
public static long SIZE_1_M = SIZE_1_K * 1024;
@@ -223,8 +222,7 @@ public class JStormUtils {
}
/**
- * Gets the pid of this JVM, because Java doesn't provide a real way to do
- * this.
+ * Gets the pid of this JVM, because Java doesn't provide a real way to do this.
*
* @return
*/
@@ -238,8 +236,7 @@ public class JStormUtils {
return split[0];
}
- public static void exec_command(String command) throws ExecuteException,
- IOException {
+ public static void exec_command(String command) throws ExecuteException, IOException {
String[] cmdlist = command.split(" ");
CommandLine cmd = new CommandLine(cmdlist[0]);
for (int i = 1; i < cmdlist.length; i++) {
@@ -257,14 +254,12 @@ public class JStormUtils {
* @param dir
* @param destdir
*/
- public static void extract_dir_from_jar(String jarpath, String dir,
- String destdir) {
+ public static void extract_dir_from_jar(String jarpath, String dir, String destdir) {
String cmd = "unzip -qq " + jarpath + " " + dir + "/** -d " + destdir;
try {
exec_command(cmd);
} catch (Exception e) {
- LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd
- + "!\n" + e.getMessage());
+ LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd + "!\n" + e.getMessage());
}
}
@@ -278,8 +273,7 @@ public class JStormUtils {
LOG.info("kill -9 process " + pid);
sleepMs(100);
} catch (ExecuteException e) {
- LOG.info("Error when trying to kill " + pid
- + ". Process has been killed");
+ LOG.info("Error when trying to kill " + pid + ". Process has been killed");
} catch (Exception e) {
LOG.info("Error when trying to kill " + pid + ".Exception ", e);
}
@@ -291,8 +285,7 @@ public class JStormUtils {
exec_command("kill " + pid);
LOG.info("kill process " + pid);
} catch (ExecuteException e) {
- LOG.info("Error when trying to kill " + pid
- + ". Process has been killed. ");
+ LOG.info("Error when trying to kill " + pid + ". Process has been killed. ");
} catch (Exception e) {
LOG.info("Error when trying to kill " + pid + ".Exception ", e);
}
@@ -349,7 +342,8 @@ public class JStormUtils {
String output = null;
try {
String pid = JStormUtils.process_pid();
- output = SystemOperation.exec("top -b -n 1 | grep " + pid);
+ String command = String.format("top -b -n 1 -p %s | grep -w %s", pid, pid);
+ output = SystemOperation.exec(command);
String subStr = output.substring(output.indexOf("S") + 1);
for (int i = 0; i < subStr.length(); i++) {
char ch = subStr.charAt(i);
@@ -369,64 +363,89 @@ public class JStormUtils {
return value;
}
-
+
+ public static Double getDiskUsage() {
+ if (!OSInfo.isLinux() && !OSInfo.isMac()) {
+ return 0.0;
+ }
+ try {
+ String output = SystemOperation.exec("df -h /");
+ if (output != null) {
+ String[] lines = output.split("[\\r\\n]+");
+ if (lines.length >= 2) {
+ String[] parts = lines[1].split("\\s+");
+ if (parts.length >= 5) {
+ String pct = parts[4];
+ if (pct.endsWith("%")) {
+ return Integer.valueOf(pct.substring(0, pct.length() - 1)) / 100.0;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("failed to get disk usage:", e);
+ }
+ return 0.0;
+ }
+
public static Double getMemUsage() {
- if (OSInfo.isLinux() == true) {
- try {
- Double value = 0.0;
+ if (OSInfo.isLinux() == true) {
+ try {
+ Double value = 0.0;
String pid = JStormUtils.process_pid();
- String output = SystemOperation.exec("top -b -n 1 | grep " + pid);
-
- int m = 0;
- String[] strArray = output.split(" ");
- for (int i = 0; i < strArray.length; i++) {
- String info = strArray[i];
- if (info.trim().length() == 0){
- continue;
- }
- if(m == 5) {
- // memory
- String unit = info.substring(info.length() - 1);
-
- if(unit.equalsIgnoreCase("g")) {
- value = Double.parseDouble(info.substring(0, info.length() - 1));
+ String command = String.format("top -b -n 1 -p %s | grep -w %s", pid, pid);
+ String output = SystemOperation.exec(command);
+
+ int m = 0;
+ String[] strArray = output.split(" ");
+ for (int i = 0; i < strArray.length; i++) {
+ String info = strArray[i];
+ if (info.trim().length() == 0) {
+ continue;
+ }
+ if (m == 5) {
+ // memory
+ String unit = info.substring(info.length() - 1);
+
+ if (unit.equalsIgnoreCase("g")) {
+ value = Double.parseDouble(info.substring(0, info.length() - 1));
value *= 1000000000;
- } else if(unit.equalsIgnoreCase("m")) {
- value = Double.parseDouble(info.substring(0, info.length() - 1));
- value *= 1000000;
- } else {
- value = Double.parseDouble(info);
- }
+ } else if (unit.equalsIgnoreCase("m")) {
+ value = Double.parseDouble(info.substring(0, info.length() - 1));
+ value *= 1000000;
+ } else {
+ value = Double.parseDouble(info);
+ }
+
+ //LOG.info("!!!! Get Memory Size:{}, info:{}", value, info);
return value;
- }
- if(m == 8) {
- // cpu usage
-
- }
- if(m == 9) {
- // memory ratio
-
- }
- m++;
- }
+ }
+ if (m == 8) {
+ // cpu usage
+
+ }
+ if (m == 9) {
+ // memory ratio
+
+ }
+ m++;
+ }
} catch (Exception e) {
LOG.warn("Failed to get memory usage .");
}
- }
-
- // this will be incorrect
- MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ }
+
+ // this will be incorrect
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
return Double.valueOf(memoryUsage.getUsed());
}
/**
- * If it is backend, please set resultHandler, such as
- * DefaultExecuteResultHandler If it is frontend,
- * ByteArrayOutputStream.toString get the result
- *
+ * If it is backend, please set resultHandler, such as DefaultExecuteResultHandler If it is frontend, ByteArrayOutputStream.toString get the result
+ * <p/>
* This function don't care whether the command is successfully or not
*
* @param command
@@ -436,9 +455,8 @@ public class JStormUtils {
* @return
* @throws IOException
*/
- public static ByteArrayOutputStream launchProcess(String command,
- final Map environment, final String workDir,
- ExecuteResultHandler resultHandler) throws IOException {
+ public static ByteArrayOutputStream launchProcess(String command, final Map environment, final String workDir, ExecuteResultHandler resultHandler)
+ throws IOException {
String[] cmdlist = command.split(" ");
@@ -479,8 +497,7 @@ public class JStormUtils {
}
- protected static java.lang.Process launchProcess(final String[] cmdlist,
- final Map<String, String> environment) throws IOException {
+ protected static Process launchProcess(final String[] cmdlist, final Map<String, String> environment) throws IOException {
ArrayList<String> buff = new ArrayList<String>();
for (String tok : cmdlist) {
if (!tok.isEmpty()) {
@@ -499,33 +516,26 @@ public class JStormUtils {
}
/**
- * @@@ it should use DefaultExecutor to start a process, but some little
- * problem have been found, such as exitCode/output string so still use
- * the old method to start process
- *
* @param command
* @param environment
* @param backend
* @return
* @throws IOException
+ * @@@ it should use DefaultExecutor to start a process, but some little problem have been found, such as exitCode/output string so still use the old method
+ * to start process
*/
- public static java.lang.Process launch_process(final String command,
- final Map<String, String> environment, boolean backend)
- throws IOException {
+ public static Process launch_process(final String command, final Map<String, String> environment, boolean backend) throws IOException {
if (backend == true) {
new Thread(new Runnable() {
@Override
public void run() {
- String[] cmdlist =
- (new String("nohup " + command + " &")).split(" ");
+ String[] cmdlist = (new String("nohup " + command + " &")).split(" ");
try {
launchProcess(cmdlist, environment);
} catch (IOException e) {
- LOG.error(
- "Failed to run " + command + ":" + e.getCause(),
- e);
+ LOG.error("Failed to run " + command + ":" + e.getCause(), e);
}
}
}).start();
@@ -568,9 +578,8 @@ public class JStormUtils {
}
/**
- *
* if the list exist repeat string, return the repeated string
- *
+ * <p/>
* this function will be used to check wheter bolt or spout exist same id
*
* @param sets
@@ -629,7 +638,7 @@ public class JStormUtils {
return rtn;
}
- public static <T> Long bit_xor_vals(java.util.List<T> vals) {
+ public static <T> Long bit_xor_vals(List<T> vals) {
Long rtn = 0l;
for (T n : vals) {
rtn = bit_xor(rtn, n);
@@ -638,7 +647,7 @@ public class JStormUtils {
return rtn;
}
- public static <T> Long bit_xor_vals_sets(java.util.Set<T> vals) {
+ public static <T> Long bit_xor_vals_sets(Set<T> vals) {
Long rtn = 0l;
for (T n : vals) {
rtn = bit_xor(rtn, n);
@@ -675,7 +684,7 @@ public class JStormUtils {
return rtn;
}
- public static <V> List<V> mk_list(java.util.Set<V> args) {
+ public static <V> List<V> mk_list(Set<V> args) {
ArrayList<V> rtn = new ArrayList<V>();
if (args != null) {
for (V o : args) {
@@ -712,8 +721,7 @@ public class JStormUtils {
} else if (o instanceof Long) {
return (Long) o;
} else {
- throw new RuntimeException("Invalid value "
- + o.getClass().getName() + " " + o);
+ throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
}
}
@@ -733,8 +741,18 @@ public class JStormUtils {
} else if (o instanceof Double) {
return (Double) o;
} else {
- throw new RuntimeException("Invalid value "
- + o.getClass().getName() + " " + o);
+ throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
+ }
+ }
+
+ public static Double parseDouble(Object o, double defaultValue) {
+ if (o == null) {
+ return defaultValue;
+ }
+ try {
+ return parseDouble(o);
+ } catch (Exception ignored) {
+ return defaultValue;
}
}
@@ -769,8 +787,7 @@ public class JStormUtils {
} else if (o instanceof Integer) {
return (Integer) o;
} else {
- throw new RuntimeException("Invalid value "
- + o.getClass().getName() + " " + o);
+ throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
}
}
@@ -791,6 +808,20 @@ public class JStormUtils {
}
}
+ public static Boolean parseBoolean(Object o) {
+ if (o == null) {
+ return null;
+ }
+
+ if (o instanceof String) {
+ return Boolean.valueOf((String) o);
+ } else if (o instanceof Boolean) {
+ return (Boolean) o;
+ } else {
+ throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
+ }
+ }
+
public static boolean parseBoolean(Object o, boolean defaultValue) {
if (o == null) {
return defaultValue;
@@ -863,8 +894,7 @@ public class JStormUtils {
} else if (oldValue instanceof BigInteger) {
return ((BigInteger) oldValue).add((BigInteger) newValue);
} else if (oldValue instanceof Number) {
- return ((Number) oldValue).doubleValue()
- + ((Number) newValue).doubleValue();
+ return ((Number) oldValue).doubleValue() + ((Number) newValue).doubleValue();
} else {
return null;
}
@@ -933,8 +963,7 @@ public class JStormUtils {
public static String formatSimpleDouble(Double value) {
try {
- java.text.DecimalFormat form =
- new java.text.DecimalFormat("##0.000");
+ java.text.DecimalFormat form = new java.text.DecimalFormat("##0.000");
String s = form.format(value);
return s;
} catch (Exception e) {
@@ -955,8 +984,7 @@ public class JStormUtils {
public static double formatDoubleDecPoint4(Double value) {
try {
- java.text.DecimalFormat form =
- new java.text.DecimalFormat("###.0000");
+ java.text.DecimalFormat form = new java.text.DecimalFormat("###.0000");
String s = form.format(value);
return Double.valueOf(s);
} catch (Exception e) {
@@ -1041,18 +1069,13 @@ public class JStormUtils {
}
/**
- * @@@ Todo
- *
* @return
+ * @@@ Todo
*/
public static Long getPhysicMemorySize() {
Object object;
try {
- object =
- ManagementFactory.getPlatformMBeanServer().getAttribute(
- new ObjectName("java.lang", "type",
- "OperatingSystem"),
- "TotalPhysicalMemorySize");
+ object = ManagementFactory.getPlatformMBeanServer().getAttribute(new ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize");
} catch (Exception e) {
LOG.warn("Failed to get system physical memory size,", e);
return null;
@@ -1089,19 +1112,15 @@ public class JStormUtils {
public static String getLogFileName() {
try {
- Logger rootLogger =
- LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
+ Logger rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
if (rootLogger instanceof ch.qos.logback.classic.Logger) {
- ch.qos.logback.classic.Logger logbackLogger =
- (ch.qos.logback.classic.Logger) rootLogger;
+ ch.qos.logback.classic.Logger logbackLogger = (ch.qos.logback.classic.Logger) rootLogger;
// Logger framework is Logback
- for (Iterator<ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent>> index =
- logbackLogger.iteratorForAppenders(); index.hasNext();) {
- ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent> appender =
- index.next();
+ for (Iterator<ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent>> index = logbackLogger.iteratorForAppenders(); index
+ .hasNext();) {
+ ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent> appender = index.next();
if (appender instanceof ch.qos.logback.core.FileAppender) {
- ch.qos.logback.core.FileAppender fileAppender =
- (ch.qos.logback.core.FileAppender) appender;
+ ch.qos.logback.core.FileAppender fileAppender = (ch.qos.logback.core.FileAppender) appender;
return fileAppender.getFile();
}
}
@@ -1156,8 +1175,7 @@ public class JStormUtils {
FileOutputStream workerOut = new FileOutputStream(new File(file));
- PrintStream ps =
- new PrintStream(new BufferedOutputStream(workerOut), true);
+ PrintStream ps = new PrintStream(new BufferedOutputStream(workerOut), true);
System.setOut(ps);
System.setErr(ps);
@@ -1170,13 +1188,11 @@ public class JStormUtils {
return new AsyncLoopDefaultKill();
}
- public static TreeMap<Integer, Integer> integer_divided(int sum,
- int num_pieces) {
+ public static TreeMap<Integer, Integer> integer_divided(int sum, int num_pieces) {
return Utils.integerDivided(sum, num_pieces);
}
- public static <K, V> HashMap<K, V> filter_val(RunnableCallback fn,
- Map<K, V> amap) {
+ public static <K, V> HashMap<K, V> filter_val(RunnableCallback fn, Map<K, V> amap) {
HashMap<K, V> rtn = new HashMap<K, V>();
for (Entry<K, V> entry : amap.entrySet()) {
@@ -1191,16 +1207,14 @@ public class JStormUtils {
}
public static List<Integer> getSupervisorPortList(Map conf) {
- List<Integer> portList =
- (List<Integer>) conf.get(Config.SUPERVISOR_SLOTS_PORTS);
+ List<Integer> portList = (List<Integer>) conf.get(Config.SUPERVISOR_SLOTS_PORTS);
if (portList != null && portList.size() > 0) {
return portList;
}
LOG.info("Generate port list through CPU cores and system memory size");
- double cpuWeight =
- ConfigExtension.getSupervisorSlotsPortCpuWeight(conf);
+ double cpuWeight = ConfigExtension.getSupervisorSlotsPortCpuWeight(conf);
int sysCpuNum = 4;
try {
sysCpuNum = Runtime.getRuntime().availableProcessors();
@@ -1211,11 +1225,11 @@ public class JStormUtils {
int cpuPortNum = (int) (sysCpuNum / cpuWeight);
if (cpuPortNum < 1) {
- LOG.info("Invalid supervisor.slots.port.cpu.weight setting :"
- + cpuWeight + ", cpu cores:" + sysCpuNum);
+ LOG.info("Invalid supervisor.slots.port.cpu.weight setting :" + cpuWeight + ", cpu cores:" + sysCpuNum);
cpuPortNum = 1;
}
+ Double memWeight = ConfigExtension.getSupervisorSlotsPortMemWeight(conf);
int memPortNum = Integer.MAX_VALUE;
Long physicalMemSize = JStormUtils.getPhysicMemorySize();
if (physicalMemSize == null) {
@@ -1223,7 +1237,7 @@ public class JStormUtils {
} else {
LOG.info("Get system memory size :" + physicalMemSize);
long workerMemSize = ConfigExtension.getMemSizePerWorker(conf);
- memPortNum = (int) (physicalMemSize / workerMemSize);
+ memPortNum = (int) (physicalMemSize / (workerMemSize * memWeight));
if (memPortNum < 1) {
LOG.info("Invalide worker.memory.size setting:" + workerMemSize);
memPortNum = 4;
@@ -1261,14 +1275,10 @@ public class JStormUtils {
}
public static Object createDisruptorWaitStrategy(Map conf) {
- String waitStrategy =
- (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
+ String waitStrategy = (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
Object ret;
-
- if (waitStrategy.indexOf("TimeoutBlockingWaitStrategy") != -1) {
- long timeout =
- parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT),
- 10);
+ if (waitStrategy.contains("TimeoutBlockingWaitStrategy")) {
+ long timeout = parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10);
ret = Utils.newInstance(waitStrategy, timeout, TimeUnit.MILLISECONDS);
} else {
ret = Utils.newInstance(waitStrategy);
@@ -1276,4 +1286,64 @@ public class JStormUtils {
return ret;
}
+
+ public static Object thriftToObject(Object obj) {
+ Object ret = null;
+ if (obj instanceof org.apache.thrift.TBase) {
+ ret = thriftToMap((org.apache.thrift.TBase)obj);
+ }else if (obj instanceof List) {
+ ret = new ArrayList<>();
+ for (Object item : (List)obj) {
+ ((List)ret).add(thriftToObject(item));
+ }
+ }else if (obj instanceof Map) {
+ ret = new HashMap<String, Object>();
+ Set<Entry> entrySet = ((Map)obj).entrySet();
+ for (Entry entry : entrySet) {
+ ((Map)ret).put(String.valueOf(entry.getKey()), thriftToObject(entry.getValue()));
+ }
+ }else {
+
+ ret = String.valueOf(obj);
+ }
+
+ return ret;
+ }
+
+ public static Map<String, Object> thriftToMap(
+ org.apache.thrift.TBase thriftObj) {
+ Map<String, Object> ret = new HashMap<String, Object>();
+
+ int i = 1;
+ TFieldIdEnum field = thriftObj.fieldForId(i);
+ while(field != null) {
+ if (thriftObj.isSet(field)) {
+ Object obj = thriftObj.getFieldValue(field);
+ ret.put(field.getFieldName(), thriftToObject(obj));
+
+ }
+ field = thriftObj.fieldForId(++i);
+ }
+
+ return ret;
+ }
+
+ public static List<Map<String, Object>> thriftToMap(List thriftObjs) {
+ List<Map<String, Object> > ret = new ArrayList<Map<String, Object> > () ;
+
+ for (Object thriftObj : thriftObjs) {
+ ret.add(thriftToMap((org.apache.thrift.TBase)thriftObj));
+ }
+
+ return ret;
+ }
+
+
+ public static long halfValueOfSum(long v1, long v2, boolean increment) {
+ long ret = (v1 + v2) / 2;
+ if (increment) {
+ ret += (v1 + v2) % 2;
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
index d082fcc..91be977 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java
@@ -21,121 +21,121 @@ import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
public class LoadConf {
- private static final Logger LOG = LoggerFactory.getLogger(LoadConf.class);
-
- public static List<URL> findResources(String name) {
- try {
- Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
- List<URL> ret = new ArrayList<URL>();
- while (resources.hasMoreElements()) {
- ret.add(resources.nextElement());
- }
- return ret;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- *
- * @param name
- * @param mustExist -- if this is true, the file must exist, otherwise throw exception
- * @param canMultiple -- if this is false and there is multiple conf, it will throw exception
- * @return
- */
- public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) {
- InputStream in = null;
- boolean confFileEmpty = false;
- try {
- in = getConfigFileInputStream(name, canMultiple);
- if (null != in) {
- Yaml yaml = new Yaml(new SafeConstructor());
- Map ret = (Map) yaml.load(new InputStreamReader(in));
- if (null != ret) {
- return new HashMap(ret);
- } else {
- confFileEmpty = true;
- }
- }
-
- if (mustExist) {
- if (confFileEmpty)
- throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
- else
- throw new RuntimeException("Could not find config file on classpath " + name);
- } else {
- return new HashMap();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- if (null != in) {
- try {
- in.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException {
- if (null == configFilePath) {
- throw new IOException("Could not find config file, name not specified");
- }
-
- HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath));
- if (resources.isEmpty()) {
- File configFile = new File(configFilePath);
- if (configFile.exists()) {
- return new FileInputStream(configFile);
- }
- } else if (resources.size() > 1 && canMultiple == false) {
- throw new IOException("Found multiple " + configFilePath
- + " resources. You're probably bundling the Storm jars with your topology jar. " + resources);
- } else {
- LOG.info("Using " + configFilePath + " from resources");
- URL resource = resources.iterator().next();
- return resource.openStream();
- }
- return null;
- }
-
- public static InputStream getConfigFileInputStream(String configFilePath) throws IOException {
- return getConfigFileInputStream(configFilePath, true);
- }
-
- public static Map LoadYaml(String confPath) {
-
- return findAndReadYaml(confPath, true, true);
-
- }
-
- public static Map LoadProperty(String prop) {
-
- InputStream in = null;
- Properties properties = new Properties();
-
- try {
- in = getConfigFileInputStream(prop);
- properties.load(in);
- } catch (FileNotFoundException e) {
- throw new RuntimeException("No such file " + prop);
- } catch (Exception e1) {
- throw new RuntimeException("Failed to read config file");
- } finally {
- if (null != in) {
- try {
- in.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- Map ret = new HashMap();
- ret.putAll(properties);
- return ret;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(LoadConf.class);
+
+ public static List<URL> findResources(String name) {
+ try {
+ Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
+ List<URL> ret = new ArrayList<URL>();
+ while (resources.hasMoreElements()) {
+ ret.add(resources.nextElement());
+ }
+ return ret;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ *
+ * @param name
+ * @param mustExist -- if this is true, the file must exist, otherwise throw exception
+ * @param canMultiple -- if this is false and there is multiple conf, it will throw exception
+ * @return
+ */
+ public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) {
+ InputStream in = null;
+ boolean confFileEmpty = false;
+ try {
+ in = getConfigFileInputStream(name, canMultiple);
+ if (null != in) {
+ Yaml yaml = new Yaml(new SafeConstructor());
+ Map ret = (Map) yaml.load(new InputStreamReader(in));
+ if (null != ret) {
+ return new HashMap(ret);
+ } else {
+ confFileEmpty = true;
+ }
+ }
+
+ if (mustExist) {
+ if (confFileEmpty)
+ throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
+ else
+ throw new RuntimeException("Could not find config file on classpath " + name);
+ } else {
+ return new HashMap();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (null != in) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException {
+ if (null == configFilePath) {
+ throw new IOException("Could not find config file, name not specified");
+ }
+
+ HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath));
+ if (resources.isEmpty()) {
+ File configFile = new File(configFilePath);
+ if (configFile.exists()) {
+ return new FileInputStream(configFile);
+ }
+ } else if (resources.size() > 1 && canMultiple == false) {
+ throw new IOException("Found multiple " + configFilePath + " resources. You're probably bundling the Storm jars with your topology jar. "
+ + resources);
+ } else {
+ LOG.info("Using " + configFilePath + " from resources");
+ URL resource = resources.iterator().next();
+ return resource.openStream();
+ }
+ return null;
+ }
+
+ public static InputStream getConfigFileInputStream(String configFilePath) throws IOException {
+ return getConfigFileInputStream(configFilePath, true);
+ }
+
+ public static Map LoadYaml(String confPath) {
+
+ return findAndReadYaml(confPath, true, true);
+
+ }
+
+ public static Map LoadProperty(String prop) {
+
+ InputStream in = null;
+ Properties properties = new Properties();
+
+ try {
+ in = getConfigFileInputStream(prop);
+ properties.load(in);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException("No such file " + prop);
+ } catch (Exception e1) {
+ throw new RuntimeException("Failed to read config file");
+ } finally {
+ if (null != in) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ Map ret = new HashMap();
+ ret.putAll(properties);
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
index 8bca599..e005f38 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java
@@ -103,41 +103,39 @@ public class NetWorkUtils {
try {
address = InetAddress.getByName(host);
} catch (UnknownHostException e) {
- LOG.warn("NetWorkUtil can't transfer hostname(" + host
- + ") to ip, return hostname", e);
+ LOG.warn("NetWorkUtil can't transfer hostname(" + host + ") to ip, return hostname", e);
return host;
}
return address.getHostAddress();
}
-
- public static List<String> host2Ip(List<String> servers) {
- if (servers == null || servers.size() == 0) {
- return new ArrayList<String>();
- }
-
- Set<String> ret = new HashSet<String>();
- for (String server : servers) {
- if (StringUtils.isBlank(server)) {
- continue;
- }
-
- InetAddress ia;
- try {
- ia = InetAddress.getByName(server);
- } catch (UnknownHostException e) {
- // TODO Auto-generated catch block
- LOG.info("Fail to get address of ", server);
- continue;
- }
- if (ia.isLoopbackAddress() || ia.isAnyLocalAddress()) {
- ret.add(NetWorkUtils.ip());
- }else {
- ret.add(ia.getHostAddress());
- }
- }
-
-
- return JStormUtils.mk_list(ret);
+
+ public static List<String> host2Ip(List<String> servers) {
+ if (servers == null || servers.size() == 0) {
+ return new ArrayList<String>();
+ }
+
+ Set<String> ret = new HashSet<String>();
+ for (String server : servers) {
+ if (StringUtils.isBlank(server)) {
+ continue;
+ }
+
+ InetAddress ia;
+ try {
+ ia = InetAddress.getByName(server);
+ } catch (UnknownHostException e) {
+ // TODO Auto-generated catch block
+ LOG.info("Fail to get address of ", server);
+ continue;
+ }
+ if (ia.isLoopbackAddress() || ia.isAnyLocalAddress()) {
+ ret.add(NetWorkUtils.ip());
+ } else {
+ ret.add(ia.getHostAddress());
+ }
+ }
+
+ return JStormUtils.mk_list(ret);
}
public static String ip2Host(String ip) {
@@ -145,8 +143,7 @@ public class NetWorkUtils {
try {
address = InetAddress.getByName(ip);
} catch (UnknownHostException e) {
- LOG.warn("NetWorkUtil can't transfer ip(" + ip
- + ") to hostname, return ip", e);
+ LOG.warn("NetWorkUtil can't transfer ip(" + ip + ") to hostname, return ip", e);
return ip;
}
return address.getHostName();
@@ -168,13 +165,13 @@ public class NetWorkUtils {
return StringUtils.equalsIgnoreCase(ip1, ip2);
}
-
+
public static void main(String[] args) {
- List<String> servers = new ArrayList<String>();
- servers.add("localhost");
-
- System.out.println(host2Ip(servers));
-
+ List<String> servers = new ArrayList<String>();
+ servers.add("localhost");
+
+ System.out.println(host2Ip(servers));
+
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
index d4f6e0f..f5acda7 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java
@@ -17,141 +17,144 @@
*/
package com.alibaba.jstorm.utils;
-public class OSInfo {
-
- private static String OS = System.getProperty("os.name").toLowerCase();
-
- private static OSInfo _instance = new OSInfo();
-
- private EPlatform platform;
-
- private OSInfo(){}
-
- public static boolean isLinux(){
- return OS.indexOf("linux")>=0;
- }
-
- public static boolean isMacOS(){
- return OS.indexOf("mac")>=0&&OS.indexOf("os")>0&&OS.indexOf("x")<0;
- }
-
- public static boolean isMacOSX(){
- return OS.indexOf("mac")>=0&&OS.indexOf("os")>0&&OS.indexOf("x")>0;
- }
-
+public class OSInfo {
+
+    /**
+     * Lower-cased value of the {@code os.name} system property; the predicates
+     * of this class match substrings against it. Locale.ROOT keeps the
+     * lower-casing independent of the default locale (under a Turkish locale an
+     * upper-case "I" becomes a dotless i, so "AIX" would never match "aix").
+     */
+    private static final String OS = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+
+    /** Shared holder whose {@code platform} field is written by {@code getOSname()}. */
+    private static final OSInfo _instance = new OSInfo();
+
+    /** Platform most recently stored by {@code getOSname()}. */
+    private EPlatform platform;
+
+    /** Private: the only instance is the static holder above. */
+    private OSInfo() {
+    }
+
+    /** @return true when the lower-cased OS name contains "linux" */
+    public static boolean isLinux() {
+        return OS.contains("linux");
+    }
+
+    /**
+     * @return true when the lower-cased OS name contains "mac", contains "os"
+     *         at an index greater than zero, and contains no "x"
+     */
+    public static boolean isMacOS() {
+        boolean hasMac = OS.contains("mac");
+        boolean hasOsAfterStart = OS.indexOf("os") > 0;
+        boolean hasX = OS.contains("x");
+        return hasMac && hasOsAfterStart && !hasX;
+    }
+
+    /**
+     * @return true when the lower-cased OS name contains "mac" and contains
+     *         both "os" and "x" at an index greater than zero
+     */
+    public static boolean isMacOSX() {
+        boolean hasMac = OS.contains("mac");
+        boolean hasOsAfterStart = OS.indexOf("os") > 0;
+        boolean hasXAfterStart = OS.indexOf("x") > 0;
+        return hasMac && hasOsAfterStart && hasXAfterStart;
+    }
+
public static boolean isMac() {
- return OS.indexOf("mac")>=0&&OS.indexOf("os")>0;
- }
-
- public static boolean isWindows(){
- return OS.indexOf("windows")>=0;
- }
-
- public static boolean isOS2(){
- return OS.indexOf("os/2")>=0;
- }
-
- public static boolean isSolaris(){
- return OS.indexOf("solaris")>=0;
- }
-
- public static boolean isSunOS(){
- return OS.indexOf("sunos")>=0;
- }
-
- public static boolean isMPEiX(){
- return OS.indexOf("mpe/ix")>=0;
- }
-
- public static boolean isHPUX(){
- return OS.indexOf("hp-ux")>=0;
- }
-
- public static boolean isAix(){
- return OS.indexOf("aix")>=0;
- }
-
- public static boolean isOS390(){
- return OS.indexOf("os/390")>=0;
- }
-
- public static boolean isFreeBSD(){
- return OS.indexOf("freebsd")>=0;
- }
-
- public static boolean isIrix(){
- return OS.indexOf("irix")>=0;
- }
-
- public static boolean isDigitalUnix(){
- return OS.indexOf("digital")>=0&&OS.indexOf("unix")>0;
- }
-
- public static boolean isNetWare(){
- return OS.indexOf("netware")>=0;
- }
-
- public static boolean isOSF1(){
- return OS.indexOf("osf1")>=0;
- }
-
- public static boolean isOpenVMS(){
- return OS.indexOf("openvms")>=0;
- }
-
- /**
- * Get OS name
- * @return OS name
- */
- public static EPlatform getOSname(){
- if(isAix()){
- _instance.platform = EPlatform.AIX;
- }else if (isDigitalUnix()) {
- _instance.platform = EPlatform.Digital_Unix;
- }else if (isFreeBSD()) {
- _instance.platform = EPlatform.FreeBSD;
- }else if (isHPUX()) {
- _instance.platform = EPlatform.HP_UX;
- }else if (isIrix()) {
- _instance.platform = EPlatform.Irix;
- }else if (isLinux()) {
- _instance.platform = EPlatform.Linux;
- }else if (isMacOS()) {
- _instance.platform = EPlatform.Mac_OS;
- }else if (isMacOSX()) {
- _instance.platform = EPlatform.Mac_OS_X;
- }else if (isMPEiX()) {
- _instance.platform = EPlatform.MPEiX;
- }else if (isNetWare()) {
- _instance.platform = EPlatform.NetWare_411;
- }else if (isOpenVMS()) {
- _instance.platform = EPlatform.OpenVMS;
- }else if (isOS2()) {
- _instance.platform = EPlatform.OS2;
- }else if (isOS390()) {
- _instance.platform = EPlatform.OS390;
- }else if (isOSF1()) {
- _instance.platform = EPlatform.OSF1;
- }else if (isSolaris()) {
- _instance.platform = EPlatform.Solaris;
- }else if (isSunOS()) {
- _instance.platform = EPlatform.SunOS;
- }else if (isWindows()) {
- _instance.platform = EPlatform.Windows;
- }else{
- _instance.platform = EPlatform.Others;
- }
- return _instance.platform;
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- System.out.println( System.getProperty("os.name") );
- System.out.println( System.getProperty("os.version") );
- System.out.println( System.getProperty("os.arch") );
-
- System.out.println(OSInfo.getOSname());
- }
-
-}
+ return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0;
+ }
+
+    /** @return true when the lower-cased OS name contains "windows" */
+    public static boolean isWindows() {
+        return OS.contains("windows");
+    }
+
+    /** @return true when the lower-cased OS name contains "os/2" */
+    public static boolean isOS2() {
+        return OS.contains("os/2");
+    }
+
+    /** @return true when the lower-cased OS name contains "solaris" */
+    public static boolean isSolaris() {
+        return OS.contains("solaris");
+    }
+
+    /** @return true when the lower-cased OS name contains "sunos" */
+    public static boolean isSunOS() {
+        return OS.contains("sunos");
+    }
+
+    /** @return true when the lower-cased OS name contains "mpe/ix" */
+    public static boolean isMPEiX() {
+        return OS.contains("mpe/ix");
+    }
+
+    /** @return true when the lower-cased OS name contains "hp-ux" */
+    public static boolean isHPUX() {
+        return OS.contains("hp-ux");
+    }
+
+    /** @return true when the lower-cased OS name contains "aix" */
+    public static boolean isAix() {
+        return OS.contains("aix");
+    }
+
+    /** @return true when the lower-cased OS name contains "os/390" */
+    public static boolean isOS390() {
+        return OS.contains("os/390");
+    }
+
+    /** @return true when the lower-cased OS name contains "freebsd" */
+    public static boolean isFreeBSD() {
+        return OS.contains("freebsd");
+    }
+
+    /** @return true when the lower-cased OS name contains "irix" */
+    public static boolean isIrix() {
+        return OS.contains("irix");
+    }
+
+    /**
+     * @return true when the lower-cased OS name contains "digital" and contains
+     *         "unix" at an index greater than zero
+     */
+    public static boolean isDigitalUnix() {
+        boolean hasDigital = OS.contains("digital");
+        boolean hasUnixAfterStart = OS.indexOf("unix") > 0;
+        return hasDigital && hasUnixAfterStart;
+    }
+
+    /** @return true when the lower-cased OS name contains "netware" */
+    public static boolean isNetWare() {
+        return OS.contains("netware");
+    }
+
+    /** @return true when the lower-cased OS name contains "osf1" */
+    public static boolean isOSF1() {
+        return OS.contains("osf1");
+    }
+
+    /** @return true when the lower-cased OS name contains "openvms" */
+    public static boolean isOpenVMS() {
+        return OS.contains("openvms");
+    }
+
+    /**
+     * Detects the current platform, stores it on the shared instance and
+     * returns it.
+     * <p>
+     * The OS predicates are tried in a fixed order and the first match wins;
+     * {@code EPlatform.Others} is the fallback when none matches.
+     *
+     * @return the detected platform
+     */
+    public static EPlatform getOSname() {
+        _instance.platform = detectPlatform();
+        return _instance.platform;
+    }
+
+    /** Maps the first matching OS predicate to its platform constant. */
+    private static EPlatform detectPlatform() {
+        if (isAix()) {
+            return EPlatform.AIX;
+        }
+        if (isDigitalUnix()) {
+            return EPlatform.Digital_Unix;
+        }
+        if (isFreeBSD()) {
+            return EPlatform.FreeBSD;
+        }
+        if (isHPUX()) {
+            return EPlatform.HP_UX;
+        }
+        if (isIrix()) {
+            return EPlatform.Irix;
+        }
+        if (isLinux()) {
+            return EPlatform.Linux;
+        }
+        if (isMacOS()) {
+            return EPlatform.Mac_OS;
+        }
+        if (isMacOSX()) {
+            return EPlatform.Mac_OS_X;
+        }
+        if (isMPEiX()) {
+            return EPlatform.MPEiX;
+        }
+        if (isNetWare()) {
+            return EPlatform.NetWare_411;
+        }
+        if (isOpenVMS()) {
+            return EPlatform.OpenVMS;
+        }
+        if (isOS2()) {
+            return EPlatform.OS2;
+        }
+        if (isOS390()) {
+            return EPlatform.OS390;
+        }
+        if (isOSF1()) {
+            return EPlatform.OSF1;
+        }
+        if (isSolaris()) {
+            return EPlatform.Solaris;
+        }
+        if (isSunOS()) {
+            return EPlatform.SunOS;
+        }
+        if (isWindows()) {
+            return EPlatform.Windows;
+        }
+        return EPlatform.Others;
+    }
+
+    /**
+     * Prints the {@code os.name}, {@code os.version} and {@code os.arch} system
+     * properties, then the platform reported by {@link #getOSname()}.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        String[] propertyKeys = { "os.name", "os.version", "os.arch" };
+        for (String key : propertyKeys) {
+            System.out.println(System.getProperty(key));
+        }
+
+        System.out.println(OSInfo.getOSname());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
index 13b1d98..30c2726 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java
@@ -39,8 +39,25 @@ public class OlderFileFilter implements FileFilter {
long current_time = System.currentTimeMillis();
- return (pathname.isFile() && (pathname.lastModified() + seconds * 1000 <= current_time))
- || pathname.isDirectory();
+ return (pathname.lastModified() + seconds * 1000 <= current_time) ;
+ }
+
+
+    /**
+     * Manual check: recreates a directory named "test" in the working directory,
+     * stamps it with the current time, waits ten seconds, then prints the
+     * modification time read back next to the time that was set.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        long stamp = System.currentTimeMillis();
+        String dirName = "test";
+
+        // Start from a fresh directory whose modification time is set explicitly.
+        File created = new File(dirName);
+        created.delete();
+        created.mkdir();
+        created.setLastModified(stamp);
+
+        JStormUtils.sleepMs(10 * 1000);
+
+        // Read the timestamp back through a second File handle.
+        File reopened = new File(dirName);
+        System.out.println("modify time: " + reopened.lastModified() + ", raw:" + stamp);
+
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
index 49d35d6..1bc8b56 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java
@@ -43,11 +43,11 @@ public class Pair<F, S> {
}
@Override
- public String toString(){
+ public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("first:"+ first);
+ sb.append("first:" + first);
sb.append(":");
- sb.append("sencond:"+ second);
+ sb.append("sencond:" + second);
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
index 939b81b..b3732dc 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java
@@ -44,7 +44,7 @@ public class PathUtils {
*/
public static List<String> tokenize_path(String path) {
String[] toks = path.split(SEPERATOR);
- java.util.ArrayList<String> rtn = new ArrayList<String>();
+ ArrayList<String> rtn = new ArrayList<String>();
for (String str : toks) {
if (!str.isEmpty()) {
rtn.add(str);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
index e3be73f..20b9535 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java
@@ -20,8 +20,7 @@ package com.alibaba.jstorm.utils;
import java.util.ArrayList;
/**
- * Shuffle the Range, This class is used in shuffle grouping, it is better than
- * random, which can't make sure balance.
+ * Shuffles the range. This class is used in shuffle grouping; it is preferred over plain random selection, which cannot guarantee a balanced distribution.
*
* @author yannian
*
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
index 454e987..877e1d6 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java
@@ -28,9 +28,8 @@ import java.util.concurrent.LinkedBlockingDeque;
/**
* RotatingMap must be used under thread-safe environment
*
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and expirationSecs * (1 +
- * 1 / (numBuckets-1)) to actually expire the message.
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
*
* get, put, remove, containsKey, and size take O(numBuckets) time to run.
*
@@ -45,8 +44,7 @@ public class RotatingMap<K, V> implements TimeOutMap<K, V> {
private final Object lock = new Object();
- public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback,
- boolean isSingleThread) {
+ public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback, boolean isSingleThread) {
if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
@@ -121,8 +119,7 @@ public class RotatingMap<K, V> implements TimeOutMap<K, V> {
/**
* Remove item from Rotate
*
- * On the side of performance, scanning from header is faster On the side of
- * logic, it should scan from the end to first.
+ * For performance, scanning from the head is faster; logically, however, the scan should run from the end to the first.
*
* @param key
* @return
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
index ba7547b..5bc2252 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java
@@ -25,20 +25,16 @@ import org.slf4j.LoggerFactory;
public class SystemOperation {
- public static final Logger LOG = LoggerFactory
- .getLogger(SystemOperation.class);
+ public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class);
public static boolean isRoot() throws IOException {
String result = SystemOperation.exec("echo $EUID").substring(0, 1);
- return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? true
- : false;
+ return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? true : false;
};
- public static void mount(String name, String target, String type,
- String data) throws IOException {
+ public static void mount(String name, String target, String type, String data) throws IOException {
StringBuilder sb = new StringBuilder();
- sb.append("mount -t ").append(type).append(" -o ").append(data)
- .append(" ").append(name).append(" ").append(target);
+ sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target);
SystemOperation.exec(sb.toString());
}
@@ -50,9 +46,7 @@ public class SystemOperation {
public static String exec(String cmd) throws IOException {
LOG.debug("Shell cmd: " + cmd);
- Process process =
- new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd })
- .start();
+ Process process = new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd }).start();
try {
process.waitFor();
String output = IOUtils.toString(process.getInputStream());
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
index c55751c..5116c4f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java
@@ -17,34 +17,22 @@
*/
package com.alibaba.jstorm.utils;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.JavaObject;
-import backtype.storm.generated.JavaObjectArg;
-import backtype.storm.generated.NullStruct;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
import backtype.storm.generated.StormTopology._Fields;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.generated.TopologyInitialStatus;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.IBolt;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.cluster.StormStatus;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Thrift utils
@@ -57,8 +45,7 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType;
public class Thrift {
private static Logger LOG = LoggerFactory.getLogger(Thrift.class);
- public static StormStatus topologyInitialStatusToStormStatus(
- TopologyInitialStatus tStatus) {
+ public static StormStatus topologyInitialStatusToStormStatus(TopologyInitialStatus tStatus) {
if (tStatus.equals(TopologyInitialStatus.ACTIVE)) {
return new StormStatus(StatusType.active);
} else {
@@ -79,16 +66,13 @@ public class Thrift {
paraTypes[i] = Integer.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) {
paraTypes[i] = Long.class;
- } else if (arg.getSetField().equals(
- JavaObjectArg._Fields.STRING_ARG)) {
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) {
paraTypes[i] = String.class;
} else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) {
paraTypes[i] = Boolean.class;
- } else if (arg.getSetField().equals(
- JavaObjectArg._Fields.BINARY_ARG)) {
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) {
paraTypes[i] = ByteBuffer.class;
- } else if (arg.getSetField().equals(
- JavaObjectArg._Fields.DOUBLE_ARG)) {
+ } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) {
paraTypes[i] = Double.class;
} else {
paraTypes[i] = Object.class;
@@ -113,8 +97,7 @@ public class Thrift {
public static List<String> fieldGrouping(Grouping grouping) {
if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) {
- throw new IllegalArgumentException(
- "Tried to get grouping fields from non fields grouping");
+ throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping");
}
return grouping.get_fields();
@@ -152,9 +135,11 @@ public class Thrift {
return Grouping.direct(new NullStruct());
}
- private static ComponentCommon mkComponentcommon(
- Map<GlobalStreamId, Grouping> inputs,
- HashMap<String, StreamInfo> output_spec, Integer parallelism_hint) {
+    /**
+     * Builds the grouping returned by {@code Grouping.all} for a fresh
+     * {@code NullStruct}.
+     *
+     * @return the "all" grouping
+     */
+    public static Grouping mkAllGrouping() {
+        NullStruct empty = new NullStruct();
+        return Grouping.all(empty);
+    }
+
+ private static ComponentCommon mkComponentcommon(Map<GlobalStreamId, Grouping> inputs, HashMap<String, StreamInfo> output_spec, Integer parallelism_hint) {
ComponentCommon ret = new ComponentCommon(inputs, output_spec);
if (parallelism_hint != null) {
ret.set_parallelism_hint(parallelism_hint);
@@ -162,8 +147,7 @@ public class Thrift {
return ret;
}
- public static Bolt mkBolt(Map<GlobalStreamId, Grouping> inputs, IBolt bolt,
- HashMap<String, StreamInfo> output, Integer p) {
+ public static Bolt mkBolt(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, HashMap<String, StreamInfo> output, Integer p) {
ComponentCommon common = mkComponentcommon(inputs, output, p);
byte[] boltSer = Utils.serialize(bolt);
ComponentObject component = ComponentObject.serialized_java(boltSer);
@@ -171,8 +155,7 @@ public class Thrift {
}
public static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null;
- public static StormTopology._Fields[] SPOUT_FIELDS = {
- StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
+ public static StormTopology._Fields[] SPOUT_FIELDS = { StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS };
static {
Set<_Fields> keys = StormTopology.metaDataMap.keySet();
STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()];
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
index c56e307..4d2ea0f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java
@@ -24,9 +24,8 @@ import java.util.Map;
import java.util.Map.Entry;
/**
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and expirationSecs * (1 +
- * 1 / (numBuckets-1)) to actually expire the message.
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
*
* get, put, remove, containsKey, and size take O(numBuckets) time to run.
*
@@ -42,8 +41,7 @@ public class TimeCacheMap<K, V> implements TimeOutMap<K, V> {
private Thread _cleaner;
private ExpiredCallback _callback;
- public TimeCacheMap(int expirationSecs, int numBuckets,
- ExpiredCallback<K, V> callback) {
+ public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
index 8468310..00e5cf3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java
@@ -25,14 +25,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and expirationSecs * (1 +
- * 1 / (numBuckets-1)) to actually expire the message.
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
*
* get, put, remove, containsKey, and size take O(numBuckets) time to run.
*
- * The advantage of this design is that the expiration thread only locks the
- * object for O(1) time, meaning the object is essentially always available for
+ * The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for
* poll/offer
*/
public class TimeCacheQueue<K> {
@@ -44,8 +42,7 @@ public class TimeCacheQueue<K> {
}
public static class DefaultExpiredCallback<K> implements ExpiredCallback<K> {
- protected static final Logger LOG = LoggerFactory
- .getLogger(TimeCacheQueue.DefaultExpiredCallback.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(DefaultExpiredCallback.class);
protected String queueName;
@@ -54,8 +51,7 @@ public class TimeCacheQueue<K> {
}
public void expire(K entry) {
- LOG.info("TimeCacheQueue " + queueName + " entry:" + entry
- + ", timeout");
+ LOG.info("TimeCacheQueue " + queueName + " entry:" + entry + ", timeout");
}
}
@@ -65,8 +61,7 @@ public class TimeCacheQueue<K> {
protected Thread _cleaner;
protected ExpiredCallback _callback;
- public TimeCacheQueue(int expirationSecs, int numBuckets,
- ExpiredCallback<K> callback) {
+ public TimeCacheQueue(int expirationSecs, int numBuckets, ExpiredCallback<K> callback) {
if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
@@ -130,8 +125,7 @@ public class TimeCacheQueue<K> {
public K poll() {
synchronized (_lock) {
- Iterator<LinkedBlockingDeque<K>> itor =
- _buckets.descendingIterator();
+ Iterator<LinkedBlockingDeque<K>> itor = _buckets.descendingIterator();
while (itor.hasNext()) {
LinkedBlockingDeque<K> bucket = itor.next();
K entry = bucket.poll();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
index a5c189f..fbae631 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java
@@ -40,29 +40,24 @@ public class TimeFormat {
public static final long ONE_DAY_HOURS = 24;
- public static final long ONE_MINUTE_MILLISECONDS = ONE_MINUTE_SECONDS
- * ONE_SECOND_MILLISECONDS;
+ public static final long ONE_MINUTE_MILLISECONDS = ONE_MINUTE_SECONDS * ONE_SECOND_MILLISECONDS;
- public static final long ONE_HOUR_MILLISECONDS = ONE_HOUR_MINUTES
- * ONE_MINUTE_MILLISECONDS;
+ public static final long ONE_HOUR_MILLISECONDS = ONE_HOUR_MINUTES * ONE_MINUTE_MILLISECONDS;
- public static final long ONE_DAY_MILLISECONDS = ONE_DAY_HOURS
- * ONE_HOUR_MILLISECONDS;
+ public static final long ONE_DAY_MILLISECONDS = ONE_DAY_HOURS * ONE_HOUR_MILLISECONDS;
public static Date convertDate(String dateStr, String format) {
Date date = null;
try {
if (format != null) {
- SimpleDateFormat simpleDateFormat =
- new SimpleDateFormat(format);
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
date = simpleDateFormat.parse(dateStr);
} else {
date = new Date(dateStr);
}
} catch (Exception ex) {
- log.error("Failed to convert " + dateStr + " to Date, format:"
- + format);
+ log.error("Failed to convert " + dateStr + " to Date, format:" + format);
return null;
}
return date;
@@ -77,8 +72,7 @@ public class TimeFormat {
ret = sdf.format(date);
} catch (Exception e) {
- log.error("Failed to convert " + date + " to String, format:"
- + format);
+ log.error("Failed to convert " + date + " to String, format:" + format);
return null;
}
return ret;
@@ -207,12 +201,9 @@ public class TimeFormat {
tomorrow.set(Calendar.MINUTE, 0);
Date startTime = tomorrow.getTime();
- long hourdiff =
- (startTime.getTime() - current.getTime())
- / ONE_HOUR_MILLISECONDS;
+ long hourdiff = (startTime.getTime() - current.getTime()) / ONE_HOUR_MILLISECONDS;
- System.out.println("Current:" + current + ", tomorrow" + startTime
- + ", diff hour" + hourdiff);
+ System.out.println("Current:" + current + ", tomorrow" + startTime + ", diff hour" + hourdiff);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
index 8c9bd3d..9068731 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,19 +18,23 @@
package com.alibaba.jstorm.utils;
import backtype.storm.utils.Time;
+import com.alibaba.jstorm.metric.AsmWindow;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
/**
* Time utils
- *
+ *
* @author yannian
- *
*/
public class TimeUtils {
+ public static final long NS_PER_MS = 1000000L;
+
/**
* Take care of int overflow
- *
- * @return
*/
public static int current_time_secs() {
return (int) (Time.currentTimeMillis() / 1000);
@@ -38,8 +42,6 @@ public class TimeUtils {
/**
* Take care of int overflow
- *
- * @return
*/
public static int time_delta(int time_secs) {
return current_time_secs() - time_secs;
@@ -48,4 +50,91 @@ public class TimeUtils {
public static long time_delta_ms(long time_ms) {
return System.currentTimeMillis() - time_ms;
}
+
+    /** Nanoseconds per microsecond. Upper-case L suffix: a lower-case l reads like the digit 1. */
+    public static final long NS_PER_US = 1000L;
+
+    /** The value 1; presumably "one second" as a count of seconds (TODO confirm against callers). */
+    public static final int ONE_SEC = 1;
+    /** Seconds per minute. */
+    public static final int SEC_PER_MIN = 60;
+    /** Seconds per day (24 * 60 * 60). */
+    public static final int SEC_PER_DAY = 86400;
+
+    /**
+     * @return true when the current time in seconds, reduced modulo
+     *         {@code SEC_PER_DAY} and then {@code SEC_PER_MIN}, is zero
+     */
+    public static boolean isTimeAligned() {
+        int secondOfMinute = current_time_secs() % SEC_PER_DAY % SEC_PER_MIN;
+        return secondOfMinute == 0;
+    }
+
+    /**
+     * @return the current time in seconds, reduced modulo {@code SEC_PER_DAY}
+     *         and then {@code SEC_PER_MIN}
+     */
+    public static int secOffset() {
+        int secondOfDay = current_time_secs() % SEC_PER_DAY;
+        return secondOfDay % SEC_PER_MIN;
+    }
+
+    /**
+     * Reduces a timestamp modulo {@code SEC_PER_DAY} and then {@code SEC_PER_MIN}.
+     *
+     * @param ts timestamp; presumably in seconds, since it is not divided by
+     *           1000 here as it is in winSecOffset (TODO confirm against callers)
+     * @return the remainder as an int
+     */
+    public static int secOffset(long ts) {
+        long secondOfDay = ts % SEC_PER_DAY;
+        return (int) (secondOfDay % SEC_PER_MIN);
+    }
+
+    /**
+     * Divides the timestamp by 1000, then reduces it modulo {@code SEC_PER_DAY}
+     * and finally modulo the window.
+     *
+     * @param ts     timestamp; divided by 1000 first, so presumably milliseconds
+     * @param window window length used as the final modulus
+     * @return the remainder as an int
+     */
+    public static int winSecOffset(long ts, int window) {
+        long seconds = ts / 1000;
+        long secondOfDay = seconds % SEC_PER_DAY;
+        return (int) (secondOfDay % window);
+    }
+
+    /**
+     * Aligns a millisecond timestamp to a window boundary.
+     * <p>
+     * For every window except {@code AsmWindow.D1_WINDOW} the timestamp is
+     * rounded DOWN to a multiple of {@code win} seconds. For the daily window
+     * it is instead moved UP to the next local midnight, unless the time of day
+     * is already 00:00:00, in which case that midnight is returned.
+     *
+     * @param ts  timestamp in milliseconds
+     * @param win window length in seconds, or {@code AsmWindow.D1_WINDOW}
+     * @return the aligned timestamp in milliseconds, on a whole-second boundary
+     */
+    public static long alignTimeToWin(long ts, int win) {
+        if (win != AsmWindow.D1_WINDOW) {
+            long curTimeSec = ts / 1000;
+            return (curTimeSec - curTimeSec % win) * 1000;
+        }
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(ts);
+        int year = cal.get(Calendar.YEAR);
+        int month = cal.get(Calendar.MONTH);
+        int day = cal.get(Calendar.DAY_OF_MONTH);
+        // HOUR_OF_DAY (0-23) rather than HOUR (0-11): with the 12-hour field,
+        // exactly 12:00:00 noon read as 0 and was returned without alignment.
+        int hour = cal.get(Calendar.HOUR_OF_DAY);
+        int min = cal.get(Calendar.MINUTE);
+        int sec = cal.get(Calendar.SECOND);
+        if (sec + min + hour > 0) {
+            // Day overflow (day + 1 past month end) is resolved by the lenient calendar.
+            cal.set(year, month, day + 1, 0, 0, 0);
+        }
+        // set(y, m, d, h, m, s) leaves MILLISECOND untouched; clear it so the
+        // result lands on a second boundary like the other windows.
+        cal.set(Calendar.MILLISECOND, 0);
+        return cal.getTimeInMillis();
+    }
+
+    /**
+     * Aligns a timestamp with {@code alignTimeToWin} using {@code AsmWindow.M1_WINDOW}.
+     *
+     * @param ts timestamp passed through to {@code alignTimeToWin}
+     * @return the aligned timestamp
+     */
+    public static long alignTimeToMin(long ts) {
+        final int minuteWindow = AsmWindow.M1_WINDOW;
+        return alignTimeToWin(ts, minuteWindow);
+    }
+
+    /**
+     * Renders the hour and minute of a date as {@code HH:mm}, each zero-padded
+     * to two digits.
+     *
+     * @param time the date to render
+     * @return for example "09:05"
+     */
+    public static String toTimeStr(Date time) {
+        return twoDigits(time.getHours()) + ":" + twoDigits(time.getMinutes());
+    }
+
+    /** Prefixes values below ten with a single "0". */
+    private static String twoDigits(int value) {
+        return value < 10 ? "0" + value : String.valueOf(value);
+    }
+
+    /**
+     * Formats a time given in seconds with the pattern {@code yyyy-MM-dd HH:mm:ss}.
+     *
+     * @param curTimeSec time in seconds; multiplied by 1000 to build the Date
+     * @return the formatted time
+     */
+    public static String format(int curTimeSec) {
+        Date moment = new Date(curTimeSec * 1000L);
+        return format(moment, "yyyy-MM-dd HH:mm:ss");
+    }
+
+    /**
+     * Formats a date with a {@link SimpleDateFormat} pattern. A new formatter
+     * is created on every call.
+     *
+     * @param time the date to format
+     * @param fmt  the SimpleDateFormat pattern
+     * @return the formatted date
+     */
+    public static String format(Date time, String fmt) {
+        return new SimpleDateFormat(fmt).format(time);
+    }
+
+
+    /**
+     * Manual check: prints the current time aligned to the M1, M10, H2 and D1
+     * windows, then a fixed 2015 date at 00:00:00 aligned to the D1 window,
+     * then the current time formatted by {@code format(int)}.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) throws Exception {
+        int[] windows = { AsmWindow.M1_WINDOW, AsmWindow.M10_WINDOW, AsmWindow.H2_WINDOW, AsmWindow.D1_WINDOW };
+        for (int window : windows) {
+            System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), window)));
+        }
+
+        Calendar fixedDay = Calendar.getInstance();
+        fixedDay.set(2015, 6, 23, 0, 0, 0);
+        System.out.println(new Date(alignTimeToWin(fixedDay.getTimeInMillis(), AsmWindow.D1_WINDOW)));
+
+        System.out.println(format(TimeUtils.current_time_secs()));
+    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
index 09c25a5..eab0212 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java
@@ -32,8 +32,7 @@ public class ZkEventTypes {
map.put(Watcher.Event.EventType.NodeCreated, ":node-created");
map.put(Watcher.Event.EventType.NodeDeleted, ":node-deleted");
map.put(Watcher.Event.EventType.NodeDataChanged, ":node-data-changed");
- map.put(Watcher.Event.EventType.NodeChildrenChanged,
- ":node-children-changed");
+ map.put(Watcher.Event.EventType.NodeChildrenChanged, ":node-children-changed");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
index b726781..a098730 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java
@@ -20,34 +20,44 @@ package com.alibaba.jstorm.zk;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.alibaba.jstorm.cluster.Cluster;
+import org.apache.log4j.Logger;
import backtype.storm.Config;
import backtype.storm.utils.Utils;
-import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.cluster.DistributedClusterState;
import com.google.common.collect.Maps;
public class ZkTool {
- private static Logger LOG = LoggerFactory.getLogger(ZkTool.class);
+ private static Logger LOG = Logger.getLogger(ZkTool.class);
public static final String READ_CMD = "read";
public static final String RM_CMD = "rm";
+ public static final String LIST_CMD = "list";
+
+ public static final String CLEAN_CMD = "clean";
+
/**
 * Logs, at INFO level, how to invoke this tool from the command line:
 * a short description followed by an example invocation for each of the
 * "read", "rm", "list" and "clean" commands.
 */
public static void usage() {
LOG.info("Read ZK node's data, please do as following:");
LOG.info(ZkTool.class.getName() + " read zkpath");
LOG.info("\nDelete topology backup assignment, please do as following:");
LOG.info(ZkTool.class.getName() + " rm topologyname");
+
+ LOG.info("\nlist subdirectory of zkPath , please do as following:");
+ LOG.info(ZkTool.class.getName() + " list zkpath");
+
+ LOG.info("\nDelete all nodes about a topologyId of zk , please do as following:");
+ LOG.info(ZkTool.class.getName() + " clean topologyId");
+
}
public static String getData(DistributedClusterState zkClusterState,
- String path) throws Exception {
+ String path) throws Exception {
byte[] data = zkClusterState.get_data(path, false);
if (data == null || data.length == 0) {
return null;
@@ -58,6 +68,135 @@ public class ZkTool {
return obj.toString();
}
+
+    /**
+     * Logs the names of the direct children of a ZK node.
+     *
+     * Connects through a new DistributedClusterState, writes either a
+     * comma-separated listing or a "No children" message to the log, and
+     * always closes the connection. Failures are logged, never rethrown.
+     *
+     * @param path ZK path whose children are listed
+     */
+    public static void list(String path) {
+        DistributedClusterState zkState = null;
+
+        try {
+            // Override the configured ZK root with "/" before connecting
+            // (presumably so that path is interpreted as an absolute ZK path).
+            conf.put(Config.STORM_ZOOKEEPER_ROOT, "/");
+            zkState = new DistributedClusterState(conf);
+
+            List<String> childNames = zkState.get_children(path, false);
+            if (childNames != null && !childNames.isEmpty()) {
+                StringBuilder listing = new StringBuilder();
+                listing.append("Zk node children of ").append(path).append("\n");
+                for (String childName : childNames) {
+                    listing.append(" ").append(childName).append(",");
+                }
+                listing.append("\n");
+                LOG.info(listing.toString());
+            } else {
+                LOG.info("No children of " + path);
+            }
+        } catch (Exception e) {
+            // A null state means the constructor itself failed, i.e. no connection was made.
+            String failure = (zkState == null)
+                    ? "Failed to connect ZK "
+                    : "Failed to list children of " + path + "\n";
+            LOG.error(failure, e);
+        } finally {
+            if (zkState != null) {
+                zkState.close();
+            }
+        }
+    }
+    /**
+     * Removes the ZK nodes kept for one topology id: its entries under
+     * /assignments, /topology, /taskbeats, /tasks, /taskerrors and /monitor.
+     *
+     * A node that does not exist is only reported in the log. A failure to
+     * delete one node is logged and the remaining nodes are still processed.
+     * Any other failure (including failing to connect) is logged and ends
+     * the clean-up; nothing is rethrown.
+     *
+     * Warning (from the original author's note): because this method creates
+     * its own DistributedClusterState from conf, it cannot delete the
+     * zkCache right away.
+     *
+     * @param topologyId id of the topology whose ZK nodes are removed
+     */
+    public static void cleanTopology(String topologyId) {
+        DistributedClusterState zkState = null;
+        try {
+            zkState = new DistributedClusterState(conf);
+            String zkRoot = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+
+            // Each row: { node path, message logged if deleting that node fails },
+            // listed in the order the nodes are processed.
+            String[][] targets = {
+                { "/assignments/" + topologyId, "Could not remove assignments for " + topologyId },
+                { "/topology/" + topologyId, "Failed to remove storm base for " + topologyId },
+                { "/taskbeats/" + topologyId, "Failed to remove taskbeats for " + topologyId },
+                { "/tasks/" + topologyId, "Failed to remove tasks for " + topologyId },
+                { "/taskerrors/" + topologyId, "Failed to remove taskerrors for " + topologyId },
+                { "/monitor/" + topologyId, "Failed to remove monitor for " + topologyId },
+            };
+
+            for (String[] target : targets) {
+                String nodePath = target[0];
+                if (!zkState.node_existed(nodePath, false)) {
+                    LOG.info(" node of " + zkRoot + nodePath + " isn't existed ");
+                    continue;
+                }
+                try {
+                    zkState.delete_node(nodePath);
+                } catch (Exception e) {
+                    // Best effort: report the failure and move on to the next node.
+                    LOG.error(target[1], e);
+                }
+            }
+        } catch (Exception e) {
+            // A null state means the constructor itself failed, i.e. no connection was made.
+            if (zkState != null) {
+                LOG.error("Failed to clean topolodyId: " + topologyId + "\n", e);
+            } else {
+                LOG.error("Failed to connect ZK ", e);
+            }
+        } finally {
+            if (zkState != null) {
+                zkState.close();
+            }
+        }
+    }
+
public static void readData(String path) {
DistributedClusterState zkClusterState = null;
@@ -110,8 +249,7 @@ public class ZkTool {
if (tid.equals(topologyName)) {
LOG.info("Find backup " + topologyName);
- String topologyPath =
- Cluster.assignment_bak_path(topologyName);
+ String topologyPath = assignment_bak_path(topologyName);
zkClusterState.delete_node(topologyPath);
LOG.info("Successfully delete topology " + topologyName
@@ -161,12 +299,21 @@ public class ZkTool {
} else if (args[0].equalsIgnoreCase(RM_CMD)) {
rmBakTopology(args[1]);
+ } else if (args[0].equalsIgnoreCase(LIST_CMD)) {
+ list(args[1]);
+ } else if (args[0].equalsIgnoreCase(CLEAN_CMD)) {
+ cleanTopology(args[1]);
}
}
/*******************************************************************/
+    /**
+     * Builds the ZK path of a backup-assignment node by joining
+     * Cluster.ASSIGNMENTS_BAK_SUBTREE, Cluster.ZK_SEPERATOR and the given id.
+     *
+     * @param id identifier appended as the last path element
+     * @return the joined path string
+     */
+    public static String assignment_bak_path(String id) {
+        final String bakPath = Cluster.ASSIGNMENTS_BAK_SUBTREE + Cluster.ZK_SEPERATOR + id;
+        return bakPath;
+    }
+
@SuppressWarnings("rawtypes")
public static ClusterState mk_distributed_cluster_state(Map _conf)
throws Exception {
@@ -177,7 +324,8 @@ public class ZkTool {
throws Exception {
Map<String, String> ret = Maps.newHashMap();
List<String> followers =
- cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE, false);
+ cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE,
+ false);
if (followers == null || followers.size() == 0) {
return ret;
}