You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:51 UTC
[13/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
index 8d2ba24..78cfe73 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java
@@ -36,10 +36,12 @@ import com.alibaba.jstorm.container.cgroup.core.CgroupCore;
import com.alibaba.jstorm.container.cgroup.core.CpuCore;
import com.alibaba.jstorm.utils.JStormUtils;
+/**
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
+ */
public class CgroupManager {
- public static final Logger LOG = LoggerFactory
- .getLogger(CgroupManager.class);
+ public static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class);
public static final String JSTORM_HIERARCHY_NAME = "jstorm_cpu";
@@ -61,20 +63,16 @@ public class CgroupManager {
// "/cgroup/cpu"
rootDir = ConfigExtension.getCgroupRootDir(conf);
if (rootDir == null)
- throw new RuntimeException(
- "Check configuration file. The supervisor.cgroup.rootdir is missing.");
+ throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing.");
File file = new File(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir);
if (!file.exists()) {
- LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir
- + " is not existing.");
- throw new RuntimeException(
- "Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file.");
+ LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir + " is not existing.");
+ throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file.");
}
center = CgroupCenter.getInstance();
if (center == null)
- throw new RuntimeException(
- "Cgroup error, please check /proc/cgroups");
+ throw new RuntimeException("Cgroup error, please check /proc/cgroups");
this.prepareSubSystem();
}
@@ -90,13 +88,10 @@ public class CgroupManager {
return value;
}
- private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit)
- throws IOException {
+ private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException {
/*
- * User cfs_period & cfs_quota to control the upper limit use of cpu
- * core e.g. If making a process to fully use two cpu cores, set
- * cfs_period_us to 100000 and set cfs_quota_us to 200000 The highest
- * value of "cpu core upper limit" is 10
+ * User cfs_period & cfs_quota to control the upper limit use of cpu core e.g. If making a process to fully use two cpu cores, set cfs_period_us to
+ * 100000 and set cfs_quota_us to 200000 The highest value of "cpu core upper limit" is 10
*/
cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit);
@@ -109,16 +104,13 @@ public class CgroupManager {
}
}
- public String startNewWorker(Map conf, int cpuNum, String workerId)
- throws SecurityException, IOException {
- CgroupCommon workerGroup =
- new CgroupCommon(workerId, h, this.rootCgroup);
+ public String startNewWorker(Map conf, int cpuNum, String workerId) throws SecurityException, IOException {
+ CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup);
this.center.create(workerGroup);
CgroupCore cpu = workerGroup.getCores().get(SubSystemType.cpu);
CpuCore cpuCore = (CpuCore) cpu;
cpuCore.setCpuShares(cpuNum * ONE_CPU_SLOT);
- setCpuUsageUpperLimit(cpuCore,
- ConfigExtension.getWorkerCpuCoreUpperLimit(conf));
+ setCpuUsageUpperLimit(cpuCore, ConfigExtension.getWorkerCpuCoreUpperLimit(conf));
StringBuilder sb = new StringBuilder();
sb.append("cgexec -g cpu:").append(workerGroup.getName()).append(" ");
@@ -126,8 +118,7 @@ public class CgroupManager {
}
public void shutDownWorker(String workerId, boolean isKilled) {
- CgroupCommon workerGroup =
- new CgroupCommon(workerId, h, this.rootCgroup);
+ CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup);
try {
if (isKilled == false) {
for (Integer pid : workerGroup.getTasks()) {
@@ -151,9 +142,7 @@ public class CgroupManager {
if (h == null) {
Set<SubSystemType> types = new HashSet<SubSystemType>();
types.add(SubSystemType.cpu);
- h =
- new Hierarchy(JSTORM_HIERARCHY_NAME, types,
- JSTORM_CPU_HIERARCHY_DIR);
+ h = new Hierarchy(JSTORM_HIERARCHY_NAME, types, JSTORM_CPU_HIERARCHY_DIR);
}
rootCgroup = new CgroupCommon(rootDir, h, h.getRootCgroups());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
index e55aabe..d003a14 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java
@@ -36,6 +36,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
/**
* supervisor Heartbeat, just write SupervisorInfo to ZK
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
*/
class Heartbeat extends RunnableCallback {
@@ -67,8 +68,7 @@ class Heartbeat extends RunnableCallback {
* @param myHostName
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public Heartbeat(Map conf, StormClusterState stormClusterState,
- String supervisorId) {
+ public Heartbeat(Map conf, StormClusterState stormClusterState, String supervisorId) {
String myHostName = JStormServerUtils.getHostName(conf);
@@ -77,15 +77,12 @@ class Heartbeat extends RunnableCallback {
this.conf = conf;
this.myHostName = myHostName;
this.startTime = TimeUtils.current_time_secs();
- this.frequence =
- JStormUtils.parseInt(conf
- .get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+ this.frequence = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
this.hbUpdateTrigger = new AtomicBoolean(true);
initSupervisorInfo(conf);
- LOG.info("Successfully init supervisor heartbeat thread, "
- + supervisorInfo);
+ LOG.info("Successfully init supervisor heartbeat thread, " + supervisorInfo);
}
private void initSupervisorInfo(Map conf) {
@@ -96,32 +93,28 @@ class Heartbeat extends RunnableCallback {
boolean isLocaliP = false;
isLocaliP = myHostName.equals("127.0.0.1");
- if(isLocaliP){
+ if (isLocaliP) {
throw new Exception("the hostname which supervisor get is localhost");
}
- }catch(Exception e1){
+ } catch (Exception e1) {
LOG.error("get supervisor host error!", e1);
throw new RuntimeException(e1);
}
Set<Integer> ports = JStormUtils.listToSet(portList);
- supervisorInfo =
- new SupervisorInfo(myHostName, supervisorId, ports);
+ supervisorInfo = new SupervisorInfo(myHostName, supervisorId, ports);
} else {
- Set<Integer> ports = JStormUtils.listToSet(portList.subList(0, 1));
- supervisorInfo =
- new SupervisorInfo(myHostName, supervisorId, ports);
+ Set<Integer> ports = JStormUtils.listToSet(portList);
+ supervisorInfo = new SupervisorInfo(myHostName, supervisorId, ports);
}
}
@SuppressWarnings("unchecked")
public void update() {
supervisorInfo.setTimeSecs(TimeUtils.current_time_secs());
- supervisorInfo
- .setUptimeSecs((int) (TimeUtils.current_time_secs() - startTime));
+ supervisorInfo.setUptimeSecs((int) (TimeUtils.current_time_secs() - startTime));
try {
- stormClusterState
- .supervisor_heartbeat(supervisorId, supervisorInfo);
+ stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo);
} catch (Exception e) {
LOG.error("Failed to update SupervisorInfo to ZK");
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
index fad1346..4ece066 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java
@@ -62,6 +62,9 @@ import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
+/**
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
+ */
public class Httpserver implements Shutdownable {
private static Logger LOG = LoggerFactory.getLogger(Httpserver.class);
@@ -119,13 +122,11 @@ public class Httpserver implements Shutdownable {
}
- public void handlFailure(HttpExchange t, String errorMsg)
- throws IOException {
+ public void handlFailure(HttpExchange t, String errorMsg) throws IOException {
LOG.error(errorMsg);
byte[] data = errorMsg.getBytes();
- t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST,
- data.length);
+ t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST, data.length);
OutputStream os = t.getResponseBody();
os.write(data);
os.close();
@@ -136,8 +137,7 @@ public class Httpserver implements Shutdownable {
Map<String, String> paramMap = parseRawQuery(uri.getRawQuery());
LOG.info("Receive command " + paramMap);
- String cmd =
- paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD);
+ String cmd = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD);
if (StringUtils.isBlank(cmd) == true) {
handlFailure(t, "Bad Request, Not set command type");
return;
@@ -146,16 +146,13 @@ public class Httpserver implements Shutdownable {
if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW.equals(cmd)) {
handleShowLog(t, paramMap);
return;
- } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST
- .equals(cmd)) {
+ } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST.equals(cmd)) {
handleListDir(t, paramMap);
return;
- } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK
- .equals(cmd)) {
+ } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK.equals(cmd)) {
handleJstack(t, paramMap);
return;
- } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF
- .equals(cmd)) {
+ } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF.equals(cmd)) {
handleShowConf(t, paramMap);
return;
}
@@ -178,8 +175,7 @@ public class Httpserver implements Shutdownable {
if (isChild == false) {
LOG.error("Access one disallowed path: " + canonicalPath);
- throw new IOException(
- "Destination file/path is not accessible.");
+ throw new IOException("Destination file/path is not accessible.");
}
}
@@ -196,34 +192,27 @@ public class Httpserver implements Shutdownable {
return paramMap;
}
- private void handleShowLog(HttpExchange t, Map<String, String> paramMap)
- throws IOException {
+ private void handleShowLog(HttpExchange t, Map<String, String> paramMap) throws IOException {
Pair<Long, byte[]> logPair = queryLog(t, paramMap);
if (logPair == null) {
return;
}
- String size =
- String.format(
- HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT,
- logPair.getFirst());
+ String size = String.format(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT, logPair.getFirst());
byte[] sizeByts = size.getBytes();
byte[] logData = logPair.getSecond();
- t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length
- + logData.length);
+ t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length + logData.length);
OutputStream os = t.getResponseBody();
os.write(sizeByts);
os.write(logData);
os.close();
}
- private Pair<Long, byte[]> queryLog(HttpExchange t,
- Map<String, String> paramMap) throws IOException {
+ private Pair<Long, byte[]> queryLog(HttpExchange t, Map<String, String> paramMap) throws IOException {
- String fileParam =
- paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE);
+ String fileParam = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE);
if (StringUtils.isBlank(fileParam)) {
handlFailure(t, "Bad Request, Params Error, no log file name.");
return null;
@@ -242,8 +231,7 @@ public class Httpserver implements Shutdownable {
long position = fileSize - pageSize;
try {
- String posStr =
- paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS);
+ String posStr = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS);
if (StringUtils.isBlank(posStr) == false) {
long pos = Long.valueOf(posStr);
@@ -258,15 +246,12 @@ public class Httpserver implements Shutdownable {
long size = Math.min(fileSize - position, pageSize);
- LOG.info("logview " + logFile + ", position=" + position
- + ", size=" + size);
+ LOG.info("logview " + logFile + ", position=" + position + ", size=" + size);
fout = fc.map(FileChannel.MapMode.READ_ONLY, position, size);
ret = new byte[(int) size];
fout.get(ret);
- String str =
- new String(ret,
- ConfigExtension.getLogViewEncoding(conf));
+ String str = new String(ret, ConfigExtension.getLogViewEncoding(conf));
return new Pair<Long, byte[]>(fileSize, str.getBytes());
} catch (FileNotFoundException e) {
@@ -288,8 +273,7 @@ public class Httpserver implements Shutdownable {
}
byte[] getJSonFiles(String dir) throws Exception {
- Map<String, FileAttribute> fileMap =
- new HashMap<String, FileAttribute>();
+ Map<String, FileAttribute> fileMap = new HashMap<String, FileAttribute>();
String path = logDir;
if (dir != null) {
@@ -332,13 +316,11 @@ public class Httpserver implements Shutdownable {
return fileJsonStr.getBytes();
}
- void handleListDir(HttpExchange t, Map<String, String> paramMap)
- throws IOException {
+ void handleListDir(HttpExchange t, Map<String, String> paramMap) throws IOException {
byte[] filesJson = "Failed to get file list".getBytes();
try {
- String dir =
- paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR);
+ String dir = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR);
filesJson = getJSonFiles(dir);
} catch (Exception e) {
LOG.error("Failed to list files", e);
@@ -358,15 +340,12 @@ public class Httpserver implements Shutdownable {
try {
LOG.info("Begin to execute " + cmd);
- Process process =
- JStormUtils.launch_process(cmd,
- new HashMap<String, String>(), false);
+ Process process = JStormUtils.launch_process(cmd, new HashMap<String, String>(), false);
// Process process = Runtime.getRuntime().exec(sb.toString());
InputStream stdin = process.getInputStream();
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(stdin));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
JStormUtils.sleepMs(1000);
@@ -398,10 +377,8 @@ public class Httpserver implements Shutdownable {
}
}
- void handleJstack(HttpExchange t, Map<String, String> paramMap)
- throws IOException {
- String workerPort =
- paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT);
+ void handleJstack(HttpExchange t, Map<String, String> paramMap) throws IOException {
+ String workerPort = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT);
if (workerPort == null) {
handlFailure(t, "Not set worker's port");
return;
@@ -425,8 +402,7 @@ public class Httpserver implements Shutdownable {
os.close();
}
- void handleShowConf(HttpExchange t, Map<String, String> paramMap)
- throws IOException {
+ void handleShowConf(HttpExchange t, Map<String, String> paramMap) throws IOException {
byte[] json = "Failed to get configuration".getBytes();
try {
@@ -452,8 +428,7 @@ public class Httpserver implements Shutdownable {
try {
hs = HttpServer.create(socketAddr, 0);
- hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW,
- new LogHandler(conf));
+ hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW, new LogHandler(conf));
hs.setExecutor(executor);
hs.start();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
index dfee522..8b52607 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java
@@ -50,8 +50,7 @@ import com.alibaba.jstorm.cluster.StormConfig;
* @version
*/
public class SandBoxMaker {
- private static final Logger LOG = LoggerFactory
- .getLogger(SandBoxMaker.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SandBoxMaker.class);
public static final String SANBOX_TEMPLATE_NAME = "sandbox.policy";
@@ -66,8 +65,7 @@ public class SandBoxMaker {
private final boolean isEnable;
- private final Map<String, String> replaceBaseMap =
- new HashMap<String, String>();
+ private final Map<String, String> replaceBaseMap = new HashMap<String, String>();
public SandBoxMaker(Map conf) {
this.conf = conf;
@@ -83,8 +81,7 @@ public class SandBoxMaker {
replaceBaseMap.put(JSTORM_HOME_KEY, jstormHome);
- replaceBaseMap.put(LOCAL_DIR_KEY,
- (String) conf.get(Config.STORM_LOCAL_DIR));
+ replaceBaseMap.put(LOCAL_DIR_KEY, (String) conf.get(Config.STORM_LOCAL_DIR));
LOG.info("JSTORM_HOME is " + jstormHome);
}
@@ -127,26 +124,19 @@ public class SandBoxMaker {
return line;
}
- public String generatePolicyFile(Map<String, String> replaceMap)
- throws IOException {
+ public String generatePolicyFile(Map<String, String> replaceMap) throws IOException {
// dynamic generate policy file, no static file
- String tmpPolicy =
- StormConfig.supervisorTmpDir(conf) + File.separator
- + UUID.randomUUID().toString();
+ String tmpPolicy = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString();
- InputStream inputStream =
- SandBoxMaker.class.getClassLoader().getResourceAsStream(
- SANBOX_TEMPLATE_NAME);
+ InputStream inputStream = SandBoxMaker.class.getClassLoader().getResourceAsStream(SANBOX_TEMPLATE_NAME);
- PrintWriter writer =
- new PrintWriter(new BufferedWriter(new FileWriter(tmpPolicy)));
+ PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(tmpPolicy)));
try {
InputStreamReader inputReader = new InputStreamReader(inputStream);
- BufferedReader reader =
- new BufferedReader(new LineNumberReader(inputReader));
+ BufferedReader reader = new BufferedReader(new LineNumberReader(inputReader));
String line = null;
while ((line = reader.readLine()) != null) {
@@ -177,8 +167,7 @@ public class SandBoxMaker {
* @return
* @throws IOException
*/
- public String sandboxPolicy(String workerId, Map<String, String> replaceMap)
- throws IOException {
+ public String sandboxPolicy(String workerId, Map<String, String> replaceMap) throws IOException {
if (isEnable == false) {
return "";
}
@@ -188,9 +177,7 @@ public class SandBoxMaker {
String tmpPolicy = generatePolicyFile(replaceMap);
File file = new File(tmpPolicy);
- String policyPath =
- StormConfig.worker_root(conf, workerId) + File.separator
- + SANBOX_TEMPLATE_NAME;
+ String policyPath = StormConfig.worker_root(conf, workerId) + File.separator + SANBOX_TEMPLATE_NAME;
File dest = new File(policyPath);
file.renameTo(dest);
@@ -210,9 +197,7 @@ public class SandBoxMaker {
SandBoxMaker maker = new SandBoxMaker(conf);
try {
- System.out.println("sandboxPolicy:"
- + maker.sandboxPolicy("simple",
- new HashMap<String, String>()));
+ System.out.println("sandboxPolicy:" + maker.sandboxPolicy("simple", new HashMap<String, String>()));
} catch (IOException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
index 0b906e3..71859a1 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java
@@ -37,6 +37,9 @@ import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
+/**
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
+ */
public class ShutdownWork extends RunnableCallback {
private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class);
@@ -54,14 +57,9 @@ public class ShutdownWork extends RunnableCallback {
*
* @return the topologys whose workers are shutdown successfully
*/
- public void shutWorker(Map conf, String supervisorId,
- Map<String, String> removed,
- ConcurrentHashMap<String, String> workerThreadPids,
- CgroupManager cgroupManager, boolean block,
- Map<String, Integer> killingWorkers,
- Map<String, Integer> taskCleanupTimeoutMap) {
- Map<String, List<String>> workerId2Pids =
- new HashMap<String, List<String>>();
+ public void shutWorker(Map conf, String supervisorId, Map<String, String> removed, ConcurrentHashMap<String, String> workerThreadPids,
+ CgroupManager cgroupManager, boolean block, Map<String, Integer> killingWorkers, Map<String, Integer> taskCleanupTimeoutMap) {
+ Map<String, List<String>> workerId2Pids = new HashMap<String, List<String>>();
boolean localMode = false;
@@ -78,8 +76,7 @@ public class ShutdownWork extends RunnableCallback {
try {
pids = getPid(conf, workerId);
} catch (IOException e1) {
- LOG.error("Failed to get pid for " + workerId + " of "
- + topologyId);
+ LOG.error("Failed to get pid for " + workerId + " of " + topologyId);
}
workerId2Pids.put(workerId, pids);
@@ -100,15 +97,10 @@ public class ShutdownWork extends RunnableCallback {
JStormUtils.process_killed(Integer.parseInt(pid));
}
- if (taskCleanupTimeoutMap != null
- && taskCleanupTimeoutMap.get(topologyId) != null) {
- maxWaitTime =
- Math.max(maxWaitTime,
- taskCleanupTimeoutMap.get(topologyId));
+ if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) {
+ maxWaitTime = Math.max(maxWaitTime, taskCleanupTimeoutMap.get(topologyId));
} else {
- maxWaitTime =
- Math.max(maxWaitTime, ConfigExtension
- .getTaskCleanupTimeoutSec(conf));
+ maxWaitTime = Math.max(maxWaitTime, ConfigExtension.getTaskCleanupTimeoutSec(conf));
}
} catch (Exception e) {
LOG.info("Failed to shutdown ", e);
@@ -126,8 +118,7 @@ public class ShutdownWork extends RunnableCallback {
List<String> pids = workerId2Pids.get(workerId);
int cleanupTimeout;
- if (taskCleanupTimeoutMap != null
- && taskCleanupTimeoutMap.get(topologyId) != null) {
+ if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) {
cleanupTimeout = taskCleanupTimeoutMap.get(topologyId);
} else {
cleanupTimeout = ConfigExtension.getTaskCleanupTimeoutSec(conf);
@@ -137,8 +128,7 @@ public class ShutdownWork extends RunnableCallback {
if (TimeUtils.current_time_secs() - initCleaupTime > cleanupTimeout) {
if (localMode == false) {
for (String pid : pids) {
- JStormUtils
- .ensure_process_killed(Integer.parseInt(pid));
+ JStormUtils.ensure_process_killed(Integer.parseInt(pid));
if (cgroupManager != null) {
cgroupManager.shutDownWorker(workerId, true);
}
@@ -169,14 +159,12 @@ public class ShutdownWork extends RunnableCallback {
// delete workerid dir, LOCAL_DIR/worker/workerid
PathUtils.rmr(StormConfig.worker_root(conf, workerId));
} catch (Exception e) {
- LOG.warn(e + "Failed to cleanup worker " + workerId
- + ". Will retry later");
+ LOG.warn(e + "Failed to cleanup worker " + workerId + ". Will retry later");
}
}
/**
- * When worker has been started by manually and supervisor, it will return
- * multiple pid
+ * When worker has been started by manually and supervisor, it will return multiple pid
*
* @param conf
* @param workerId
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
index c159f4b..c6bed45a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java
@@ -47,7 +47,6 @@ public class StateHeartbeat {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
index abc2448..c6c2877 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java
@@ -24,6 +24,7 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.alibaba.jstorm.daemon.worker.WorkerReportError;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,33 +52,28 @@ import com.alibaba.jstorm.utils.JStormUtils;
*
* Supevisor workflow 1. write SupervisorInfo to ZK
*
- * 2. Every 10 seconds run SynchronizeSupervisor 2.1 download new topology 2.2
- * release useless worker 2.3 assgin new task to
- * /local-dir/supervisor/localstate 2.4 add one syncProcesses event
+ * 2. Every 10 seconds run SynchronizeSupervisor 2.1 download new topology 2.2 release useless worker 2.3 assgin new task to /local-dir/supervisor/localstate
+ * 2.4 add one syncProcesses event
*
- * 3. Every supervisor.monitor.frequency.secs run SyncProcesses 3.1 kill useless
- * worker 3.2 start new worker
+ * 3. Every supervisor.monitor.frequency.secs run SyncProcesses 3.1 kill useless worker 3.2 start new worker
*
- * 4. create heartbeat thread every supervisor.heartbeat.frequency.secs, write
- * SupervisorInfo to ZK
+ * 4. create heartbeat thread every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZK
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
*/
public class Supervisor {
private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
-
/**
* create and start one supervisor
*
* @param conf : configurationdefault.yaml storm.yaml
* @param sharedContext : null (right now)
- * @return SupervisorManger: which is used to shutdown all workers and
- * supervisor
+ * @return SupervisorManger: which is used to shutdown all workers and supervisor
*/
@SuppressWarnings("rawtypes")
- public SupervisorManger mkSupervisor(Map conf, IContext sharedContext)
- throws Exception {
+ public SupervisorManger mkSupervisor(Map conf, IContext sharedContext) throws Exception {
LOG.info("Starting Supervisor with conf " + conf);
@@ -91,13 +87,15 @@ public class Supervisor {
* Step 2: create ZK operation instance StromClusterState
*/
- StormClusterState stormClusterState =
- Cluster.mk_storm_cluster_state(conf);
+ StormClusterState stormClusterState = Cluster.mk_storm_cluster_state(conf);
+
+ String hostName = JStormServerUtils.getHostName(conf);
+ WorkerReportError workerReportError =
+ new WorkerReportError(stormClusterState, hostName);
+
/*
- * Step 3, create LocalStat LocalStat is one KV database 4.1 create
- * LocalState instance; 4.2 get supervisorId, if no supervisorId, create
- * one
+ * Step 3, create LocalStat LocalStat is one KV database 4.1 create LocalState instance; 4.2 get supervisorId, if no supervisorId, create one
*/
LocalState localState = StormConfig.supervisorState(conf);
@@ -115,13 +113,11 @@ public class Supervisor {
// sync hearbeat to nimbus
Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId);
hb.update();
- AsyncLoopThread heartbeat =
- new AsyncLoopThread(hb, false, null, Thread.MIN_PRIORITY, true);
+ AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null, Thread.MIN_PRIORITY, true);
threads.add(heartbeat);
// Sync heartbeat to Apsara Container
- AsyncLoopThread syncContainerHbThread =
- SyncContainerHb.mkSupervisorInstance(conf);
+ AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf);
if (syncContainerHbThread != null) {
threads.add(syncContainerHbThread);
}
@@ -129,34 +125,22 @@ public class Supervisor {
// Step 6 create and start sync Supervisor thread
// every supervisor.monitor.frequency.secs second run SyncSupervisor
EventManagerImp processEventManager = new EventManagerImp();
- AsyncLoopThread processEventThread =
- new AsyncLoopThread(processEventManager);
+ AsyncLoopThread processEventThread = new AsyncLoopThread(processEventManager);
threads.add(processEventThread);
- ConcurrentHashMap<String, String> workerThreadPids =
- new ConcurrentHashMap<String, String>();
- SyncProcessEvent syncProcessEvent =
- new SyncProcessEvent(supervisorId, conf, localState,
- workerThreadPids, sharedContext);
+ ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>();
+ SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError);
EventManagerImp syncSupEventManager = new EventManagerImp();
- AsyncLoopThread syncSupEventThread =
- new AsyncLoopThread(syncSupEventManager);
+ AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager);
threads.add(syncSupEventThread);
SyncSupervisorEvent syncSupervisorEvent =
- new SyncSupervisorEvent(supervisorId, conf,
- processEventManager, syncSupEventManager,
- stormClusterState, localState, syncProcessEvent, hb);
-
- int syncFrequence =
- JStormUtils.parseInt(conf
- .get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
- EventManagerPusher syncSupervisorPusher =
- new EventManagerPusher(syncSupEventManager,
- syncSupervisorEvent, syncFrequence);
- AsyncLoopThread syncSupervisorThread =
- new AsyncLoopThread(syncSupervisorPusher);
+ new SyncSupervisorEvent(supervisorId, conf, processEventManager, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb);
+
+ int syncFrequence = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
+ EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequence);
+ AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
threads.add(syncSupervisorThread);
Httpserver httpserver = null;
@@ -168,9 +152,7 @@ public class Supervisor {
}
// SupervisorManger which can shutdown all supervisor and workers
- return new SupervisorManger(conf, supervisorId, threads,
- syncSupEventManager, processEventManager, httpserver,
- stormClusterState, workerThreadPids);
+ return new SupervisorManger(conf, supervisorId, threads, syncSupEventManager, processEventManager, httpserver, stormClusterState, workerThreadPids);
}
/**
@@ -210,7 +192,7 @@ public class Supervisor {
JStormUtils.redirectOutput("/dev/null");
initShutdownHook(supervisorManager);
-
+
while (supervisorManager.isFinishShutdown() == false) {
try {
Thread.sleep(1000);
@@ -222,11 +204,10 @@ public class Supervisor {
} catch (Exception e) {
LOG.error("Failed to start supervisor\n", e);
System.exit(1);
- }finally {
- LOG.info("Shutdown supervisor!!!");
+ } finally {
+ LOG.info("Shutdown supervisor!!!");
}
-
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
index f53ef72..ae89607 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang.builder.ToStringStyle;
/**
* Object stored in ZK /ZK-DIR/supervisors
- *
+ *
* @author Xin.Zhou/Longda
*/
public class SupervisorInfo implements Serializable {
@@ -46,8 +46,7 @@ public class SupervisorInfo implements Serializable {
private transient Set<Integer> availableWorkerPorts;
- public SupervisorInfo(String hostName, String supervisorId,
- Set<Integer> workerPorts) {
+ public SupervisorInfo(String hostName, String supervisorId, Set<Integer> workerPorts) {
this.hostName = hostName;
this.supervisorId = supervisorId;
this.workerPorts = workerPorts;
@@ -80,16 +79,19 @@ public class SupervisorInfo implements Serializable {
public Set<Integer> getWorkerPorts() {
return workerPorts;
}
- public void setAvailableWorkerPorts(Set<Integer> workerPorts){
+
+ public void setAvailableWorkerPorts(Set<Integer> workerPorts) {
if (availableWorkerPorts == null)
availableWorkerPorts = new HashSet<Integer>();
availableWorkerPorts.addAll(workerPorts);
}
+
public Set<Integer> getAvailableWorkerPorts() {
if (availableWorkerPorts == null)
availableWorkerPorts = new HashSet<Integer>();
return availableWorkerPorts;
}
+
public void setWorkerPorts(Set<Integer> workerPorts) {
this.workerPorts = workerPorts;
}
@@ -98,20 +100,11 @@ public class SupervisorInfo implements Serializable {
public int hashCode() {
final int prime = 31;
int result = 1;
- result =
- prime * result + ((hostName == null) ? 0 : hostName.hashCode());
- result =
- prime
- * result
- + ((supervisorId == null) ? 0 : supervisorId.hashCode());
- result =
- prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode());
- result =
- prime * result
- + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode());
- result =
- prime * result
- + ((workerPorts == null) ? 0 : workerPorts.hashCode());
+ result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
+ result = prime * result + ((supervisorId == null) ? 0 : supervisorId.hashCode());
+ result = prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode());
+ result = prime * result + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode());
+ result = prime * result + ((workerPorts == null) ? 0 : workerPorts.hashCode());
return result;
}
@@ -154,19 +147,17 @@ public class SupervisorInfo implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
/**
* get Map<supervisorId, hostname>
- *
+ *
* @param stormClusterState
* @param callback
* @return
*/
- public static Map<String, String> getNodeHost(
- Map<String, SupervisorInfo> supInfos) {
+ public static Map<String, String> getNodeHost(Map<String, SupervisorInfo> supInfos) {
Map<String, String> rtn = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
index a2806de..99c2c76 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java
@@ -39,9 +39,9 @@ import com.alibaba.jstorm.utils.PathUtils;
/**
* supervisor shutdown manager which can shutdown supervisor
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
*/
-public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
- DaemonCommon, Runnable {
+public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {
private static Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);
@@ -67,11 +67,8 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
private volatile boolean isFinishShutdown = false;
- public SupervisorManger(Map conf, String supervisorId,
- Vector<AsyncLoopThread> threads,
- EventManager processesEventManager, EventManager eventManager,
- Httpserver httpserver, StormClusterState stormClusterState,
- ConcurrentHashMap<String, String> workerThreadPidsAtom) {
+ public SupervisorManger(Map conf, String supervisorId, Vector<AsyncLoopThread> threads, EventManager processesEventManager, EventManager eventManager,
+ Httpserver httpserver, StormClusterState stormClusterState, ConcurrentHashMap<String, String> workerThreadPidsAtom) {
this.conf = conf;
this.supervisorId = supervisorId;
this.shutdown = new AtomicBoolean(false);
@@ -104,8 +101,7 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
// } catch (InterruptedException e) {
// LOG.error(e.getMessage(), e);
// }
- LOG.info("Successfully shutdown thread:"
- + thread.getThread().getName());
+ LOG.info("Successfully shutdown thread:" + thread.getThread().getName());
}
eventManager.shutdown();
processesEventManager.shutdown();
@@ -144,15 +140,13 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon,
return;
}
List<String> myWorkerIds = PathUtils.read_dir_contents(path);
- HashMap<String, String> workerId2topologyIds =
- new HashMap<String, String>();
+ HashMap<String, String> workerId2topologyIds = new HashMap<String, String>();
for (String workerId : myWorkerIds) {
workerId2topologyIds.put(workerId, null);
}
- shutWorker(conf, supervisorId, workerId2topologyIds,
- workerThreadPidsAtom, null, true, null, null);
+ shutWorker(conf, supervisorId, workerId2topologyIds, workerThreadPidsAtom, null, true, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
index d90eb29..01f2a3a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import com.alibaba.jstorm.daemon.worker.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,12 +46,6 @@ import backtype.storm.utils.LocalState;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.daemon.worker.LocalAssignment;
-import com.alibaba.jstorm.daemon.worker.ProcessSimulator;
-import com.alibaba.jstorm.daemon.worker.State;
-import com.alibaba.jstorm.daemon.worker.Worker;
-import com.alibaba.jstorm.daemon.worker.WorkerHeartbeat;
-import com.alibaba.jstorm.daemon.worker.WorkerShutdown;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.Pair;
import com.alibaba.jstorm.utils.PathUtils;
@@ -59,6 +54,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
/**
* SyncProcesses (1) kill bad worker (2) start new worker
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
*/
class SyncProcessEvent extends ShutdownWork {
private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
@@ -78,8 +74,7 @@ class SyncProcessEvent extends ShutdownWork {
private SandBoxMaker sandBoxMaker;
/**
- * Due to the worker startTime is put in Supervisor memory, When supervisor
- * restart, the starting worker is likely to be killed
+ * Due to the worker startTime is put in Supervisor memory, When supervisor restart, the starting worker is likely to be killed
*/
private Map<String, Pair<Integer, Integer>> workerIdToStartTimeAndPort;
/**
@@ -95,6 +90,8 @@ class SyncProcessEvent extends ShutdownWork {
// private Supervisor supervisor;
private int lastTime;
+ private WorkerReportError workerReportError;
+
/**
* @param conf
* @param localState
@@ -104,10 +101,8 @@ class SyncProcessEvent extends ShutdownWork {
* @param workerThreadPidsReadLock
* @param workerThreadPidsWriteLock
*/
- public SyncProcessEvent(String supervisorId, Map conf,
- LocalState localState,
- ConcurrentHashMap<String, String> workerThreadPids,
- IContext sharedContext) {
+ public SyncProcessEvent(String supervisorId, Map conf, LocalState localState, ConcurrentHashMap<String, String> workerThreadPids,
+ IContext sharedContext, WorkerReportError workerReportError) {
this.supervisorId = supervisorId;
@@ -122,8 +117,7 @@ class SyncProcessEvent extends ShutdownWork {
this.sandBoxMaker = new SandBoxMaker(conf);
- this.workerIdToStartTimeAndPort =
- new HashMap<String, Pair<Integer, Integer>>();
+ this.workerIdToStartTimeAndPort = new HashMap<String, Pair<Integer, Integer>>();
this.needDownloadTopologys = new AtomicReference<Set>();
@@ -132,30 +126,27 @@ class SyncProcessEvent extends ShutdownWork {
}
killingWorkers = new HashMap<String, Integer>();
+ this.workerReportError = workerReportError;
}
/**
- * @@@ Change the old logic In the old logic, it will store
- * LS_LOCAL_ASSIGNMENTS Map<String, Integer> into LocalState
- *
- * But I don't think LS_LOCAL_ASSIGNMENTS is useful, so remove this
- * logic
+ * @@@ Change the old logic In the old logic, it will store LS_LOCAL_ASSIGNMENTS Map<String, Integer> into LocalState
+ *
+ * But I don't think LS_LOCAL_ASSIGNMENTS is useful, so remove this logic
*/
@SuppressWarnings("unchecked")
@Override
public void run() {
-
+
}
- public void run(Map<Integer, LocalAssignment> localAssignments) {
- LOG.debug("Syncing processes, interval seconds:"
- + TimeUtils.time_delta(lastTime));
+ public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds ) {
+ LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
lastTime = TimeUtils.current_time_secs();
try {
/**
- * Step 1: get assigned tasks from localstat Map<port(type Integer),
- * LocalAssignment>
+ * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
*/
if (localAssignments == null) {
localAssignments = new HashMap<Integer, LocalAssignment>();
@@ -163,13 +154,11 @@ class SyncProcessEvent extends ShutdownWork {
LOG.debug("Assigned tasks: " + localAssignments);
/**
- * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat
- * Map<workerid [WorkerHeartbeat, state]>
+ * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
*/
Map<String, StateHeartbeat> localWorkerStats = null;
try {
- localWorkerStats =
- getLocalWorkerStats(conf, localState, localAssignments);
+ localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
} catch (Exception e) {
LOG.error("Failed to get Local worker stats");
throw e;
@@ -177,20 +166,14 @@ class SyncProcessEvent extends ShutdownWork {
LOG.debug("Allocated: " + localWorkerStats);
/**
- * Step 3: kill Invalid Workers and remove killed worker from
- * localWorkerStats
+ * Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
*/
Map<String, Integer> taskCleaupTimeoutMap = null;
Set<Integer> keepPorts = null;
try {
- taskCleaupTimeoutMap =
- (Map<String, Integer>) localState
- .get(Common.LS_TASK_CLEANUP_TIMEOUT);
- keepPorts =
- killUselessWorkers(localWorkerStats, localAssignments,
- taskCleaupTimeoutMap);
- localState.put(Common.LS_TASK_CLEANUP_TIMEOUT,
- taskCleaupTimeoutMap);
+ taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
+ keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
+ localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
} catch (IOException e) {
LOG.error("Failed to kill workers", e);
}
@@ -202,7 +185,7 @@ class SyncProcessEvent extends ShutdownWork {
checkNeedUpdateTopologys(localWorkerStats, localAssignments);
// start new workers
- startNewWorkers(keepPorts, localAssignments);
+ startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
} catch (Exception e) {
LOG.error("Failed Sync Process", e);
@@ -215,14 +198,13 @@ class SyncProcessEvent extends ShutdownWork {
* check all workers is failed or not
*/
@SuppressWarnings("unchecked")
- public void checkNeedUpdateTopologys(
- Map<String, StateHeartbeat> localWorkerStats,
- Map<Integer, LocalAssignment> localAssignments) throws Exception {
+ public void checkNeedUpdateTopologys(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments) throws Exception {
Set<String> topologys = new HashSet<String>();
+ Map<String, Long> topologyAssignTimeStamps = new HashMap<String, Long>();
- for (Map.Entry<Integer, LocalAssignment> entry : localAssignments
- .entrySet()) {
+ for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
topologys.add(entry.getValue().getTopologyId());
+ topologyAssignTimeStamps.put(entry.getValue().getTopologyId(), entry.getValue().getTimeStamp());
}
for (StateHeartbeat stateHb : localWorkerStats.values()) {
@@ -236,32 +218,27 @@ class SyncProcessEvent extends ShutdownWork {
Set<String> needRemoveTopologys = new HashSet<String>();
for (String topologyId : topologys) {
try {
- long lastModifytime =
- StormConfig.get_supervisor_topology_Bianrymodify_time(
- conf, topologyId);
- if ((currTime - lastModifytime) / 1000 < (JStormUtils.MIN_1 * 2)) {
+ long newAssignTime = topologyAssignTimeStamps.get(topologyId);
+ if ((currTime - newAssignTime) / 1000 < (JStormUtils.MIN_1 * 2)) {
LOG.debug("less 2 minite ,so removed " + topologyId);
needRemoveTopologys.add(topologyId);
}
} catch (Exception e) {
- LOG.error(
- "Failed to get the time of file last modification for topology"
- + topologyId, e);
+ LOG.error("Failed to get the time of file last modification for topology" + topologyId, e);
needRemoveTopologys.add(topologyId);
}
}
topologys.removeAll(needRemoveTopologys);
if (topologys.size() > 0) {
- LOG.debug("Following topologys is going to re-download the jars, "
- + topologys);
+ LOG.debug("Following topologys is going to re-download the jars, " + topologys);
}
needDownloadTopologys.set(topologys);
}
/**
* mark all new Workers
- *
+ *
* @param workerIds
* @pdOid 52b11418-7474-446d-bff5-0ecd68f4954f
*/
@@ -271,40 +248,32 @@ class SyncProcessEvent extends ShutdownWork {
for (Entry<Integer, String> entry : workerIds.entrySet()) {
String oldWorkerIds = portToWorkerId.get(entry.getKey());
- if(oldWorkerIds != null){
+ if (oldWorkerIds != null) {
workerIdToStartTimeAndPort.remove(oldWorkerIds);
// update portToWorkerId
- LOG.info("exit port is still occupied by old wokerId, so remove unuseful " +
- oldWorkerIds+ " form workerIdToStartTimeAndPort");
+ LOG.info("exit port is still occupied by old wokerId, so remove unuseful " + oldWorkerIds + " form workerIdToStartTimeAndPort");
}
portToWorkerId.put(entry.getKey(), entry.getValue());
- workerIdToStartTimeAndPort.put(entry.getValue(),
- new Pair<Integer, Integer>(startTime, entry.getKey()));
+ workerIdToStartTimeAndPort.put(entry.getValue(), new Pair<Integer, Integer>(startTime, entry.getKey()));
}
}
/**
- * check new workers if the time is not > *
- * SUPERVISOR_WORKER_START_TIMEOUT_SECS, otherwise info failed
- *
+ * check new workers if the time is not > * SUPERVISOR_WORKER_START_TIMEOUT_SECS, otherwise info failed
+ *
* @param conf
* @pdOid f0a6ab43-8cd3-44e1-8fd3-015a2ec51c6a
*/
- public void checkNewWorkers(Map conf) throws IOException,
- InterruptedException {
+ public void checkNewWorkers(Map conf) throws IOException, InterruptedException {
Set<String> workers = new HashSet<String>();
- for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort
- .entrySet()) {
+ for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort.entrySet()) {
String workerId = entry.getKey();
int startTime = entry.getValue().getFirst();
LocalState ls = StormConfig.worker_state(conf, workerId);
- WorkerHeartbeat whb =
- (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
+ WorkerHeartbeat whb = (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
if (whb == null) {
- if ((TimeUtils.current_time_secs() - startTime) < JStormUtils
- .parseInt(conf
- .get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS))) {
+ if ((TimeUtils.current_time_secs() - startTime) < JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS))) {
LOG.info(workerId + " still hasn't started");
} else {
LOG.error("Failed to start Worker " + workerId);
@@ -321,15 +290,15 @@ class SyncProcessEvent extends ShutdownWork {
this.portToWorkerId.remove(port);
}
}
- public Map<Integer, String> getPortToWorkerId(){
+
+ public Map<Integer, String> getPortToWorkerId() {
return portToWorkerId;
}
/**
* get localstat approved workerId's map
- *
- * @return Map<workerid [workerheart, state]> [workerheart, state] is also a
- * map, key is "workheartbeat" and "state"
+ *
+ * @return Map<workerid [workerheart, state]> [workerheart, state] is also a map, key is "workheartbeat" and "state"
* @param conf
* @param localState
* @param assignedTasks
@@ -337,22 +306,17 @@ class SyncProcessEvent extends ShutdownWork {
* @pdOid 11c9bebb-d082-4c51-b323-dd3d5522a649
*/
@SuppressWarnings("unchecked")
- public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf,
- LocalState localState, Map<Integer, LocalAssignment> assignedTasks)
- throws Exception {
+ public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf, LocalState localState, Map<Integer, LocalAssignment> assignedTasks) throws Exception {
- Map<String, StateHeartbeat> workeridHbstate =
- new HashMap<String, StateHeartbeat>();
+ Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>();
int now = TimeUtils.current_time_secs();
/**
- * Get Map<workerId, WorkerHeartbeat> from
- * local_dir/worker/ids/heartbeat
+ * Get Map<workerId, WorkerHeartbeat> from local_dir/worker/ids/heartbeat
*/
Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
- for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat
- .entrySet()) {
+ for (Entry<String, WorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
String workerid = entry.getKey().toString();
@@ -366,10 +330,9 @@ class SyncProcessEvent extends ShutdownWork {
if (timeToPort != null) {
LocalAssignment localAssignment = assignedTasks.get(timeToPort.getSecond());
if (localAssignment == null) {
- LOG.info("Following worker don't exit assignment, so remove this port="
- + timeToPort.getSecond());
+ LOG.info("Following worker don't exit assignment, so remove this port=" + timeToPort.getSecond());
state = State.disallowed;
- //workerId is disallowed ,so remove it from workerIdToStartTimeAndPort
+ // workerId is disallowed ,so remove it from workerIdToStartTimeAndPort
Integer port = this.workerIdToStartTimeAndPort.get(workerid).getSecond();
this.workerIdToStartTimeAndPort.remove(workerid);
this.portToWorkerId.remove(port);
@@ -381,12 +344,21 @@ class SyncProcessEvent extends ShutdownWork {
// isn't assigned task
state = State.disallowed;
- } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf
- .get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {//
+ } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+ if (killingWorkers.containsKey(workerid) == false) {
+ String outTimeInfo = " it is likely to be out of memory, the worker is time out ";
+ workerReportError.report(whb.getTopologyId(), whb.getPort(),
+ whb.getTaskIds(), outTimeInfo);
+ }
state = State.timedOut;
} else {
if (isWorkerDead(workerid)) {
+ if (killingWorkers.containsKey(workerid) == false){
+ String workeDeadInfo = "Worker is dead ";
+ workerReportError.report(whb.getTopologyId(), whb.getPort(),
+ whb.getTaskIds(), workeDeadInfo);
+ }
state = State.timedOut;
} else {
state = State.valid;
@@ -395,13 +367,10 @@ class SyncProcessEvent extends ShutdownWork {
if (state != State.valid) {
if (killingWorkers.containsKey(workerid) == false)
- LOG.info("Worker:" + workerid + " state:" + state
- + " WorkerHeartbeat:" + whb + " assignedTasks:"
- + assignedTasks + " at supervisor time-secs " + now);
+ LOG.info("Worker:" + workerid + " state:" + state + " WorkerHeartbeat:" + whb + " assignedTasks:" + assignedTasks
+ + " at supervisor time-secs " + now);
} else {
- LOG.debug("Worker:" + workerid + " state:" + state
- + " WorkerHeartbeat: " + whb
- + " at supervisor time-secs " + now);
+ LOG.debug("Worker:" + workerid + " state:" + state + " WorkerHeartbeat: " + whb + " at supervisor time-secs " + now);
}
workeridHbstate.put(workerid, new StateHeartbeat(state, whb));
@@ -412,32 +381,26 @@ class SyncProcessEvent extends ShutdownWork {
/**
* check whether the workerheartbeat is allowed in the assignedTasks
- *
+ *
* @param whb : WorkerHeartbeat
* @param assignedTasks
- * @return boolean if true, the assignments(LS-LOCAL-ASSIGNMENTS) is match
- * with workerheart if fasle, is not matched
+ * @return boolean if true, the assignments(LS-LOCAL-ASSIGNMENTS) is match with workerheart if fasle, is not matched
*/
- public boolean matchesAssignment(WorkerHeartbeat whb,
- Map<Integer, LocalAssignment> assignedTasks) {
+ public boolean matchesAssignment(WorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedTasks) {
boolean isMatch = true;
LocalAssignment localAssignment = assignedTasks.get(whb.getPort());
if (localAssignment == null) {
- LOG.debug("Following worker has been removed, port="
- + whb.getPort() + ", assignedTasks=" + assignedTasks);
+ LOG.debug("Following worker has been removed, port=" + whb.getPort() + ", assignedTasks=" + assignedTasks);
isMatch = false;
} else if (!whb.getTopologyId().equals(localAssignment.getTopologyId())) {
// topology id not equal
- LOG.info("topology id not equal whb=" + whb.getTopologyId()
- + ",localAssignment=" + localAssignment.getTopologyId());
+ LOG.info("topology id not equal whb=" + whb.getTopologyId() + ",localAssignment=" + localAssignment.getTopologyId());
isMatch = false;
}/*
- * else if (!(whb.getTaskIds().equals(localAssignment.getTaskIds()))) {
- * // task-id isn't equal LOG.info("task-id isn't equal whb=" +
- * whb.getTaskIds() + ",localAssignment=" +
- * localAssignment.getTaskIds()); isMatch = false; }
+ * else if (!(whb.getTaskIds().equals(localAssignment.getTaskIds()))) { // task-id isn't equal LOG.info("task-id isn't equal whb=" + whb.getTaskIds() +
+ * ",localAssignment=" + localAssignment.getTaskIds()); isMatch = false; }
*/
return isMatch;
@@ -445,17 +408,15 @@ class SyncProcessEvent extends ShutdownWork {
/**
* get all workers heartbeats of the supervisor
- *
+ *
* @param conf
* @return Map<workerId, WorkerHeartbeat>
* @throws IOException
* @throws IOException
*/
- public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf)
- throws Exception {
+ public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
- Map<String, WorkerHeartbeat> workerHeartbeats =
- new HashMap<String, WorkerHeartbeat>();
+ Map<String, WorkerHeartbeat> workerHeartbeats = new HashMap<String, WorkerHeartbeat>();
// get the path: STORM-LOCAL-DIR/workers
String path = StormConfig.worker_root(conf);
@@ -480,20 +441,19 @@ class SyncProcessEvent extends ShutdownWork {
/**
* get worker heartbeat by workerid
- *
+ *
* @param conf
* @param workerId
* @returns WorkerHeartbeat
* @throws IOException
*/
- public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId)
- throws Exception {
+ public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) throws Exception {
try {
LocalState ls = StormConfig.worker_state(conf, workerId);
return (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT);
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Failed to get worker Heartbeat", e);
return null;
}
@@ -502,7 +462,7 @@ class SyncProcessEvent extends ShutdownWork {
/**
* launch a worker in local mode
- *
+ *
* @param conf
* @param sharedcontext
* @param topologyId
@@ -512,17 +472,12 @@ class SyncProcessEvent extends ShutdownWork {
* @param workerThreadPidsAtom
* @throws Exception
*/
- public void launchWorker(Map conf, IContext sharedcontext,
- String topologyId, String supervisorId, Integer port,
- String workerId,
- ConcurrentHashMap<String, String> workerThreadPidsAtom)
- throws Exception {
+ public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId,
+ ConcurrentHashMap<String, String> workerThreadPidsAtom) throws Exception {
String pid = UUID.randomUUID().toString();
- WorkerShutdown worker =
- Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId,
- port, workerId, null);
+ WorkerShutdown worker = Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId, port, workerId, null);
ProcessSimulator.registerProcess(pid, worker);
@@ -534,13 +489,11 @@ class SyncProcessEvent extends ShutdownWork {
private Set<String> setFilterJars(Map totalConf) {
Set<String> filterJars = new HashSet<String>();
- boolean enableClassloader =
- ConfigExtension.isEnableTopologyClassLoader(totalConf);
+ boolean enableClassloader = ConfigExtension.isEnableTopologyClassLoader(totalConf);
if (enableClassloader == false) {
// avoid logback vs log4j conflict
boolean enableLog4j = false;
- String userDefLog4jConf =
- ConfigExtension.getUserDefinedLog4jConf(totalConf);
+ String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(totalConf);
if (StringUtils.isBlank(userDefLog4jConf) == false) {
enableLog4j = true;
}
@@ -601,8 +554,7 @@ class SyncProcessEvent extends ShutdownWork {
}
if (stormHome != null) {
- List<String> stormHomeFiles =
- PathUtils.read_dir_contents(stormHome);
+ List<String> stormHomeFiles = PathUtils.read_dir_contents(stormHome);
for (String file : stormHomeFiles) {
if (file.endsWith(".jar")) {
@@ -610,13 +562,10 @@ class SyncProcessEvent extends ShutdownWork {
}
}
- List<String> stormLibFiles =
- PathUtils.read_dir_contents(stormHome + File.separator
- + "lib");
+ List<String> stormLibFiles = PathUtils.read_dir_contents(stormHome + File.separator + "lib");
for (String file : stormLibFiles) {
if (file.endsWith(".jar")) {
- classSet.add(stormHome + File.separator + "lib"
- + File.separator + file);
+ classSet.add(stormHome + File.separator + "lib" + File.separator + file);
}
}
@@ -646,8 +595,7 @@ class SyncProcessEvent extends ShutdownWork {
String childopts = " ";
if (stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) != null) {
- childopts +=
- (String) stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+ childopts += (String) stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
} else if (ConfigExtension.getWorkerGc(stormConf) != null) {
childopts += ConfigExtension.getWorkerGc(stormConf);
}
@@ -655,8 +603,7 @@ class SyncProcessEvent extends ShutdownWork {
return childopts;
}
- public String getLogParameter(Map conf, String stormHome,
- String topologyName, int port) {
+ public String getLogParameter(Map conf, String stormHome, String topologyName, int port) {
final String LOGBACK_CONF_TAG = "logback.configurationFile";
final String LOGBACK_CONF_TAG_CMD = " -D" + LOGBACK_CONF_TAG + "=";
final String DEFAULT_LOG_CONF = "jstorm.logback.xml";
@@ -664,13 +611,15 @@ class SyncProcessEvent extends ShutdownWork {
String logFileName = JStormUtils.genLogName(topologyName, port);
// String logFileName = topologyId + "-worker-" + port + ".log";
+
StringBuilder commandSB = new StringBuilder();
commandSB.append(" -Dlogfile.name=");
commandSB.append(logFileName);
+ commandSB.append(" -Dtopology.name=").append(topologyName);
+
// commandSB.append(" -Dlog4j.ignoreTCL=true");
- String userDefLogbackConf =
- ConfigExtension.getUserDefinedLogbackConf(conf);
+ String userDefLogbackConf = ConfigExtension.getUserDefinedLogbackConf(conf);
String logConf = System.getProperty(LOGBACK_CONF_TAG);
if (StringUtils.isBlank(userDefLogbackConf) == false) {
@@ -679,9 +628,7 @@ class SyncProcessEvent extends ShutdownWork {
} else if (StringUtils.isBlank(logConf) == false) {
commandSB.append(LOGBACK_CONF_TAG_CMD).append(logConf);
} else if (StringUtils.isBlank(stormHome) == false) {
- commandSB.append(LOGBACK_CONF_TAG_CMD).append(stormHome)
- .append(File.separator).append("conf")
- .append(File.separator).append(DEFAULT_LOG_CONF);
+ commandSB.append(LOGBACK_CONF_TAG_CMD).append(stormHome).append(File.separator).append("conf").append(File.separator).append(DEFAULT_LOG_CONF);
} else {
commandSB.append(LOGBACK_CONF_TAG_CMD + DEFAULT_LOG_CONF);
}
@@ -690,38 +637,35 @@ class SyncProcessEvent extends ShutdownWork {
String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(conf);
if (StringUtils.isBlank(userDefLog4jConf) == false) {
LOG.info("Use user fined log4j conf " + userDefLog4jConf);
- commandSB.append(" -D" + LOG4J_CONF_TAG + "=").append(
- userDefLog4jConf);
+ commandSB.append(" -D" + LOG4J_CONF_TAG + "=").append(userDefLog4jConf);
}
return commandSB.toString();
}
- private String getGcDumpParam(Map totalConf) {
+ private String getGcDumpParam(String topologyName, Map totalConf) {
// String gcPath = ConfigExtension.getWorkerGcPath(totalConf);
String gcPath = JStormUtils.getLogDir();
Date now = new Date();
String nowStr = TimeFormat.getSecond(now);
- StringBuilder gc = new StringBuilder();
-
+ StringBuilder gc = new StringBuilder(256);
gc.append(" -Xloggc:");
- gc.append(gcPath);
- gc.append(File.separator);
- gc.append("%TOPOLOGYID%-worker-%ID%-");
- gc.append(nowStr);
+ gc.append(gcPath).append(File.separator);
+ gc.append(topologyName).append(File.separator);
+ gc.append("%TOPOLOGYID%-worker-%ID%");
gc.append("-gc.log -verbose:gc -XX:HeapDumpPath=");
- gc.append(gcPath).append(File.separator).append("java-%TOPOLOGYID%-")
- .append(nowStr).append(".hprof");
+ gc.append(gcPath).append(File.separator).append(topologyName).append(File.separator).append("java-%TOPOLOGYID%-").append(nowStr).append(".hprof");
gc.append(" ");
+
return gc.toString();
}
/**
* launch a worker in distributed mode
- *
+ *
* @param conf
* @param sharedcontext
* @param topologyId
@@ -731,20 +675,17 @@ class SyncProcessEvent extends ShutdownWork {
* @throws IOException
* @pdOid 6ea369dd-5ce2-4212-864b-1f8b2ed94abb
*/
- public void launchWorker(Map conf, IContext sharedcontext,
- String topologyId, String supervisorId, Integer port,
- String workerId, LocalAssignment assignment) throws IOException {
+ public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId, LocalAssignment assignment)
+ throws IOException {
// STORM-LOCAL-DIR/supervisor/stormdist/topologyId
- String stormroot =
- StormConfig.supervisor_stormdist_root(conf, topologyId);
+ String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
// STORM-LOCAL-DIR/supervisor/stormdist/topologyId/stormjar.jar
String stormjar = StormConfig.stormjar_path(stormroot);
// get supervisor conf
- Map stormConf =
- StormConfig.read_supervisor_topology_conf(conf, topologyId);
+ Map stormConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
Map totalConf = new HashMap();
totalConf.putAll(conf);
@@ -761,12 +702,13 @@ class SyncProcessEvent extends ShutdownWork {
String stormhome = System.getProperty("jstorm.home");
long memSize = assignment.getMem();
+ long memMinSize = ConfigExtension.getMemMinSizePerWorker(totalConf);
int cpuNum = assignment.getCpu();
long memGsize = memSize / JStormUtils.SIZE_1_G;
int gcThreadsNum = memGsize > 4 ? (int) (memGsize * 1.5) : 4;
String childopts = getChildOpts(totalConf);
- childopts += getGcDumpParam(totalConf);
+ childopts += getGcDumpParam(Common.getTopologyNameById(topologyId), totalConf);
Map<String, String> environment = new HashMap<String, String>();
@@ -776,15 +718,13 @@ class SyncProcessEvent extends ShutdownWork {
environment.put("REDIRECT", "false");
}
- environment.put("LD_LIBRARY_PATH",
- (String) totalConf.get(Config.JAVA_LIBRARY_PATH));
+ environment.put("LD_LIBRARY_PATH", (String) totalConf.get(Config.JAVA_LIBRARY_PATH));
StringBuilder commandSB = new StringBuilder();
try {
if (this.cgroupManager != null) {
- commandSB.append(cgroupManager.startNewWorker(totalConf,
- cpuNum, workerId));
+ commandSB.append(cgroupManager.startNewWorker(totalConf, cpuNum, workerId));
}
} catch (Exception e) {
LOG.error("fail to prepare cgroup to workerId: " + workerId, e);
@@ -793,15 +733,21 @@ class SyncProcessEvent extends ShutdownWork {
// commandSB.append("java -server -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n ");
commandSB.append("java -server ");
- commandSB.append(" -Xms" + memSize);
+ commandSB.append(" -Xms" + memMinSize);
commandSB.append(" -Xmx" + memSize + " ");
- commandSB.append(" -Xmn" + memSize / 3 + " ");
- commandSB.append(" -XX:PermSize=" + memSize / 16);
- commandSB.append(" -XX:MaxPermSize=" + memSize / 8);
+ if (memMinSize < (memSize / 2))
+ commandSB.append(" -Xmn" + memMinSize + " ");
+ else
+ commandSB.append(" -Xmn" + memSize / 2 + " ");
+ if (memGsize >= 2) {
+ commandSB.append(" -XX:PermSize=" + memSize / 32);
+ } else {
+ commandSB.append(" -XX:PermSize=" + memSize / 16);
+ }
+ commandSB.append(" -XX:MaxPermSize=" + memSize / 16);
commandSB.append(" -XX:ParallelGCThreads=" + gcThreadsNum);
commandSB.append(" " + childopts);
- commandSB.append(" "
- + (assignment.getJvm() == null ? "" : assignment.getJvm()));
+ commandSB.append(" " + (assignment.getJvm() == null ? "" : assignment.getJvm()));
commandSB.append(" -Djava.library.path=");
commandSB.append((String) totalConf.get(Config.JAVA_LIBRARY_PATH));
@@ -811,20 +757,18 @@ class SyncProcessEvent extends ShutdownWork {
commandSB.append(stormhome);
}
- commandSB.append(getLogParameter(totalConf, stormhome,
- assignment.getTopologyName(), port));
+ String logDir = System.getProperty("jstorm.log.dir");
+ if (logDir != null)
+ commandSB.append(" -Djstorm.log.dir=").append(logDir);
+ commandSB.append(getLogParameter(totalConf, stormhome, assignment.getTopologyName(), port));
String classpath = getClassPath(stormjar, stormhome, totalConf);
- String workerClassPath =
- (String) totalConf.get(Config.TOPOLOGY_CLASSPATH);
- List<String> otherLibs =
- (List<String>) stormConf
- .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+ String workerClassPath = (String) totalConf.get(Config.TOPOLOGY_CLASSPATH);
+ List<String> otherLibs = (List<String>) stormConf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
StringBuilder sb = new StringBuilder();
if (otherLibs != null) {
for (String libName : otherLibs) {
- sb.append(StormConfig.stormlib_path(stormroot, libName))
- .append(":");
+ sb.append(StormConfig.stormlib_path(stormroot, libName)).append(":");
}
}
workerClassPath = workerClassPath + ":" + sb.toString();
@@ -832,8 +776,7 @@ class SyncProcessEvent extends ShutdownWork {
Map<String, String> policyReplaceMap = new HashMap<String, String>();
String realClassPath = classpath + ":" + workerClassPath;
policyReplaceMap.put(SandBoxMaker.CLASS_PATH_KEY, realClassPath);
- commandSB
- .append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap));
+ commandSB.append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap));
commandSB.append(" -cp ");
// commandSB.append(workerClassPath + ":");
@@ -871,9 +814,7 @@ class SyncProcessEvent extends ShutdownWork {
JStormUtils.launch_process(cmd, environment, true);
}
- private Set<Integer> killUselessWorkers(
- Map<String, StateHeartbeat> localWorkerStats,
- Map<Integer, LocalAssignment> localAssignments,
+ private Set<Integer> killUselessWorkers(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments,
Map<String, Integer> taskCleanupTimeoutMap) {
Map<String, String> removed = new HashMap<String, String>();
Set<Integer> keepPorts = new HashSet<Integer>();
@@ -882,8 +823,7 @@ class SyncProcessEvent extends ShutdownWork {
String workerid = entry.getKey();
StateHeartbeat hbstate = entry.getValue();
- if (workerIdToStartTimeAndPort.containsKey(workerid)
- && hbstate.getState().equals(State.notStarted))
+ if (workerIdToStartTimeAndPort.containsKey(workerid) && hbstate.getState().equals(State.notStarted))
continue;
if (hbstate.getState().equals(State.valid)) {
@@ -891,8 +831,7 @@ class SyncProcessEvent extends ShutdownWork {
keepPorts.add(hbstate.getHeartbeat().getPort());
} else {
if (hbstate.getHeartbeat() != null) {
- removed.put(workerid, hbstate.getHeartbeat()
- .getTopologyId());
+ removed.put(workerid, hbstate.getHeartbeat().getTopologyId());
} else {
removed.put(workerid, null);
}
@@ -910,14 +849,12 @@ class SyncProcessEvent extends ShutdownWork {
}
}
- shutWorker(conf, supervisorId, removed, workerThreadPids,
- cgroupManager, false, killingWorkers, taskCleanupTimeoutMap);
+ shutWorker(conf, supervisorId, removed, workerThreadPids, cgroupManager, false, killingWorkers, taskCleanupTimeoutMap);
Set<String> activeTopologys = new HashSet<String>();
if (killingWorkers.size() == 0) {
// When all workers under killing are killed successfully,
// clean the task cleanup timeout map correspondingly.
- for (Entry<Integer, LocalAssignment> entry : localAssignments
- .entrySet()) {
+ for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
activeTopologys.add(entry.getValue().getTopologyId());
}
@@ -936,8 +873,7 @@ class SyncProcessEvent extends ShutdownWork {
localWorkerStats.remove(removedWorkerId);
}
// Keep the workers which are still under starting
- for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort
- .entrySet()) {
+ for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort.entrySet()) {
String workerId = entry.getKey();
StateHeartbeat hbstate = localWorkerStats.get(workerId);
if (hbstate != null)
@@ -948,14 +884,12 @@ class SyncProcessEvent extends ShutdownWork {
return keepPorts;
}
- private void startNewWorkers(Set<Integer> keepPorts,
- Map<Integer, LocalAssignment> localAssignments) throws Exception {
+ private void startNewWorkers(Set<Integer> keepPorts, Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds)
+ throws Exception {
/**
- * Step 4: get reassigned tasks, which is in assignedTasks, but not in
- * keeperPorts Map<port(type Integer), LocalAssignment>
+ * Step 4: get reassigned tasks, which is in assignedTasks, but not in keeperPorts Map<port(type Integer), LocalAssignment>
*/
- Map<Integer, LocalAssignment> newWorkers =
- JStormUtils.select_keys_pred(keepPorts, localAssignments);
+ Map<Integer, LocalAssignment> newWorkers = JStormUtils.select_keys_pred(keepPorts, localAssignments);
/**
* Step 5: generate new work ids
@@ -965,7 +899,10 @@ class SyncProcessEvent extends ShutdownWork {
for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
Integer port = entry.getKey();
LocalAssignment assignment = entry.getValue();
-
+ if (assignment != null && assignment.getTopologyId() != null && downloadFailedTopologyIds.contains(assignment.getTopologyId())) {
+ LOG.info("Can't start this worker: " + port + " about the topology: " + assignment.getTopologyId() + ", due to the damaged binary !!");
+ continue;
+ }
String workerId = UUID.randomUUID().toString();
newWorkerIds.put(port, workerId);
@@ -994,18 +931,14 @@ class SyncProcessEvent extends ShutdownWork {
String clusterMode = StormConfig.cluster_mode(conf);
if (clusterMode.equals("distributed")) {
- launchWorker(conf, sharedContext,
- assignment.getTopologyId(), supervisorId, port,
- workerId, assignment);
+ launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, assignment);
} else if (clusterMode.equals("local")) {
- launchWorker(conf, sharedContext,
- assignment.getTopologyId(), supervisorId, port,
- workerId, workerThreadPids);
+ launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, workerThreadPids);
}
} catch (Exception e) {
- String errorMsg =
- "Failed to launchWorker workerId:" + workerId + ":"
- + port;
+ workerReportError.report(assignment.getTopologyId(), port,
+ assignment.getTaskIds(), new String(JStormUtils.getErrorInfo(e)));
+ String errorMsg = "Failed to launchWorker workerId:" + workerId + ":" + port;
LOG.error(errorMsg, e);
throw e;
}
@@ -1013,8 +946,7 @@ class SyncProcessEvent extends ShutdownWork {
}
/**
- * FIXME, workerIds should be Set, not Collection, but here simplify the
- * logic
+ * FIXME, workerIds should be Set, not Collection, but here simplify the logic
*/
markAllNewWorkers(newWorkerIds);
// try {
@@ -1027,6 +959,9 @@ class SyncProcessEvent extends ShutdownWork {
}
boolean isWorkerDead(String workerId) {
+ if (ConfigExtension.isCheckWorkerAliveBySystemInfo(conf) == false) {
+ return false;
+ }
try {
List<String> pids = getPid(conf, workerId);
@@ -1046,9 +981,7 @@ class SyncProcessEvent extends ShutdownWork {
return true;
} catch (IOException e) {
- LOG.info(
- "Failed to check whether worker is dead through /proc/pid",
- e);
+ LOG.info("Failed to check whether worker is dead through /proc/pid", e);
return false;
}