You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:50 UTC
[12/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
index 32aa0f1..9ca1d8a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java
@@ -17,33 +17,11 @@
*/
package com.alibaba.jstorm.daemon.supervisor;
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.io.FileExistsException;
-import org.apache.commons.io.FileUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
import backtype.storm.utils.LocalState;
-
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.worker.LocalAssignment;
@@ -55,16 +33,26 @@ import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
+import org.apache.commons.io.FileExistsException;
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.*;
+import java.util.Map.Entry;
/**
- * supervisor SynchronizeSupervisor workflow (1) writer local assignment to
- * LocalState (2) download new Assignment's topology (3) remove useless Topology
- * (4) push one SyncProcessEvent to SyncProcessEvent's EventManager
+ * supervisor SynchronizeSupervisor workflow (1) write local assignment to LocalState (2) download new Assignment's topology (3) remove useless Topology (4)
+ * push one SyncProcessEvent to SyncProcessEvent's EventManager
+ * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
*/
class SyncSupervisorEvent extends RunnableCallback {
- private static final Logger LOG = LoggerFactory
- .getLogger(SyncSupervisorEvent.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class);
// private Supervisor supervisor;
@@ -95,10 +83,8 @@ class SyncSupervisorEvent extends RunnableCallback {
* @param localState
* @param syncProcesses
*/
- public SyncSupervisorEvent(String supervisorId, Map conf,
- EventManager processEventManager, EventManager syncSupEventManager,
- StormClusterState stormClusterState, LocalState localState,
- SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
+ public SyncSupervisorEvent(String supervisorId, Map conf, EventManager processEventManager, EventManager syncSupEventManager,
+ StormClusterState stormClusterState, LocalState localState, SyncProcessEvent syncProcesses, Heartbeat heartbeat) {
this.syncProcesses = syncProcesses;
this.processEventManager = processEventManager;
@@ -112,38 +98,30 @@ class SyncSupervisorEvent extends RunnableCallback {
@Override
public void run() {
- LOG.debug("Synchronizing supervisor, interval seconds:"
- + TimeUtils.time_delta(lastTime));
+ LOG.debug("Synchronizing supervisor, interval seconds:" + TimeUtils.time_delta(lastTime));
lastTime = TimeUtils.current_time_secs();
try {
- RunnableCallback syncCallback =
- new EventManagerZkPusher(this, syncSupEventManager);
+ RunnableCallback syncCallback = new EventManagerZkPusher(this, syncSupEventManager);
/**
- * Step 1: get all assignments and register /ZK-dir/assignment and
- * every assignment watch
+ * Step 1: get all assignments and register /ZK-dir/assignment and every assignment watch
*
*/
- Map<String, Assignment> assignments =
- Cluster.get_all_assignment(stormClusterState, syncCallback);
+ Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, syncCallback);
LOG.debug("Get all assignments " + assignments);
/**
- * Step 2: get topologyIds list from
- * STORM-LOCAL-DIR/supervisor/stormdist/
+ * Step 2: get topologyIds list from STORM-LOCAL-DIR/supervisor/stormdist/
*/
- List<String> downloadedTopologyIds =
- StormConfig.get_supervisor_toplogy_list(conf);
+ List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf);
LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);
/**
- * Step 3: get <port,LocalAssignments> from ZK local node's
- * assignment
+ * Step 3: get <port,LocalAssignments> from ZK local node's assignment
*/
- Map<Integer, LocalAssignment> zkAssignment =
- getLocalAssign(stormClusterState, supervisorId, assignments);
+ Map<Integer, LocalAssignment> zkAssignment = getLocalAssign(stormClusterState, supervisorId, assignments);
Map<Integer, LocalAssignment> localAssignment;
Set<String> updateTopologys;
@@ -152,35 +130,31 @@ class SyncSupervisorEvent extends RunnableCallback {
*/
try {
LOG.debug("Writing local assignment " + zkAssignment);
- localAssignment =
- (Map<Integer, LocalAssignment>) localState
- .get(Common.LS_LOCAL_ASSIGNMENTS);
+ localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS);
if (localAssignment == null) {
localAssignment = new HashMap<Integer, LocalAssignment>();
}
localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment);
- updateTopologys =
- getUpdateTopologys(localAssignment, zkAssignment);
- Set<String> reDownloadTopologys =
- getNeedReDownloadTopologys(localAssignment);
+ updateTopologys = getUpdateTopologys(localAssignment, zkAssignment, assignments);
+ Set<String> reDownloadTopologys = getNeedReDownloadTopologys(localAssignment);
if (reDownloadTopologys != null) {
updateTopologys.addAll(reDownloadTopologys);
}
} catch (IOException e) {
- LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment
- + " of localState failed");
+ LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " of localState failed");
throw e;
}
/**
* Step 5: download code from ZK
*/
- Map<String, String> topologyCodes =
- getTopologyCodeLocations(assignments, supervisorId);
+ Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId);
+
+ // downloadFailedTopologyIds: topologies that couldn't finish downloading their binary from nimbus
+ Set<String> downloadFailedTopologyIds = new HashSet<String>();
- downloadTopology(topologyCodes, downloadedTopologyIds,
- updateTopologys, assignments);
+ downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologys, assignments, downloadFailedTopologyIds);
/**
* Step 6: remove any downloaded useless topology
@@ -191,7 +165,7 @@ class SyncSupervisorEvent extends RunnableCallback {
* Step 7: push syncProcesses Event
*/
// processEventManager.add(syncProcesses);
- syncProcesses.run(zkAssignment);
+ syncProcesses.run(zkAssignment, downloadFailedTopologyIds);
// If everything is OK, set the trigger to update heartbeat of
// supervisor
@@ -209,11 +183,9 @@ class SyncSupervisorEvent extends RunnableCallback {
* @param conf
* @param topologyId
* @param masterCodeDir
- * @param clusterMode
* @throws IOException
*/
- private void downloadStormCode(Map conf, String topologyId,
- String masterCodeDir) throws IOException, TException {
+ private void downloadStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
String clusterMode = StormConfig.cluster_mode(conf);
if (clusterMode.endsWith("distributed")) {
@@ -224,17 +196,14 @@ class SyncSupervisorEvent extends RunnableCallback {
}
}
- private void downloadLocalStormCode(Map conf, String topologyId,
- String masterCodeDir) throws IOException, TException {
+ private void downloadLocalStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
// STORM-LOCAL-DIR/supervisor/stormdist/storm-id
- String stormroot =
- StormConfig.supervisor_stormdist_root(conf, topologyId);
+ String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
FileUtils.copyDirectory(new File(masterCodeDir), new File(stormroot));
- ClassLoader classloader =
- Thread.currentThread().getContextClassLoader();
+ ClassLoader classloader = Thread.currentThread().getContextClassLoader();
String resourcesJar = resourcesJar();
@@ -244,20 +213,16 @@ class SyncSupervisorEvent extends RunnableCallback {
if (resourcesJar != null) {
- LOG.info("Extracting resources from jar at " + resourcesJar
- + " to " + targetDir);
+ LOG.info("Extracting resources from jar at " + resourcesJar + " to " + targetDir);
- JStormUtils.extract_dir_from_jar(resourcesJar,
- StormConfig.RESOURCES_SUBDIR, stormroot);// extract dir
+ JStormUtils.extract_dir_from_jar(resourcesJar, StormConfig.RESOURCES_SUBDIR, stormroot);// extract dir
// from jar;;
// util.clj
} else if (url != null) {
- LOG.info("Copying resources at " + url.toString() + " to "
- + targetDir);
+ LOG.info("Copying resources at " + url.toString() + " to " + targetDir);
- FileUtils.copyDirectory(new File(url.getFile()), (new File(
- targetDir)));
+ FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir)));
}
}
@@ -271,27 +236,21 @@ class SyncSupervisorEvent extends RunnableCallback {
* @throws IOException
* @throws TException
*/
- private void downloadDistributeStormCode(Map conf, String topologyId,
- String masterCodeDir) throws IOException, TException {
+ private void downloadDistributeStormCode(Map conf, String topologyId, String masterCodeDir) throws IOException, TException {
// STORM_LOCAL_DIR/supervisor/tmp/(UUID)
- String tmproot =
- StormConfig.supervisorTmpDir(conf) + File.separator
- + UUID.randomUUID().toString();
+ String tmproot = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString();
// STORM_LOCAL_DIR/supervisor/stormdist/topologyId
- String stormroot =
- StormConfig.supervisor_stormdist_root(conf, topologyId);
+ String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
- JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir,
- topologyId, true);
+ JStormServerUtils.downloadCodeFromMaster(conf, tmproot, masterCodeDir, topologyId, true);
// tmproot/stormjar.jar
String localFileJarTmp = StormConfig.stormjar_path(tmproot);
// extract dir from jar
- JStormUtils.extract_dir_from_jar(localFileJarTmp,
- StormConfig.RESOURCES_SUBDIR, tmproot);
+ JStormUtils.extract_dir_from_jar(localFileJarTmp, StormConfig.RESOURCES_SUBDIR, tmproot);
File srcDir = new File(tmproot);
File destDir = new File(stormroot);
@@ -325,8 +284,7 @@ class SyncSupervisorEvent extends RunnableCallback {
List<String> rtn = new ArrayList<String>();
int size = jarPaths.size();
for (int i = 0; i < size; i++) {
- if (JStormUtils.zipContainsDir(jarPaths.get(i),
- StormConfig.RESOURCES_SUBDIR)) {
+ if (JStormUtils.zipContainsDir(jarPaths.get(i), StormConfig.RESOURCES_SUBDIR)) {
rtn.add(jarPaths.get(i));
}
}
@@ -342,24 +300,19 @@ class SyncSupervisorEvent extends RunnableCallback {
*
* @param stormClusterState
* @param supervisorId
- * @param callback
* @throws Exception
* @returns map: {port,LocalAssignment}
*/
- private Map<Integer, LocalAssignment> getLocalAssign(
- StormClusterState stormClusterState, String supervisorId,
- Map<String, Assignment> assignments) throws Exception {
+ private Map<Integer, LocalAssignment> getLocalAssign(StormClusterState stormClusterState, String supervisorId, Map<String, Assignment> assignments)
+ throws Exception {
- Map<Integer, LocalAssignment> portLA =
- new HashMap<Integer, LocalAssignment>();
+ Map<Integer, LocalAssignment> portLA = new HashMap<Integer, LocalAssignment>();
for (Entry<String, Assignment> assignEntry : assignments.entrySet()) {
String topologyId = assignEntry.getKey();
Assignment assignment = assignEntry.getValue();
- Map<Integer, LocalAssignment> portTasks =
- readMyTasks(stormClusterState, topologyId, supervisorId,
- assignment);
+ Map<Integer, LocalAssignment> portTasks = readMyTasks(stormClusterState, topologyId, supervisorId, assignment);
if (portTasks == null) {
continue;
}
@@ -374,8 +327,7 @@ class SyncSupervisorEvent extends RunnableCallback {
if (!portLA.containsKey(port)) {
portLA.put(port, la);
} else {
- throw new RuntimeException(
- "Should not have multiple topologys assigned to one port");
+ throw new RuntimeException("Should not have multiple topologys assigned to one port");
}
}
}
@@ -389,30 +341,27 @@ class SyncSupervisorEvent extends RunnableCallback {
* @param stormClusterState
* @param topologyId
* @param supervisorId
- * @param callback
* @return Map: {port, LocalAssignment}
* @throws Exception
*/
- private Map<Integer, LocalAssignment> readMyTasks(
- StormClusterState stormClusterState, String topologyId,
- String supervisorId, Assignment assignmenInfo) throws Exception {
+ private Map<Integer, LocalAssignment> readMyTasks(StormClusterState stormClusterState, String topologyId, String supervisorId, Assignment assignmentInfo)
+ throws Exception {
- Map<Integer, LocalAssignment> portTasks =
- new HashMap<Integer, LocalAssignment>();
+ Map<Integer, LocalAssignment> portTasks = new HashMap<Integer, LocalAssignment>();
- Set<ResourceWorkerSlot> workers = assignmenInfo.getWorkers();
+ Set<ResourceWorkerSlot> workers = assignmentInfo.getWorkers();
if (workers == null) {
- LOG.error("No worker of assignement's " + assignmenInfo);
+ LOG.error("No worker of assignment's " + assignmentInfo);
return portTasks;
}
for (ResourceWorkerSlot worker : workers) {
if (!supervisorId.equals(worker.getNodeId()))
continue;
- portTasks.put(worker.getPort(), new LocalAssignment(topologyId,
- worker.getTasks(), Common.topologyIdToName(topologyId),
- worker.getMemSize(), worker.getCpu(), worker.getJvm(),
- assignmenInfo.getTimeStamp()));
+ portTasks.put(
+ worker.getPort(),
+ new LocalAssignment(topologyId, worker.getTasks(), Common.topologyIdToName(topologyId), worker.getMemSize(), worker.getCpu(), worker
+ .getJvm(), assignmentInfo.getTimeStamp()));
}
return portTasks;
@@ -421,14 +370,10 @@ class SyncSupervisorEvent extends RunnableCallback {
/**
* get mastercodedir for every topology
*
- * @param stormClusterState
- * @param callback
* @throws Exception
* @returns Map: <topologyId, master-code-dir> from zookeeper
*/
- public static Map<String, String> getTopologyCodeLocations(
- Map<String, Assignment> assignments, String supervisorId)
- throws Exception {
+ public static Map<String, String> getTopologyCodeLocations(Map<String, Assignment> assignments, String supervisorId) throws Exception {
Map<String, String> rtn = new HashMap<String, String>();
for (Entry<String, Assignment> entry : assignments.entrySet()) {
@@ -448,9 +393,8 @@ class SyncSupervisorEvent extends RunnableCallback {
return rtn;
}
- public void downloadTopology(Map<String, String> topologyCodes,
- List<String> downloadedTopologyIds, Set<String> updateTopologys,
- Map<String, Assignment> assignments) throws Exception {
+ public void downloadTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds, Set<String> updateTopologys,
+ Map<String, Assignment> assignments, Set<String> downloadFailedTopologyIds) throws Exception {
Set<String> downloadTopologys = new HashSet<String>();
@@ -459,38 +403,53 @@ class SyncSupervisorEvent extends RunnableCallback {
String topologyId = entry.getKey();
String masterCodeDir = entry.getValue();
- if (!downloadedTopologyIds.contains(topologyId)
- || updateTopologys.contains(topologyId)) {
+ if (!downloadedTopologyIds.contains(topologyId) || updateTopologys.contains(topologyId)) {
- LOG.info("Downloading code for storm id " + topologyId
- + " from " + masterCodeDir);
+ LOG.info("Downloading code for storm id " + topologyId + " from " + masterCodeDir);
- try {
- downloadStormCode(conf, topologyId, masterCodeDir);
- // Update assignment timeStamp
- StormConfig.write_supervisor_topology_timestamp(conf,
- topologyId, assignments.get(topologyId)
- .getTimeStamp());
- } catch (IOException e) {
- LOG.error(e + " downloadStormCode failed " + "topologyId:"
- + topologyId + "masterCodeDir:" + masterCodeDir);
+ int retry = 0;
+ while (retry < 3) {
+ try {
+ downloadStormCode(conf, topologyId, masterCodeDir);
+ // Update assignment timeStamp
+ StormConfig.write_supervisor_topology_timestamp(conf, topologyId, assignments.get(topologyId).getTimeStamp());
+ break;
+ } catch (IOException e) {
+ LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir);
- } catch (TException e) {
- LOG.error(e + " downloadStormCode failed " + "topologyId:"
- + topologyId + "masterCodeDir:" + masterCodeDir);
+ } catch (TException e) {
+ LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + masterCodeDir);
+ }
+ retry++;
+ }
+ if (retry < 3) {
+ LOG.info("Finished downloading code for storm id " + topologyId + " from " + masterCodeDir);
+ downloadTopologys.add(topologyId);
+ } else {
+ LOG.error("Cann't download code for storm id " + topologyId + " from " + masterCodeDir);
+ downloadFailedTopologyIds.add(topologyId);
}
- LOG.info("Finished downloading code for storm id " + topologyId
- + " from " + masterCodeDir);
- downloadTopologys.add(topologyId);
+ }
+ }
+ // clearing a topology's directory is dangerous, so only clear topology ids that
+ // aren't contained in downloadedTopologyIds
+ for (String topologyId : downloadFailedTopologyIds) {
+ if (!downloadedTopologyIds.contains(topologyId)) {
+ try {
+ String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId);
+ File destDir = new File(stormroot);
+ FileUtils.deleteQuietly(destDir);
+ } catch (Exception e) {
+ LOG.error("Cann't clear directory about storm id " + topologyId + " on supervisor ");
+ }
}
}
updateTaskCleanupTimeout(downloadTopologys);
}
- public void removeUselessTopology(Map<String, String> topologyCodes,
- List<String> downloadedTopologyIds) {
+ public void removeUselessTopology(Map<String, String> topologyCodes, List<String> downloadedTopologyIds) {
for (String topologyId : downloadedTopologyIds) {
if (!topologyCodes.containsKey(topologyId)) {
@@ -499,9 +458,7 @@ class SyncSupervisorEvent extends RunnableCallback {
String path = null;
try {
- path =
- StormConfig.supervisor_stormdist_root(conf,
- topologyId);
+ path = StormConfig.supervisor_stormdist_root(conf, topologyId);
PathUtils.rmr(path);
} catch (IOException e) {
String errMsg = "rmr the path:" + path + "failed\n";
@@ -511,13 +468,11 @@ class SyncSupervisorEvent extends RunnableCallback {
}
}
- private Set<String> getUpdateTopologys(
- Map<Integer, LocalAssignment> localAssignments,
- Map<Integer, LocalAssignment> zkAssignments) {
+ private Set<String> getUpdateTopologys(Map<Integer, LocalAssignment> localAssignments, Map<Integer, LocalAssignment> zkAssignments,
+ Map<String, Assignment> assignments) {
Set<String> ret = new HashSet<String>();
if (localAssignments != null && zkAssignments != null) {
- for (Entry<Integer, LocalAssignment> entry : localAssignments
- .entrySet()) {
+ for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) {
Integer port = entry.getKey();
LocalAssignment localAssignment = entry.getValue();
@@ -526,14 +481,11 @@ class SyncSupervisorEvent extends RunnableCallback {
if (localAssignment == null || zkAssignment == null)
continue;
- if (localAssignment.getTopologyId().equals(
- zkAssignment.getTopologyId())
- && localAssignment.getTimeStamp() < zkAssignment
- .getTimeStamp())
+ Assignment assignment = assignments.get(localAssignment.getTopologyId());
+ if (localAssignment.getTopologyId().equals(zkAssignment.getTopologyId()) && assignment != null
+ && assignment.isTopologyChange(localAssignment.getTimeStamp()))
if (ret.add(localAssignment.getTopologyId())) {
- LOG.info("Topology-" + localAssignment.getTopologyId()
- + " has been updated. LocalTs="
- + localAssignment.getTimeStamp() + ", ZkTs="
+ LOG.info("Topology-" + localAssignment.getTopologyId() + " has been updated. LocalTs=" + localAssignment.getTimeStamp() + ", ZkTs="
+ zkAssignment.getTimeStamp());
}
}
@@ -542,49 +494,37 @@ class SyncSupervisorEvent extends RunnableCallback {
return ret;
}
- private Set<String> getNeedReDownloadTopologys(
- Map<Integer, LocalAssignment> localAssignment) {
- Set<String> reDownloadTopologys =
- syncProcesses.getTopologyIdNeedDownload().getAndSet(null);
+ private Set<String> getNeedReDownloadTopologys(Map<Integer, LocalAssignment> localAssignment) {
+ Set<String> reDownloadTopologys = syncProcesses.getTopologyIdNeedDownload().getAndSet(null);
if (reDownloadTopologys == null || reDownloadTopologys.size() == 0)
return null;
Set<String> needRemoveTopologys = new HashSet<String>();
- Map<Integer, String> portToStartWorkerId =
- syncProcesses.getPortToWorkerId();
- for (Entry<Integer, LocalAssignment> entry : localAssignment
- .entrySet()) {
+ Map<Integer, String> portToStartWorkerId = syncProcesses.getPortToWorkerId();
+ for (Entry<Integer, LocalAssignment> entry : localAssignment.entrySet()) {
if (portToStartWorkerId.containsKey(entry.getKey()))
needRemoveTopologys.add(entry.getValue().getTopologyId());
}
- LOG.debug(
- "worker is starting on these topology, so delay download topology binary: "
- + needRemoveTopologys);
+ LOG.debug("worker is starting on these topology, so delay download topology binary: " + needRemoveTopologys);
reDownloadTopologys.removeAll(needRemoveTopologys);
if (reDownloadTopologys.size() > 0)
- LOG.info("Following topologys is going to re-download the jars, "
- + reDownloadTopologys);
+ LOG.info("Following topologys is going to re-download the jars, " + reDownloadTopologys);
return reDownloadTopologys;
}
private void updateTaskCleanupTimeout(Set<String> topologys) {
Map topologyConf = null;
- Map<String, Integer> taskCleanupTimeouts =
- new HashMap<String, Integer>();
+ Map<String, Integer> taskCleanupTimeouts = new HashMap<String, Integer>();
for (String topologyId : topologys) {
try {
- topologyConf =
- StormConfig.read_supervisor_topology_conf(conf,
- topologyId);
+ topologyConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
} catch (IOException e) {
LOG.info("Failed to read conf for " + topologyId);
}
Integer cleanupTimeout = null;
if (topologyConf != null) {
- cleanupTimeout =
- JStormUtils.parseInt(topologyConf
- .get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC));
+ cleanupTimeout = JStormUtils.parseInt(topologyConf.get(ConfigExtension.TASK_CLEANUP_TIMEOUT_SEC));
}
if (cleanupTimeout == null) {
@@ -596,9 +536,7 @@ class SyncSupervisorEvent extends RunnableCallback {
Map<String, Integer> localTaskCleanupTimeouts = null;
try {
- localTaskCleanupTimeouts =
- (Map<String, Integer>) localState
- .get(Common.LS_TASK_CLEANUP_TIMEOUT);
+ localTaskCleanupTimeouts = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
} catch (IOException e) {
LOG.error("Failed to read local task cleanup timeout map", e);
}
@@ -609,8 +547,7 @@ class SyncSupervisorEvent extends RunnableCallback {
localTaskCleanupTimeouts.putAll(taskCleanupTimeouts);
try {
- localState.put(Common.LS_TASK_CLEANUP_TIMEOUT,
- localTaskCleanupTimeouts);
+ localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, localTaskCleanupTimeouts);
} catch (IOException e) {
LOG.error("Failed to write local task cleanup timeout map", e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
index 81e4374..394c134 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/BatchDrainerRunable.java
@@ -39,8 +39,7 @@ import com.alibaba.jstorm.utils.Pair;
*
*/
public class BatchDrainerRunable extends DisruptorRunable {
- private final static Logger LOG = LoggerFactory
- .getLogger(BatchDrainerRunable.class);
+ private final static Logger LOG = LoggerFactory.getLogger(BatchDrainerRunable.class);
public BatchDrainerRunable(WorkerData workerData) {
super(workerData.getSendingQueue(), MetricDef.BATCH_DRAINER_THREAD);
@@ -50,8 +49,7 @@ public class BatchDrainerRunable extends DisruptorRunable {
@Override
public void handleEvent(Object event, boolean endOfBatch) throws Exception {
- Pair<IConnection, List<TaskMessage>> pair =
- (Pair<IConnection, List<TaskMessage>>) event;
+ Pair<IConnection, List<TaskMessage>> pair = (Pair<IConnection, List<TaskMessage>>) event;
pair.getFirst().send(pair.getSecond());
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
index a260323..47e73b8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ContextMaker.java
@@ -17,25 +17,23 @@
*/
package com.alibaba.jstorm.daemon.worker;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.ThriftTopologyUtils;
-
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* ContextMaker This class is used to create TopologyContext
@@ -56,8 +54,7 @@ public class ContextMaker {
@SuppressWarnings("rawtypes")
public ContextMaker(WorkerData workerData) {
/*
- * Map stormConf, String topologyId, String workerId, HashMap<Integer,
- * String> tasksToComponent, Integer port, List<Integer> workerTasks
+ * Map stormConf, String topologyId, String workerId, HashMap<Integer, String> tasksToComponent, Integer port, List<Integer> workerTasks
*/
this.workerData = workerData;
this.workerTasks = JStormUtils.mk_list(workerData.getTaskids());
@@ -67,12 +64,9 @@ public class ContextMaker {
String topologyId = workerData.getTopologyId();
String workerId = workerData.getWorkerId();
- String distroot =
- StormConfig
- .supervisor_stormdist_root(stormConf, topologyId);
+ String distroot = StormConfig.supervisor_stormdist_root(stormConf, topologyId);
- resourcePath =
- StormConfig.supervisor_storm_resources_path(distroot);
+ resourcePath = StormConfig.supervisor_storm_resources_path(distroot);
pidDir = StormConfig.worker_pids_root(stormConf, workerId);
@@ -85,43 +79,32 @@ public class ContextMaker {
}
}
- public TopologyContext makeTopologyContext(StormTopology topology,
- Integer taskId, clojure.lang.Atom openOrPrepareWasCalled) {
+ public TopologyContext makeTopologyContext(StormTopology topology, Integer taskId, clojure.lang.Atom openOrPrepareWasCalled) {
Map stormConf = workerData.getStormConf();
String topologyId = workerData.getTopologyId();
- HashMap<String, Map<String, Fields>> componentToStreamToFields =
- new HashMap<String, Map<String, Fields>>();
+ HashMap<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
Set<String> components = ThriftTopologyUtils.getComponentIds(topology);
for (String component : components) {
- Map<String, Fields> streamToFieldsMap =
- new HashMap<String, Fields>();
+ Map<String, Fields> streamToFieldsMap = new HashMap<String, Fields>();
- Map<String, StreamInfo> streamInfoMap =
- ThriftTopologyUtils.getComponentCommon(topology, component)
- .get_streams();
+ Map<String, StreamInfo> streamInfoMap = ThriftTopologyUtils.getComponentCommon(topology, component).get_streams();
for (Entry<String, StreamInfo> entry : streamInfoMap.entrySet()) {
String streamId = entry.getKey();
StreamInfo streamInfo = entry.getValue();
- streamToFieldsMap.put(streamId,
- new Fields(streamInfo.get_output_fields()));
+ streamToFieldsMap.put(streamId, new Fields(streamInfo.get_output_fields()));
}
componentToStreamToFields.put(component, streamToFieldsMap);
}
- return new TopologyContext(topology, stormConf,
- workerData.getTasksToComponent(),
- workerData.getComponentToSortedTasks(),
- componentToStreamToFields, topologyId, resourcePath, pidDir,
- taskId, workerData.getPort(), workerTasks,
- workerData.getDefaultResources(),
- workerData.getUserResources(), workerData.getExecutorData(),
- workerData.getRegisteredMetrics(), openOrPrepareWasCalled);
+ return new TopologyContext(topology, stormConf, workerData.getTasksToComponent(), workerData.getComponentToSortedTasks(), componentToStreamToFields,
+ topologyId, resourcePath, pidDir, taskId, workerData.getPort(), workerTasks, workerData.getDefaultResources(), workerData.getUserResources(),
+ workerData.getExecutorData(), workerData.getRegisteredMetrics(), openOrPrepareWasCalled, workerData.getZkCluster());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
index 3477cc4..c19947a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/DrainerRunable.java
@@ -46,8 +46,7 @@ import com.alibaba.jstorm.utils.Pair;
*
*/
public class DrainerRunable extends DisruptorRunable {
- private final static Logger LOG = LoggerFactory
- .getLogger(DrainerRunable.class);
+ private final static Logger LOG = LoggerFactory.getLogger(DrainerRunable.class);
private DisruptorQueue transferQueue;
private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
@@ -92,8 +91,7 @@ public class DrainerRunable extends DisruptorRunable {
if (conn.isClosed() == true) {
// if connection has been closed, just skip the package
- LOG.debug("Skip one tuple of " + taskId
- + ", due to close connection of " + nodePort);
+ LOG.debug("Skip one tuple of " + taskId + ", due to close connection of " + nodePort);
return;
}
@@ -113,11 +111,8 @@ public class DrainerRunable extends DisruptorRunable {
}
public void handleFinish() {
- for (Entry<IConnection, List<TaskMessage>> entry : dispatchMap
- .entrySet()) {
- Pair<IConnection, List<TaskMessage>> pair =
- new Pair<IConnection, List<TaskMessage>>(entry.getKey(),
- entry.getValue());
+ for (Entry<IConnection, List<TaskMessage>> entry : dispatchMap.entrySet()) {
+ Pair<IConnection, List<TaskMessage>> pair = new Pair<IConnection, List<TaskMessage>>(entry.getKey(), entry.getValue());
sendingQueue.publish(pair);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
index 312c57f..1221680 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/LocalAssignment.java
@@ -38,8 +38,7 @@ public class LocalAssignment implements Serializable {
private String jvm;
private long timeStamp;
- public LocalAssignment(String topologyId, Set<Integer> taskIds,
- String topologyName, long mem, int cpu, String jvm, long timeStamp) {
+ public LocalAssignment(String topologyId, Set<Integer> taskIds, String topologyName, long mem, int cpu, String jvm, long timeStamp) {
this.topologyId = topologyId;
this.taskIds = new HashSet<Integer>(taskIds);
this.topologyName = topologyName;
@@ -105,13 +104,8 @@ public class LocalAssignment implements Serializable {
result = prime * result + ((jvm == null) ? 0 : jvm.hashCode());
result = prime * result + (int) (mem ^ (mem >>> 32));
result = prime * result + ((taskIds == null) ? 0 : taskIds.hashCode());
- result =
- prime * result
- + ((topologyId == null) ? 0 : topologyId.hashCode());
- result =
- prime
- * result
- + ((topologyName == null) ? 0 : topologyName.hashCode());
+ result = prime * result + ((topologyId == null) ? 0 : topologyId.hashCode());
+ result = prime * result + ((topologyName == null) ? 0 : topologyName.hashCode());
result = prime * result + (int) (timeStamp & 0xffffffff);
return result;
}
@@ -156,7 +150,6 @@ public class LocalAssignment implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
index 628e0f5..056b6f3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ProcessSimulator.java
@@ -33,8 +33,7 @@ public class ProcessSimulator {
* skip old function name: pid-counter
*/
- protected static ConcurrentHashMap<String, WorkerShutdown> processMap =
- new ConcurrentHashMap<String, WorkerShutdown>();
+ protected static ConcurrentHashMap<String, WorkerShutdown> processMap = new ConcurrentHashMap<String, WorkerShutdown>();
/**
* Register process handler old function name: register-process
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
index 3f8acfc..bde8232 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshActive.java
@@ -34,8 +34,7 @@ import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormUtils;
/**
- * Timely check whether topology is active or not and whether the metrics
- * monitor is enable or disable from ZK
+ * Timely check whether topology is active or not and whether the metrics monitor is enable or disable from ZK
*
* @author yannian/Longda
*
@@ -63,9 +62,7 @@ public class RefreshActive extends RunnableCallback {
this.conf = workerData.getStormConf();
this.zkCluster = workerData.getZkCluster();
this.topologyId = workerData.getTopologyId();
- this.frequence =
- JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS),
- 10);
+ this.frequence = JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 10);
}
@Override
@@ -91,8 +88,7 @@ public class RefreshActive extends RunnableCallback {
return;
}
- LOG.info("Old TopologyStatus:" + oldTopologyStatus
- + ", new TopologyStatus:" + newTopologyStatus);
+ LOG.info("Old TopologyStatus:" + oldTopologyStatus + ", new TopologyStatus:" + newTopologyStatus);
List<TaskShutdownDameon> tasks = workerData.getShutdownTasks();
if (tasks == null) {
@@ -120,8 +116,7 @@ public class RefreshActive extends RunnableCallback {
boolean newMonitorEnable = base.isEnableMonitor();
boolean oldMonitorEnable = monitorEnable.get();
if (newMonitorEnable != oldMonitorEnable) {
- LOG.info("Change MonitorEnable from " + oldMonitorEnable
- + " to " + newMonitorEnable);
+ LOG.info("Change MonitorEnable from " + oldMonitorEnable + " to " + newMonitorEnable);
monitorEnable.set(newMonitorEnable);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
index 48cc945..130985b 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/RefreshConnections.java
@@ -17,23 +17,10 @@
*/
package com.alibaba.jstorm.daemon.worker;
-import java.io.FileNotFoundException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.scheduler.WorkerSlot;
-
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
@@ -42,9 +29,15 @@ import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -56,8 +49,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
*
*/
public class RefreshConnections extends RunnableCallback {
- private static Logger LOG = LoggerFactory
- .getLogger(RefreshConnections.class);
+ private static Logger LOG = LoggerFactory.getLogger(RefreshConnections.class);
private WorkerData workerData;
@@ -102,13 +94,9 @@ public class RefreshConnections extends RunnableCallback {
this.supervisorId = workerData.getSupervisorId();
// this.endpoint_socket_lock = endpoint_socket_lock;
- frequence =
- JStormUtils
- .parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 5);
+ frequence = JStormUtils.parseInt(conf.get(Config.TASK_REFRESH_POLL_SECS), 5);
- taskTimeoutSecs =
- JStormUtils.parseInt(
- conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
+ taskTimeoutSecs = JStormUtils.parseInt(conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
taskTimeoutSecs = taskTimeoutSecs * 3;
}
@@ -122,8 +110,7 @@ public class RefreshConnections extends RunnableCallback {
//
synchronized (this) {
- Assignment assignment =
- zkCluster.assignment_info(topologyId, this);
+ Assignment assignment = zkCluster.assignment_info(topologyId, this);
if (assignment == null) {
String errMsg = "Failed to get Assignment of " + topologyId;
LOG.error(errMsg);
@@ -137,47 +124,39 @@ public class RefreshConnections extends RunnableCallback {
// updated. If so, the outbound
// task map should be updated accordingly.
try {
- Long localAssignmentTS =
- StormConfig.read_supervisor_topology_timestamp(
- conf, topologyId);
- if (localAssignmentTS.longValue() > workerData
- .getAssignmentTs().longValue()) {
+ Long localAssignmentTS = StormConfig.read_supervisor_topology_timestamp(conf, topologyId);
+ if (localAssignmentTS.longValue() > workerData.getAssignmentTs().longValue()) {
try {
- if (assignment.getAssignmentType() == AssignmentType.Config) {
+ if (assignment.getAssignmentType() == AssignmentType.UpdateTopology) {
LOG.info("Get config reload request for " + topologyId);
// If config was updated, notify all tasks
List<TaskShutdownDameon> taskShutdowns = workerData.getShutdownTasks();
Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
workerData.getStormConf().putAll(newConf);
for (TaskShutdownDameon taskSD : taskShutdowns) {
- taskSD.updateConf(newConf);
+ taskSD.update(newConf);
}
- workerData.setAssignmentType(AssignmentType.Config);
+ workerData.setAssignmentType(AssignmentType.UpdateTopology);
} else {
Set<Integer> addedTasks = getAddedTasks(assignment);
- Set<Integer> removedTasks =
- getRemovedTasks(assignment);
-
+ Set<Integer> removedTasks = getRemovedTasks(assignment);
+ Set<Integer> updatedTasks = getUpdatedTasks(assignment);
+
workerData.updateWorkerData(assignment);
-
- if (removedTasks.size() > 0)
- shutdownTasks(removedTasks);
- if (addedTasks.size() > 0)
- createTasks(addedTasks);
-
- Set<Integer> tmpOutboundTasks =
- Worker.worker_output_tasks(workerData);
+
+ shutdownTasks(removedTasks);
+ createTasks(addedTasks);
+ updateTasks(updatedTasks);
+
+ Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData);
if (outboundTasks.equals(tmpOutboundTasks) == false) {
for (int taskId : tmpOutboundTasks) {
if (outboundTasks.contains(taskId) == false)
- workerData
- .addOutboundTaskStatusIfAbsent(taskId);
+ workerData.addOutboundTaskStatusIfAbsent(taskId);
}
- for (int taskId : workerData
- .getOutboundTaskStatus().keySet()) {
+ for (int taskId : workerData.getOutboundTaskStatus().keySet()) {
if (tmpOutboundTasks.contains(taskId) == false) {
- workerData
- .removeOutboundTaskStatus(taskId);
+ workerData.removeOutboundTaskStatus(taskId);
}
}
workerData.setOutboundTasks(tmpOutboundTasks);
@@ -196,23 +175,19 @@ public class RefreshConnections extends RunnableCallback {
}
} catch (FileNotFoundException e) {
- LOG.warn(
- "Failed to read supervisor topology timeStamp for "
- + topologyId + " port="
- + workerData.getPort(), e);
+ LOG.warn("Failed to read supervisor topology timeStamp for " + topologyId + " port=" + workerData.getPort(), e);
}
Set<ResourceWorkerSlot> workers = assignment.getWorkers();
if (workers == null) {
- String errMsg =
- "Failed to get taskToResource of " + topologyId;
+ String errMsg = "Failed to get taskToResource of " + topologyId;
LOG.error(errMsg);
return;
}
- workerData.getWorkerToResource().addAll(workers);
- Map<Integer, WorkerSlot> my_assignment =
- new HashMap<Integer, WorkerSlot>();
+ workerData.updateWorkerToResource(workers);
+
+ Map<Integer, WorkerSlot> my_assignment = new HashMap<Integer, WorkerSlot>();
Map<String, String> node = assignment.getNodeHost();
@@ -220,11 +195,13 @@ public class RefreshConnections extends RunnableCallback {
Set<WorkerSlot> need_connections = new HashSet<WorkerSlot>();
Set<Integer> localTasks = new HashSet<Integer>();
+ Set<Integer> localNodeTasks = new HashSet<Integer>();
if (workers != null && outboundTasks != null) {
for (ResourceWorkerSlot worker : workers) {
- if (supervisorId.equals(worker.getNodeId())
- && worker.getPort() == workerData.getPort())
+ if (supervisorId.equals(worker.getNodeId()))
+ localNodeTasks.addAll(worker.getTasks());
+ if (supervisorId.equals(worker.getNodeId()) && worker.getPort() == workerData.getPort())
localTasks.addAll(worker.getTasks());
for (Integer id : worker.getTasks()) {
if (outboundTasks.contains(id)) {
@@ -236,6 +213,7 @@ public class RefreshConnections extends RunnableCallback {
}
taskNodeport.putAll(my_assignment);
workerData.setLocalTasks(localTasks);
+ workerData.setLocalNodeTasks(localNodeTasks);
// get which connection need to be remove or add
Set<WorkerSlot> current_connections = nodeportSocket.keySet();
@@ -274,18 +252,9 @@ public class RefreshConnections extends RunnableCallback {
nodeportSocket.remove(node_port).close();
}
- // Update the status of all outbound tasks
+ // check the status of connections to all outbound tasks
for (Integer taskId : outboundTasks) {
- boolean isActive = false;
- int currentTime = TimeUtils.current_time_secs();
- TaskHeartbeat tHB =
- zkCluster.task_heartbeat(topologyId, taskId);
- if (tHB != null) {
- int taskReportTime = tHB.getTimeSecs();
- if ((currentTime - taskReportTime) < taskTimeoutSecs)
- isActive = true;
- }
- workerData.updateOutboundTaskStatus(taskId, isActive);
+ workerData.updateOutboundTaskStatus(taskId, isOutTaskConnected(taskId));
}
}
} catch (Exception e) {
@@ -307,16 +276,13 @@ public class RefreshConnections extends RunnableCallback {
private Set<Integer> getAddedTasks(Assignment assignment) {
Set<Integer> ret = new HashSet<Integer>();
try {
- Set<Integer> taskIds =
- assignment.getCurrentWorkerTasks(
- workerData.getSupervisorId(), workerData.getPort());
+ Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort());
for (Integer taskId : taskIds) {
if (!(workerData.getTaskids().contains(taskId)))
ret.add(taskId);
}
} catch (Exception e) {
- LOG.warn("Failed to get added task list for"
- + workerData.getTopologyId());
+ LOG.warn("Failed to get added task list for" + workerData.getTopologyId());
;
}
return ret;
@@ -325,22 +291,36 @@ public class RefreshConnections extends RunnableCallback {
private Set<Integer> getRemovedTasks(Assignment assignment) {
Set<Integer> ret = new HashSet<Integer>();
try {
- Set<Integer> taskIds =
- assignment.getCurrentWorkerTasks(
- workerData.getSupervisorId(), workerData.getPort());
+ Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort());
for (Integer taskId : workerData.getTaskids()) {
if (!(taskIds.contains(taskId)))
ret.add(taskId);
}
} catch (Exception e) {
- LOG.warn("Failed to get removed task list for"
- + workerData.getTopologyId());
+ LOG.warn("Failed to get removed task list for" + workerData.getTopologyId());
;
}
return ret;
}
+ private Set<Integer> getUpdatedTasks(Assignment assignment) {
+ Set<Integer> ret = new HashSet<Integer>();
+ try {
+ Set<Integer> taskIds = assignment.getCurrentWorkerTasks(workerData.getSupervisorId(), workerData.getPort());
+ for (Integer taskId : taskIds) {
+ if ((workerData.getTaskids().contains(taskId)))
+ ret.add(taskId);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to get updated task list for" + workerData.getTopologyId());
+ }
+ return ret;
+ }
+
private void createTasks(Set<Integer> tasks) {
+ if (tasks == null)
+ return;
+
for (Integer taskId : tasks) {
try {
TaskShutdownDameon shutdown = Task.mk_task(workerData, taskId);
@@ -352,17 +332,50 @@ public class RefreshConnections extends RunnableCallback {
}
private void shutdownTasks(Set<Integer> tasks) {
- for (Integer taskId : tasks) {
+ if (tasks == null)
+ return;
+
+ List<TaskShutdownDameon> shutdowns = workerData.getShutdownDaemonbyTaskIds(tasks);
+ for (TaskShutdownDameon shutdown : shutdowns) {
try {
- List<TaskShutdownDameon> shutdowns =
- workerData.getShutdownDaemonbyTaskIds(tasks);
- for (TaskShutdownDameon shutdown : shutdowns) {
- shutdown.shutdown();
- }
+ shutdown.shutdown();
} catch (Exception e) {
- LOG.error("Failed to shutdown task-" + taskId, e);
+ LOG.error("Failed to shutdown task-" + shutdown.getTaskId(), e);
}
}
}
+ private void updateTasks(Set<Integer> tasks) {
+ if (tasks == null)
+ return;
+
+ List<TaskShutdownDameon> shutdowns = workerData.getShutdownDaemonbyTaskIds(tasks);
+ for (TaskShutdownDameon shutdown : shutdowns) {
+ try {
+ shutdown.getTask().updateTaskData();
+ } catch (Exception e) {
+ LOG.error("Failed to update task-" + shutdown.getTaskId(), e);
+ }
+ }
+ }
+
+ private boolean isOutTaskConnected(int taskId) {
+ boolean ret = false;
+
+ if (workerData.getInnerTaskTransfer().get(taskId) != null) {
+ // Connections to inner tasks should be done after initialization.
+ // So return true here for all inner tasks.
+ ret = true;
+ } else {
+ WorkerSlot slot = taskNodeport.get(taskId);
+ if (slot != null) {
+ IConnection connection = nodeportSocket.get(slot);
+ if (connection != null) {
+ ret = connection.available();
+ }
+ }
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
index 2006b05..97932b9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/ShutdownableDameon.java
@@ -21,7 +21,6 @@ import backtype.storm.daemon.Shutdownable;
import com.alibaba.jstorm.cluster.DaemonCommon;
-public interface ShutdownableDameon extends Shutdownable, DaemonCommon,
- Runnable {
+public interface ShutdownableDameon extends Shutdownable, DaemonCommon, Runnable {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
index 21dc37c..a769cc1 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/VirtualPortDispatch.java
@@ -38,23 +38,21 @@ import com.alibaba.jstorm.utils.DisruptorRunable;
*
*/
public class VirtualPortDispatch extends DisruptorRunable {
- private final static Logger LOG = LoggerFactory
- .getLogger(VirtualPortDispatch.class);
+ private final static Logger LOG = LoggerFactory.getLogger(VirtualPortDispatch.class);
private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
private IConnection recvConnection;
- public VirtualPortDispatch(WorkerData workerData,
- IConnection recvConnection, DisruptorQueue recvQueue) {
+ public VirtualPortDispatch(WorkerData workerData, IConnection recvConnection, DisruptorQueue recvQueue) {
super(recvQueue, MetricDef.DISPATCH_THREAD);
this.recvConnection = recvConnection;
this.deserializeQueues = workerData.getDeserializeQueues();
}
-
+
public void shutdownRecv() {
- // don't need send shutdown command to every task
+ // don't need send shutdown command to every task
// due to every task has been shutdown by workerData.active
// at the same time queue has been fulll
// byte shutdownCmd[] = { TaskStatus.SHUTDOWN };
@@ -87,8 +85,7 @@ public class VirtualPortDispatch extends DisruptorRunable {
DisruptorQueue queue = deserializeQueues.get(task);
if (queue == null) {
- LOG.warn("Received invalid message directed at port " + task
- + ". Dropping...");
+ LOG.warn("Received invalid message directed at port " + task + ". Dropping...");
return;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
index d5cf9c8..2bf4c9c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/Worker.java
@@ -17,22 +17,6 @@
*/
package com.alibaba.jstorm.daemon.worker;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
@@ -41,30 +25,31 @@ import backtype.storm.messaging.IContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
import com.alibaba.jstorm.daemon.worker.hearbeat.WorkerHeartbeatRunable;
-import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
/**
* worker entrance
*
* @author yannian/Longda
- *
*/
public class Worker {
@@ -76,26 +61,14 @@ public class Worker {
private WorkerData workerData;
@SuppressWarnings({ "rawtypes", "unchecked" })
- public Worker(Map conf, IContext context, String topology_id,
- String supervisor_id, int port, String worker_id, String jar_path)
- throws Exception {
-
- workerData =
- new WorkerData(conf, context, topology_id, supervisor_id, port,
- worker_id, jar_path);
-
+ public Worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
+ workerData = new WorkerData(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
}
/**
* get current task's output task list
- *
- * @param tasks_component
- * @param mk_topology_context
- * @param task_ids
- * @throws Exception
*/
public static Set<Integer> worker_output_tasks(WorkerData workerData) {
-
ContextMaker context_maker = workerData.getContextMaker();
Set<Integer> task_ids = workerData.getTaskids();
StormTopology topology = workerData.getSysTopology();
@@ -103,16 +76,13 @@ public class Worker {
Set<Integer> rtn = new HashSet<Integer>();
for (Integer taskid : task_ids) {
- TopologyContext context =
- context_maker.makeTopologyContext(topology, taskid, null);
+ TopologyContext context = context_maker.makeTopologyContext(topology, taskid, null);
// <StreamId, <ComponentId, Grouping>>
- Map<String, Map<String, Grouping>> targets =
- context.getThisTargets();
+ Map<String, Map<String, Grouping>> targets = context.getThisTargets();
for (Map<String, Grouping> e : targets.values()) {
for (String componentId : e.keySet()) {
- List<Integer> tasks =
- context.getComponentTasks(componentId);
+ List<Integer> tasks = context.getComponentTasks(componentId);
rtn.addAll(tasks);
}
}
@@ -140,45 +110,46 @@ public class Worker {
Set<Integer> taskids = workerData.getTaskids();
+ Set<Thread> threads = new HashSet<Thread>();
+ List<Task> taskArrayList = new ArrayList<Task>();
for (int taskid : taskids) {
-
- TaskShutdownDameon t = Task.mk_task(workerData, taskid);
-
- shutdowntasks.add(t);
+ Task task = new Task(workerData, taskid);
+ Thread thread =new Thread(task);
+ threads.add(thread);
+ taskArrayList.add(task);
+ thread.start();
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ for (Task t : taskArrayList){
+ shutdowntasks.add(t.getTaskShutdownDameon());
}
-
return shutdowntasks;
}
-
+
@Deprecated
private DisruptorQueue startDispatchDisruptor() {
- Map stormConf = workerData.getStormConf();
-
- int queue_size =
- Utils.getInt(
- stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE),
- 1024);
- WaitStrategy waitStrategy =
- (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
- DisruptorQueue recvQueue =
- DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI,
- queue_size, waitStrategy);
+ Map stormConf = workerData.getStormConf();
+
+ int queue_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024);
+ WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
+ DisruptorQueue recvQueue = DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI, queue_size, waitStrategy);
// stop consumerStarted
recvQueue.consumerStarted();
-
+
return recvQueue;
}
private void startDispatchThread() {
- // remove dispatch thread, send tuple directly from nettyserver
- //startDispatchDisruptor();
+ // remove dispatch thread, send tuple directly from nettyserver
+ // startDispatchDisruptor();
IContext context = workerData.getContext();
String topologyId = workerData.getTopologyId();
- IConnection recvConnection =
- context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues());
-
+ IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues());
+
workerData.setRecvConnection(recvConnection);
}
@@ -191,40 +162,27 @@ public class Worker {
// so create client connection before create task
// refresh connection
RefreshConnections refreshConn = makeRefreshConnections();
- AsyncLoopThread refreshconn =
- new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY,
- true);
+ AsyncLoopThread refreshconn = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true);
threads.add(refreshconn);
// refresh ZK active status
RefreshActive refreshZkActive = new RefreshActive(workerData);
- AsyncLoopThread refreshzk =
- new AsyncLoopThread(refreshZkActive, false,
- Thread.MIN_PRIORITY, true);
+ AsyncLoopThread refreshzk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true);
threads.add(refreshzk);
// Sync heartbeat to Apsara Container
- AsyncLoopThread syncContainerHbThread =
- SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
+ AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
if (syncContainerHbThread != null) {
threads.add(syncContainerHbThread);
}
- JStormMetricsReporter metricReporter =
- new JStormMetricsReporter(workerData);
- AsyncLoopThread metricThread = new AsyncLoopThread(metricReporter);
- threads.add(metricThread);
-
- // create task heartbeat
- TaskHeartbeatRunable taskHB = new TaskHeartbeatRunable(workerData);
- AsyncLoopThread taskHBThread = new AsyncLoopThread(taskHB);
- threads.add(taskHBThread);
+ JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData);
+ metricReporter.init();
+ workerData.setMetricsReporter(metricReporter);
// refresh hearbeat to Local dir
RunnableCallback heartbeat_fn = new WorkerHeartbeatRunable(workerData);
- AsyncLoopThread hb =
- new AsyncLoopThread(heartbeat_fn, false, null,
- Thread.NORM_PRIORITY, true);
+ AsyncLoopThread hb = new AsyncLoopThread(heartbeat_fn, false, null, Thread.NORM_PRIORITY, true);
threads.add(hb);
// shutdown task callbacks
@@ -239,7 +197,6 @@ public class Worker {
* create worker instance and run it
*
* @param conf
- * @param mq_context
* @param topology_id
* @param supervisor_id
* @param port
@@ -248,9 +205,8 @@ public class Worker {
* @throws Exception
*/
@SuppressWarnings("rawtypes")
- public static WorkerShutdown mk_worker(Map conf, IContext context,
- String topology_id, String supervisor_id, int port,
- String worker_id, String jar_path) throws Exception {
+ public static WorkerShutdown mk_worker(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path)
+ throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("topologyId:" + topology_id + ", ");
@@ -260,9 +216,7 @@ public class Worker {
LOG.info("Begin to run worker:" + sb.toString());
- Worker w =
- new Worker(conf, context, topology_id, supervisor_id, port,
- worker_id, jar_path);
+ Worker w = new Worker(conf, context, topology_id, supervisor_id, port, worker_id, jar_path);
w.redirectOutput();
@@ -271,8 +225,7 @@ public class Worker {
public void redirectOutput() {
- if (System.getenv("REDIRECT") == null
- || !System.getenv("REDIRECT").equals("true")) {
+ if (System.getenv("REDIRECT") == null || !System.getenv("REDIRECT").equals("true")) {
return;
}
@@ -283,9 +236,7 @@ public class Worker {
DEFAULT_OUT_TARGET_FILE += ".out";
}
- String outputFile =
- ConfigExtension.getWorkerRedirectOutputFile(workerData
- .getStormConf());
+ String outputFile = ConfigExtension.getWorkerRedirectOutputFile(workerData.getStormConf());
if (outputFile == null) {
outputFile = DEFAULT_OUT_TARGET_FILE;
} else {
@@ -302,7 +253,6 @@ public class Worker {
outputFile = DEFAULT_OUT_TARGET_FILE;
}
}
-
} catch (Exception e) {
LOG.warn("Failed to touch " + outputFile, e);
outputFile = DEFAULT_OUT_TARGET_FILE;
@@ -318,9 +268,7 @@ public class Worker {
}
/**
- * Have one problem if the worker's start parameter length is longer than
- * 4096, ps -ef|grep com.alibaba.jstorm.daemon.worker.Worker can't find
- * worker
+ * Have one problem if the worker's start parameter length is longer than 4096, ps -ef|grep com.alibaba.jstorm.daemon.worker.Worker can't find worker
*
* @param port
*/
@@ -341,15 +289,11 @@ public class Worker {
try {
LOG.info("Begin to execute " + sb.toString());
- Process process =
- JStormUtils.launch_process(sb.toString(),
- new HashMap<String, String>(), false);
-
+ Process process = JStormUtils.launch_process(sb.toString(), new HashMap<String, String>(), false);
// Process process = Runtime.getRuntime().exec(sb.toString());
InputStream stdin = process.getInputStream();
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(stdin));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
JStormUtils.sleepMs(1000);
@@ -405,7 +349,6 @@ public class Worker {
LOG.info("Skip kill myself");
continue;
}
-
Integer pid = Integer.valueOf(fields[1]);
LOG.info("Find one process :" + pid.toString());
@@ -415,9 +358,7 @@ public class Worker {
continue;
}
}
-
}
-
return ret;
} catch (IOException e) {
LOG.info("Failed to execute " + sb.toString());
@@ -429,13 +370,10 @@ public class Worker {
}
public static void killOldWorker(String port) {
-
List<Integer> oldPids = getOldPortPids(port);
for (Integer pid : oldPids) {
-
JStormUtils.kill(pid);
}
-
}
/**
@@ -456,7 +394,6 @@ public class Worker {
}
StringBuilder sb = new StringBuilder();
-
try {
String topology_id = args[0];
String supervisor_id = args[1];
@@ -476,9 +413,7 @@ public class Worker {
sb.append("workerId:" + worker_id + ", ");
sb.append("jar_path:" + jar_path + "\n");
- WorkerShutdown sd =
- mk_worker(conf, null, topology_id, supervisor_id,
- Integer.parseInt(port_str), worker_id, jar_path);
+ WorkerShutdown sd = mk_worker(conf, null, topology_id, supervisor_id, Integer.parseInt(port_str), worker_id, jar_path);
sd.join();
LOG.info("Successfully shutdown worker " + sb.toString());