You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:53 UTC
[15/51] [partial] storm git commit: Update JStorm to latest release 2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
index e984455..fab05ea 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusTransition.java
@@ -17,51 +17,36 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.alibaba.jstorm.callback.Callback;
-import com.alibaba.jstorm.callback.impl.ActiveTransitionCallback;
-import com.alibaba.jstorm.callback.impl.DoRebalanceTransitionCallback;
-import com.alibaba.jstorm.callback.impl.DoneRebalanceTransitionCallback;
-import com.alibaba.jstorm.callback.impl.InactiveTransitionCallback;
-import com.alibaba.jstorm.callback.impl.KillTransitionCallback;
-import com.alibaba.jstorm.callback.impl.ReassignTransitionCallback;
-import com.alibaba.jstorm.callback.impl.RebalanceTransitionCallback;
-import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
-import com.alibaba.jstorm.callback.impl.UpdateConfTransitionCallback;
+import com.alibaba.jstorm.callback.impl.*;
import com.alibaba.jstorm.cluster.StormBase;
import com.alibaba.jstorm.cluster.StormStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Status changing
*
* @author version1: lixin version2: Longda
- *
- *
- *
*/
public class StatusTransition {
- private final static Logger LOG = LoggerFactory
- .getLogger(StatusTransition.class);
+ private final static Logger LOG = LoggerFactory.getLogger(StatusTransition.class);
private NimbusData data;
- private Map<String, Object> topologyLocks =
- new ConcurrentHashMap<String, Object>();
+ private Map<String, Object> topologyLocks = new ConcurrentHashMap<String, Object>();
public StatusTransition(NimbusData data) {
this.data = data;
}
- public <T> void transition(String topologyid, boolean errorOnNoTransition,
- StatusType changeStatus, T... args) throws Exception {
+ public <T> void transition(String topologyid, boolean errorOnNoTransition, StatusType changeStatus, T... args) throws Exception {
// lock outside
Object lock = topologyLocks.get(topologyid);
if (lock == null) {
@@ -70,8 +55,7 @@ public class StatusTransition {
}
if (data.getIsShutdown().get() == true) {
- LOG.info("Nimbus is in shutdown, skip this event " + topologyid
- + ":" + changeStatus);
+ LOG.info("Nimbus is in shutdown, skip this event " + topologyid + ":" + changeStatus);
return;
}
@@ -86,50 +70,34 @@ public class StatusTransition {
/**
* Changing status
*
- * @param topologyId
- * @param errorOnNTransition if it is true, failure will throw exception
* @param args -- will be used in the status changing callback
- *
*/
- public <T> void transitionLock(String topologyid,
- boolean errorOnNoTransition, StatusType changeStatus, T... args)
- throws Exception {
+ public <T> void transitionLock(String topologyid, boolean errorOnNoTransition, StatusType changeStatus, T... args) throws Exception {
// get ZK's topology node's data, which is StormBase
- StormBase stormbase =
- data.getStormClusterState().storm_base(topologyid, null);
+ StormBase stormbase = data.getStormClusterState().storm_base(topologyid, null);
if (stormbase == null) {
- LOG.error("Cannot apply event changing status "
- + changeStatus.getStatus() + " to " + topologyid
- + " because failed to get StormBase from ZK");
+ LOG.error("Cannot apply event changing status " + changeStatus.getStatus() + " to " + topologyid + " because failed to get StormBase from ZK");
return;
}
StormStatus currentStatus = stormbase.getStatus();
if (currentStatus == null) {
- LOG.error("Cannot apply event changing status "
- + changeStatus.getStatus() + " to " + topologyid
- + " because topologyStatus is null in ZK");
+ LOG.error("Cannot apply event changing status " + changeStatus.getStatus() + " to " + topologyid + " because topologyStatus is null in ZK");
return;
}
// <currentStatus, Map<changingStatus, callback>>
- Map<StatusType, Map<StatusType, Callback>> callbackMap =
- stateTransitions(topologyid, currentStatus);
+ Map<StatusType, Map<StatusType, Callback>> callbackMap = stateTransitions(topologyid, currentStatus);
// get current changingCallbacks
- Map<StatusType, Callback> changingCallbacks =
- callbackMap.get(currentStatus.getStatusType());
+ Map<StatusType, Callback> changingCallbacks = callbackMap.get(currentStatus.getStatusType());
- if (changingCallbacks == null
- || changingCallbacks.containsKey(changeStatus) == false
- || changingCallbacks.get(changeStatus) == null) {
+ if (changingCallbacks == null || changingCallbacks.containsKey(changeStatus) == false || changingCallbacks.get(changeStatus) == null) {
String msg =
- "No transition for event: changing status:"
- + changeStatus.getStatus() + ", current status: "
- + currentStatus.getStatusType() + " topology-id: "
- + topologyid;
+ "No transition for event: changing status:" + changeStatus.getStatus() + ", current status: " + currentStatus.getStatusType()
+ + " topology-id: " + topologyid;
LOG.info(msg);
if (errorOnNoTransition) {
throw new RuntimeException(msg);
@@ -144,12 +112,10 @@ public class StatusTransition {
StormStatus newStatus = (StormStatus) obj;
// update status to ZK
data.getStormClusterState().update_storm(topologyid, newStatus);
- LOG.info("Successfully updated " + topologyid + " as status "
- + newStatus);
+ LOG.info("Successfully updated " + topologyid + " as status " + newStatus);
}
- LOG.info("Successfully apply event changing status "
- + changeStatus.getStatus() + " to " + topologyid);
+ LOG.info("Successfully apply event changing status " + changeStatus.getStatus() + " to " + topologyid);
return;
}
@@ -157,104 +123,74 @@ public class StatusTransition {
/**
* generate status changing map
*
- *
- *
* @param topologyid
- * @param status
- * @return
- *
- * Map<StatusType, Map<StatusType, Callback>> means
- * Map<currentStatus, Map<changingStatus, Callback>>
+ * @return Map<StatusType, Map<StatusType, Callback>> means Map<currentStatus, Map<changingStatus, Callback>>
*/
- private Map<StatusType, Map<StatusType, Callback>> stateTransitions(
- String topologyid, StormStatus currentStatus) {
+ private Map<StatusType, Map<StatusType, Callback>> stateTransitions(String topologyid, StormStatus currentStatus) {
/**
*
- * 1. Status: this status will be stored in ZK
- * killed/inactive/active/rebalancing 2. action:
+ * 1. Status: this status will be stored in ZK killed/inactive/active/rebalancing 2. action:
*
- * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger
- * this only valid when current status is active inactivate -- client
- * will trigger this action, only valid when current status is active
- * activate -- client will trigger this action only valid when current
- * status is inactive startup -- when nimbus startup, it will trigger
- * this action only valid when current status is killed/rebalancing kill
- * -- client kill topology will trigger this action, only valid when
- * current status is active/inactive/killed remove -- 30 seconds after
- * client submit kill command, it will do this action, only valid when
- * current status is killed rebalance -- client submit rebalance
- * command, only valid when current status is active/deactive
- * do_rebalance -- 30 seconds after client submit rebalance command, it
- * will do this action, only valid when current status is rebalance
+ * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this only valid when current status is active inactivate -- client will trigger
+ * this action, only valid when current status is active activate -- client will trigger this action only valid when current status is inactive startup
+ * -- when nimbus startup, it will trigger this action only valid when current status is killed/rebalancing kill -- client kill topology will trigger
+ * this action, only valid when current status is active/inactive/killed remove -- 30 seconds after client submit kill command, it will do this action,
+ * only valid when current status is killed rebalance -- client submit rebalance command, only valid when current status is active/deactive do_rebalance
+ * -- 30 seconds after client submit rebalance command, it will do this action, only valid when current status is rebalance
*/
- Map<StatusType, Map<StatusType, Callback>> rtn =
- new HashMap<StatusType, Map<StatusType, Callback>>();
+ Map<StatusType, Map<StatusType, Callback>> rtn = new HashMap<StatusType, Map<StatusType, Callback>>();
// current status is active
- Map<StatusType, Callback> activeMap =
- new HashMap<StatusType, Callback>();
- activeMap.put(StatusType.monitor, new ReassignTransitionCallback(data,
- topologyid));
+ Map<StatusType, Callback> activeMap = new HashMap<StatusType, Callback>();
+ activeMap.put(StatusType.monitor, new ReassignTransitionCallback(data, topologyid));
activeMap.put(StatusType.inactivate, new InactiveTransitionCallback());
activeMap.put(StatusType.startup, null);
activeMap.put(StatusType.activate, null);
- activeMap.put(StatusType.kill, new KillTransitionCallback(data,
- topologyid));
+ activeMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid));
activeMap.put(StatusType.remove, null);
- activeMap.put(StatusType.rebalance, new RebalanceTransitionCallback(
- data, topologyid, currentStatus));
+ activeMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus));
activeMap.put(StatusType.do_rebalance, null);
activeMap.put(StatusType.done_rebalance, null);
- activeMap.put(StatusType.update_conf, new UpdateConfTransitionCallback(
- data, topologyid, currentStatus));
+ activeMap.put(StatusType.update_topology, new UpdateTopologyTransitionCallback(data, topologyid, currentStatus));
rtn.put(StatusType.active, activeMap);
// current status is inactive
- Map<StatusType, Callback> inactiveMap =
- new HashMap<StatusType, Callback>();
+ Map<StatusType, Callback> inactiveMap = new HashMap<StatusType, Callback>();
- inactiveMap.put(StatusType.monitor, new ReassignTransitionCallback(
- data, topologyid, new StormStatus(StatusType.inactive)));
+ inactiveMap.put(StatusType.monitor, new ReassignTransitionCallback(data, topologyid, new StormStatus(StatusType.inactive)));
inactiveMap.put(StatusType.inactivate, null);
inactiveMap.put(StatusType.startup, null);
inactiveMap.put(StatusType.activate, new ActiveTransitionCallback());
- inactiveMap.put(StatusType.kill, new KillTransitionCallback(data,
- topologyid));
+ inactiveMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid));
inactiveMap.put(StatusType.remove, null);
- inactiveMap.put(StatusType.rebalance, new RebalanceTransitionCallback(
- data, topologyid, currentStatus));
+ inactiveMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus));
inactiveMap.put(StatusType.do_rebalance, null);
inactiveMap.put(StatusType.done_rebalance, null);
- inactiveMap.put(StatusType.update_conf, null);
+ inactiveMap.put(StatusType.update_topology, null);
rtn.put(StatusType.inactive, inactiveMap);
// current status is killed
- Map<StatusType, Callback> killedMap =
- new HashMap<StatusType, Callback>();
+ Map<StatusType, Callback> killedMap = new HashMap<StatusType, Callback>();
killedMap.put(StatusType.monitor, null);
killedMap.put(StatusType.inactivate, null);
- killedMap.put(StatusType.startup, new KillTransitionCallback(data,
- topologyid));
+ killedMap.put(StatusType.startup, new KillTransitionCallback(data, topologyid));
killedMap.put(StatusType.activate, null);
- killedMap.put(StatusType.kill, new KillTransitionCallback(data,
- topologyid));
- killedMap.put(StatusType.remove, new RemoveTransitionCallback(data,
- topologyid));
+ killedMap.put(StatusType.kill, new KillTransitionCallback(data, topologyid));
+ killedMap.put(StatusType.remove, new RemoveTransitionCallback(data, topologyid));
killedMap.put(StatusType.rebalance, null);
killedMap.put(StatusType.do_rebalance, null);
killedMap.put(StatusType.done_rebalance, null);
- killedMap.put(StatusType.update_conf, null);
+ killedMap.put(StatusType.update_topology, null);
rtn.put(StatusType.killed, killedMap);
// current status is under rebalancing
- Map<StatusType, Callback> rebalancingMap =
- new HashMap<StatusType, Callback>();
+ Map<StatusType, Callback> rebalancingMap = new HashMap<StatusType, Callback>();
StatusType rebalanceOldStatus = StatusType.active;
if (currentStatus.getOldStatus() != null) {
@@ -267,20 +203,14 @@ public class StatusTransition {
rebalancingMap.put(StatusType.monitor, null);
rebalancingMap.put(StatusType.inactivate, null);
- rebalancingMap.put(StatusType.startup, new RebalanceTransitionCallback(
- data, topologyid, new StormStatus(rebalanceOldStatus)));
+ rebalancingMap.put(StatusType.startup, new RebalanceTransitionCallback(data, topologyid, new StormStatus(rebalanceOldStatus)));
rebalancingMap.put(StatusType.activate, null);
rebalancingMap.put(StatusType.kill, null);
rebalancingMap.put(StatusType.remove, null);
- rebalancingMap
- .put(StatusType.rebalance, new RebalanceTransitionCallback(
- data, topologyid, currentStatus));
- rebalancingMap.put(StatusType.do_rebalance,
- new DoRebalanceTransitionCallback(data, topologyid,
- new StormStatus(rebalanceOldStatus)));
- rebalancingMap.put(StatusType.done_rebalance,
- new DoneRebalanceTransitionCallback(data, topologyid));
- rebalancingMap.put(StatusType.update_conf, null);
+ rebalancingMap.put(StatusType.rebalance, new RebalanceTransitionCallback(data, topologyid, currentStatus));
+ rebalancingMap.put(StatusType.do_rebalance, new DoRebalanceTransitionCallback(data, topologyid, new StormStatus(rebalanceOldStatus)));
+ rebalancingMap.put(StatusType.done_rebalance, new DoneRebalanceTransitionCallback(data, topologyid));
+ rebalancingMap.put(StatusType.update_topology, null);
rtn.put(StatusType.rebalancing, rebalancingMap);
/**
@@ -288,7 +218,6 @@ public class StatusTransition {
*/
return rtn;
-
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
index cf785b7..d0f68ff 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/StatusType.java
@@ -20,21 +20,14 @@ package com.alibaba.jstorm.daemon.nimbus;
/**
* topology status:
*
- * 1. Status: this status will be stored in ZK
- * killed/inactive/active/rebalancing 2. action:
+ * 1. Status: this status will be stored in ZK killed/inactive/active/rebalancing 2. action:
*
- * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this
- * only valid when current status is active inactivate -- client will trigger
- * this action, only valid when current status is active activate -- client will
- * trigger this action only valid when current status is inactive startup --
- * when nimbus startup, it will trigger this action only valid when current
- * status is killed/rebalancing kill -- client kill topology will trigger this
- * action, only valid when current status is active/inactive/killed remove -- 30
- * seconds after client submit kill command, it will do this action, only valid
- * when current status is killed rebalance -- client submit rebalance command,
- * only valid when current status is active/deactive do_rebalance -- 30 seconds
- * after client submit rebalance command, it will do this action, only valid
- * when current status is rebalance
+ * monitor -- every Config.NIMBUS_MONITOR_FREQ_SECS seconds will trigger this only valid when current status is active inactivate -- client will trigger this
+ * action, only valid when current status is active activate -- client will trigger this action only valid when current status is inactive startup -- when
+ * nimbus startup, it will trigger this action only valid when current status is killed/rebalancing kill -- client kill topology will trigger this action, only
+ * valid when current status is active/inactive/killed remove -- 30 seconds after client submit kill command, it will do this action, only valid when current
+ * status is killed rebalance -- client submit rebalance command, only valid when current status is active/deactive do_rebalance -- 30 seconds after client
+ * submit rebalance command, it will do this action, only valid when current status is rebalance
*
*
*
@@ -43,13 +36,11 @@ package com.alibaba.jstorm.daemon.nimbus;
public enum StatusType {
// status
- active("active"), inactive("inactive"), rebalancing("rebalancing"), killed(
- "killed"),
+ active("active"), inactive("inactive"), rebalancing("rebalancing"), killed("killed"),
// actions
- activate("activate"), inactivate("inactivate"), monitor("monitor"), startup(
- "startup"), kill("kill"), remove("remove"), rebalance("rebalance"), do_rebalance(
- "do-rebalance"), done_rebalance("done-rebalance"), update_conf("update-config");
+ activate("activate"), inactivate("inactivate"), monitor("monitor"), startup("startup"), kill("kill"), remove("remove"), rebalance("rebalance"), do_rebalance(
+ "do-rebalance"), done_rebalance("done-rebalance"), update_topology("update-topoloogy");
private String status;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
index fd6f461..51da198 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssign.java
@@ -17,33 +17,12 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import backtype.storm.scheduler.WorkerSlot;
-
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.StormBase;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.cluster.StormStatus;
+import com.alibaba.jstorm.cluster.*;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskStartEvent;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.AssignmentBak;
@@ -51,15 +30,23 @@ import com.alibaba.jstorm.schedule.IToplogyScheduler;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyScheduler;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
public class TopologyAssign implements Runnable {
- private final static Logger LOG = LoggerFactory
- .getLogger(TopologyAssign.class);
+ private final static Logger LOG = LoggerFactory.getLogger(TopologyAssign.class);
/**
* private constructor function to avoid multiple instance
@@ -93,7 +80,7 @@ public class TopologyAssign implements Runnable {
public void init(NimbusData nimbusData) {
this.nimbusData = nimbusData;
- //this.cleanupTimeoutSec = 60;
+ // this.cleanupTimeoutSec = 60;
this.schedulers = new HashMap<String, IToplogyScheduler>();
@@ -113,8 +100,7 @@ public class TopologyAssign implements Runnable {
thread.interrupt();
}
- protected static LinkedBlockingQueue<TopologyAssignEvent> queue =
- new LinkedBlockingQueue<TopologyAssignEvent>();
+ protected static LinkedBlockingQueue<TopologyAssignEvent> queue = new LinkedBlockingQueue<TopologyAssignEvent>();
public static void push(TopologyAssignEvent event) {
queue.offer(event);
@@ -159,12 +145,37 @@ public class TopologyAssign implements Runnable {
* @return
*/
protected boolean doTopologyAssignment(TopologyAssignEvent event) {
- Assignment assignment = null;
+ Assignment assignment;
try {
+ Assignment oldAssignment = null;
+ boolean isReassign = event.isScratch();
+ if (isReassign) {
+ oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
+ }
assignment = mkAssignment(event);
- if (!(event.isScratch()))
+ // notify jstorm monitor on task assign/reassign/rebalance
+ TaskStartEvent taskEvent = new TaskStartEvent();
+ taskEvent.oldAssignment = oldAssignment;
+ taskEvent.newAssignment = assignment;
+ taskEvent.topologyId = event.getTopologyId();
+ taskEvent.clusterName = nimbusData.getClusterName();
+ taskEvent.timestamp = System.currentTimeMillis();
+
+ Map<Integer, String> task2Component;
+ // get from nimbus cache first
+ Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId());
+ if (taskInfoMap != null) {
+ task2Component = Common.getTaskToComponent(taskInfoMap);
+ } else {
+ task2Component = Common.getTaskToComponent(Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId()));
+ }
+ taskEvent.task2Component = task2Component;
+ nimbusData.getMetricRunnable().pushEvent(taskEvent);
+
+ if (!isReassign) {
setTopologyStatus(event);
+ }
} catch (Throwable e) {
LOG.error("Failed to assign topology " + event.getTopologyId(), e);
event.fail(e.getMessage());
@@ -180,8 +191,6 @@ public class TopologyAssign implements Runnable {
/**
* cleanup the topologies which are not in ZK /topology, but in other place
*
- * @param nimbusData
- * @param active_topologys
* @throws Exception
*/
public void cleanupDisappearedTopology() throws Exception {
@@ -192,8 +201,7 @@ public class TopologyAssign implements Runnable {
return;
}
- Set<String> cleanupIds =
- get_cleanup_ids(clusterState, active_topologys);
+ Set<String> cleanupIds = get_cleanup_ids(clusterState, active_topologys);
for (String topologyId : cleanupIds) {
@@ -202,13 +210,12 @@ public class TopologyAssign implements Runnable {
clusterState.try_remove_storm(topologyId);
//
nimbusData.getTaskHeartbeatsCache().remove(topologyId);
+ nimbusData.getTasksHeartbeat().remove(topologyId);
NimbusUtils.removeTopologyTaskTimeout(nimbusData, topologyId);
// get /nimbus/stormdist/topologyId
- String master_stormdist_root =
- StormConfig.masterStormdistRoot(nimbusData.getConf(),
- topologyId);
+ String master_stormdist_root = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
try {
// delete topologyId local dir
PathUtils.rmr(master_stormdist_root);
@@ -218,14 +225,12 @@ public class TopologyAssign implements Runnable {
}
}
- private void get_code_ids(List<String> code_ids,
- HashSet<String> latest_code_ids) throws IOException {
+ private void get_code_ids(List<String> code_ids, HashSet<String> latest_code_ids) throws IOException {
Map conf = nimbusData.getConf();
String master_stormdist_root = StormConfig.masterStormdistRoot(conf);
// listdir /local-dir/nimbus/stormdist
- List<String> all_code_ids =
- PathUtils.read_dir_contents(master_stormdist_root);
+ List<String> all_code_ids = PathUtils.read_dir_contents(master_stormdist_root);
code_ids.addAll(all_code_ids);
long now = System.currentTimeMillis();
@@ -238,9 +243,6 @@ public class TopologyAssign implements Runnable {
long modify = file.lastModified();
- if (now - modify < cleanupTimeoutSec * 1000) {
- latest_code_ids.add(dir);
- }
} catch (Exception exception) {
LOG.error("Failed to get modify time of " + dir, exception);
}
@@ -256,14 +258,14 @@ public class TopologyAssign implements Runnable {
* @return
* @throws Exception
*/
- private Set<String> get_cleanup_ids(StormClusterState clusterState,
- List<String> active_topologys) throws Exception {
+ private Set<String> get_cleanup_ids(StormClusterState clusterState, List<String> active_topologys) throws Exception {
List<String> task_ids = clusterState.task_storms();
List<String> heartbeat_ids = clusterState.heartbeat_storms();
List<String> error_ids = clusterState.task_error_storms();
List<String> assignment_ids = clusterState.assignments(null);
List<String> metric_ids = clusterState.get_metrics();
+ List<String> backpressure_ids = clusterState.backpressureInfos();
List<String> code_ids = new ArrayList<String>();
HashSet<String> latest_code_ids = new HashSet<String>();
@@ -272,8 +274,7 @@ public class TopologyAssign implements Runnable {
// Set<String> assigned_ids =
// JStormUtils.listToSet(clusterState.active_storms());
Set<String> to_cleanup_ids = new HashSet<String>();
- Set<String> pendingTopologys =
- nimbusData.getPendingSubmitTopoloygs().keySet();
+ Set<String> pendingTopologys = nimbusData.getPendingSubmitTopoloygs().keySet();
if (task_ids != null) {
to_cleanup_ids.addAll(task_ids);
@@ -294,9 +295,13 @@ public class TopologyAssign implements Runnable {
if (code_ids != null) {
to_cleanup_ids.addAll(code_ids);
}
-
+
if (metric_ids != null) {
- to_cleanup_ids.addAll(metric_ids);
+ to_cleanup_ids.addAll(metric_ids);
+ }
+
+ if (backpressure_ids != null) {
+ to_cleanup_ids.addAll(backpressure_ids);
}
if (active_topologys != null) {
@@ -309,8 +314,7 @@ public class TopologyAssign implements Runnable {
}
/**
- * Why need to remove latest code. Due to competition between
- * Thrift.threads and TopologyAssign thread
+ * Why need to remove latest code. Due to competition between Thrift.threads and TopologyAssign thread
*
*/
to_cleanup_ids.removeAll(latest_code_ids);
@@ -321,11 +325,6 @@ public class TopologyAssign implements Runnable {
/**
* start a topology: set active status of the topology
- *
- * @param topologyName
- * @param stormClusterState
- * @param topologyId
- * @throws Exception
*/
public void setTopologyStatus(TopologyAssignEvent event) throws Exception {
StormClusterState stormClusterState = nimbusData.getStormClusterState();
@@ -339,15 +338,11 @@ public class TopologyAssign implements Runnable {
status = event.getOldStatus();
}
- boolean isEnable =
- ConfigExtension
- .isEnablePerformanceMetrics(nimbusData.getConf());
+ boolean isEnable = ConfigExtension.isEnablePerformanceMetrics(nimbusData.getConf());
StormBase stormBase = stormClusterState.storm_base(topologyId, null);
if (stormBase == null) {
- stormBase =
- new StormBase(topologyName, TimeUtils.current_time_secs(),
- status, group);
+ stormBase = new StormBase(topologyName, TimeUtils.current_time_secs(), status, group);
stormBase.setEnableMonitor(isEnable);
stormClusterState.activate_storm(topologyId, stormBase);
@@ -367,18 +362,20 @@ public class TopologyAssign implements Runnable {
}
- protected TopologyAssignContext prepareTopologyAssign(
- TopologyAssignEvent event) throws Exception {
+ protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception {
TopologyAssignContext ret = new TopologyAssignContext();
String topologyId = event.getTopologyId();
+ ret.setTopologyId(topologyId);
+
+ int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
+ ret.setTopologyMasterTaskId(topoMasterId);
+ LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);
Map<Object, Object> nimbusConf = nimbusData.getConf();
- Map<Object, Object> topologyConf =
- StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
+ Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
- StormTopology rawTopology =
- StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
+ StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
ret.setRawTopology(rawTopology);
Map stormConf = new HashMap();
@@ -389,8 +386,7 @@ public class TopologyAssign implements Runnable {
StormClusterState stormClusterState = nimbusData.getStormClusterState();
// get all running supervisor, don't need callback to watch supervisor
- Map<String, SupervisorInfo> supInfos =
- Cluster.get_all_SupervisorInfo(stormClusterState, null);
+ Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
// init all AvailableWorkerPorts
for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
SupervisorInfo supervisor = supInfo.getValue();
@@ -400,21 +396,16 @@ public class TopologyAssign implements Runnable {
getAliveSupervsByHb(supInfos, nimbusConf);
if (supInfos.size() == 0) {
- throw new FailedAssignTopologyException(
- "Failed to make assignment " + topologyId
- + ", due to no alive supervisor");
+ throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor");
}
- Map<Integer, String> taskToComponent =
- Cluster.get_all_task_component(stormClusterState, topologyId, null);
+ Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
ret.setTaskToComponent(taskToComponent);
// get taskids /ZK/tasks/topologyId
Set<Integer> allTaskIds = taskToComponent.keySet();
if (allTaskIds == null || allTaskIds.size() == 0) {
- String errMsg =
- "Failed to get all task ID list from /ZK-dir/tasks/"
- + topologyId;
+ String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId;
LOG.warn(errMsg);
throw new IOException(errMsg);
}
@@ -425,18 +416,31 @@ public class TopologyAssign implements Runnable {
// machine
Set<Integer> unstoppedTasks = new HashSet<Integer>();
Set<Integer> deadTasks = new HashSet<Integer>();
- Set<ResourceWorkerSlot> unstoppedWorkers =
- new HashSet<ResourceWorkerSlot>();
+ Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();
- Assignment existingAssignment =
- stormClusterState.assignment_info(topologyId, null);
+ Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null);
if (existingAssignment != null) {
aliveTasks = getAliveTasks(topologyId, allTaskIds);
- unstoppedTasks =
- getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
- deadTasks.addAll(allTaskIds);
- deadTasks.removeAll(aliveTasks);
+ /*
+ * Check if the topology master task is alive first since all task
+ * heartbeat info is reported by topology master.
+ * If master is dead, do reassignment for topology master first.
+ */
+ if (aliveTasks.contains(topoMasterId) == false) {
+ ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId);
+ deadTasks.addAll(worker.getTasks());
+
+ Set<Integer> tempSet = new HashSet<Integer>(allTaskIds);
+ tempSet.removeAll(deadTasks);
+ aliveTasks.addAll(tempSet);
+ aliveTasks.removeAll(deadTasks);
+ } else {
+ deadTasks.addAll(allTaskIds);
+ deadTasks.removeAll(aliveTasks);
+ }
+
+ unstoppedTasks = getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
}
ret.setDeadTaskIds(deadTasks);
@@ -451,9 +455,7 @@ public class TopologyAssign implements Runnable {
ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_NEW);
try {
- AssignmentBak lastAssignment =
- stormClusterState.assignment_bak(event
- .getTopologyName());
+ AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName());
if (lastAssignment != null) {
ret.setOldAssignment(lastAssignment.getAssignment());
}
@@ -465,13 +467,11 @@ public class TopologyAssign implements Runnable {
if (event.isScratch()) {
ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_REBALANCE);
ret.setIsReassign(event.isReassign());
- unstoppedWorkers =
- getUnstoppedWorkers(unstoppedTasks, existingAssignment);
+ unstoppedWorkers = getUnstoppedWorkers(unstoppedTasks, existingAssignment);
ret.setUnstoppedWorkers(unstoppedWorkers);
} else {
ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_MONITOR);
- unstoppedWorkers =
- getUnstoppedWorkers(aliveTasks, existingAssignment);
+ unstoppedWorkers = getUnstoppedWorkers(aliveTasks, existingAssignment);
ret.setUnstoppedWorkers(unstoppedWorkers);
}
}
@@ -480,13 +480,8 @@ public class TopologyAssign implements Runnable {
}
/**
- * make assignments for a topology The nimbus core function, this function
- * has been totally rewrite
+ * make assignments for a topology The nimbus core function, this function has been totally rewrite
*
- * @param nimbusData NimbusData
- * @param topologyId String
- * @param isScratch Boolean: isScratch is false unless rebalancing the
- * topology
* @throws Exception
*/
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
@@ -500,8 +495,7 @@ public class TopologyAssign implements Runnable {
if (!StormConfig.local_mode(nimbusData.getConf())) {
- IToplogyScheduler scheduler =
- schedulers.get(DEFAULT_SCHEDULER_NAME);
+ IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
assignments = scheduler.assignTasks(context);
@@ -511,29 +505,24 @@ public class TopologyAssign implements Runnable {
Assignment assignment = null;
if (assignments != null && assignments.size() > 0) {
- Map<String, String> nodeHost =
- getTopologyNodeHost(context.getCluster(),
- context.getOldAssignment(), assignments);
+ Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
- Map<Integer, Integer> startTimes =
- getTaskStartTimes(context, nimbusData, topologyId,
- context.getOldAssignment(), assignments);
+ Map<Integer, Integer> startTimes = getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments);
- String codeDir =
- StormConfig.masterStormdistRoot(nimbusData.getConf(),
- topologyId);
+ String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
- assignment =
- new Assignment(codeDir, assignments, nodeHost, startTimes);
+ assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
- StormClusterState stormClusterState =
- nimbusData.getStormClusterState();
+ // the topology binary changed.
+ if (event.isScaleTopology()){
+ assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
+ }
+ StormClusterState stormClusterState = nimbusData.getStormClusterState();
stormClusterState.set_assignment(topologyId, assignment);
// update task heartbeat's start time
- NimbusUtils.updateTaskHbStartTime(nimbusData, assignment,
- topologyId);
+ NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
// @@@ TODO
@@ -547,14 +536,13 @@ public class TopologyAssign implements Runnable {
NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);
- LOG.info("Successfully make assignment for topology id "
- + topologyId + ": " + assignment);
+ LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
}
return assignment;
}
private static Set<ResourceWorkerSlot> mkLocalAssignment(
- TopologyAssignContext context) {
+ TopologyAssignContext context) throws Exception {
Set<ResourceWorkerSlot> result = new HashSet<ResourceWorkerSlot>();
Map<String, SupervisorInfo> cluster = context.getCluster();
if (cluster.size() != 1)
@@ -565,7 +553,15 @@ public class TopologyAssign implements Runnable {
supervisorId = entry.getKey();
localSupervisor = entry.getValue();
}
- int port = localSupervisor.getAvailableWorkerPorts().iterator().next();
+ int port = -1;
+ if (localSupervisor.getAvailableWorkerPorts().iterator().hasNext()) {
+ port = localSupervisor.getAvailableWorkerPorts().iterator().next();
+ } else {
+ LOG.info(" amount of worker's ports is not enough");
+ throw new FailedAssignTopologyException(
+ "Failed to make assignment " + ", due to no enough ports");
+ }
+
ResourceWorkerSlot worker = new ResourceWorkerSlot(supervisorId, port);
worker.setTasks(new HashSet<Integer>(context.getAllTaskIds()));
worker.setHostname(localSupervisor.getHostName());
@@ -573,16 +569,8 @@ public class TopologyAssign implements Runnable {
return result;
}
- /**
- * @param existingAssignment
- * @param taskWorkerSlot
- * @return
- * @throws Exception
- */
- public static Map<Integer, Integer> getTaskStartTimes(
- TopologyAssignContext context, NimbusData nimbusData,
- String topologyId, Assignment existingAssignment,
- Set<ResourceWorkerSlot> workers) throws Exception {
+ public static Map<Integer, Integer> getTaskStartTimes(TopologyAssignContext context, NimbusData nimbusData, String topologyId,
+ Assignment existingAssignment, Set<ResourceWorkerSlot> workers) throws Exception {
Map<Integer, Integer> startTimes = new TreeMap<Integer, Integer>();
@@ -600,8 +588,7 @@ public class TopologyAssign implements Runnable {
Set<ResourceWorkerSlot> oldWorkers = new HashSet<ResourceWorkerSlot>();
if (existingAssignment != null) {
- Map<Integer, Integer> taskStartTimeSecs =
- existingAssignment.getTaskStartTimeSecs();
+ Map<Integer, Integer> taskStartTimeSecs = existingAssignment.getTaskStartTimeSecs();
if (taskStartTimeSecs != null) {
startTimes.putAll(taskStartTimeSecs);
}
@@ -616,23 +603,21 @@ public class TopologyAssign implements Runnable {
int nowSecs = TimeUtils.current_time_secs();
for (Integer changedTaskId : changedTaskIds) {
startTimes.put(changedTaskId, nowSecs);
- zkClusterState.remove_task_heartbeat(topologyId, changedTaskId);
+ NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, changedTaskId);
}
Set<Integer> removedTaskIds = getRemovedTaskIds(oldWorkers, workers);
for (Integer removedTaskId : removedTaskIds) {
startTimes.remove(removedTaskId);
- zkClusterState.remove_task_heartbeat(topologyId, removedTaskId);
+ NimbusUtils.removeTopologyTaskHb(nimbusData, topologyId, removedTaskId);
}
- LOG.info("Task assignment has been changed: " + changedTaskIds
- + ", removed tasks " + removedTaskIds);
+ LOG.info("Task assignment has been changed: " + changedTaskIds + ", removed tasks " + removedTaskIds);
return startTimes;
}
- public static Map<String, String> getTopologyNodeHost(
- Map<String, SupervisorInfo> supervisorMap,
- Assignment existingAssignment, Set<ResourceWorkerSlot> workers) {
+ public static Map<String, String> getTopologyNodeHost(Map<String, SupervisorInfo> supervisorMap, Assignment existingAssignment,
+ Set<ResourceWorkerSlot> workers) {
// the following is that remove unused node from allNodeHost
Set<String> usedNodes = new HashSet<String>();
@@ -649,8 +634,7 @@ public class TopologyAssign implements Runnable {
}
// get alive supervisorMap Map<supervisorId, hostname>
- Map<String, String> nodeHost =
- SupervisorInfo.getNodeHost(supervisorMap);
+ Map<String, String> nodeHost = SupervisorInfo.getNodeHost(supervisorMap);
if (nodeHost != null) {
allNodeHost.putAll(nodeHost);
}
@@ -661,8 +645,7 @@ public class TopologyAssign implements Runnable {
if (allNodeHost.containsKey(supervisorId)) {
ret.put(supervisorId, allNodeHost.get(supervisorId));
} else {
- LOG.warn("Node " + supervisorId
- + " doesn't in the supervisor list");
+ LOG.warn("Node " + supervisorId + " doesn't in the supervisor list");
}
}
@@ -672,16 +655,12 @@ public class TopologyAssign implements Runnable {
/**
* get all taskids which are assigned newly or reassigned
*
- * @param taskToWorkerSlot
- * @param newtaskToWorkerSlot
* @return Set<Integer> taskid which is assigned newly or reassigned
*/
- public static Set<Integer> getNewOrChangedTaskIds(
- Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
+ public static Set<Integer> getNewOrChangedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
Set<Integer> rtn = new HashSet<Integer>();
- HashMap<String, ResourceWorkerSlot> workerPortMap =
- HostPortToWorkerMap(oldWorkers);
+ HashMap<String, ResourceWorkerSlot> workerPortMap = HostPortToWorkerMap(oldWorkers);
for (ResourceWorkerSlot worker : workers) {
ResourceWorkerSlot oldWorker = workerPortMap.get(worker.getHostPort());
if (oldWorker != null) {
@@ -691,14 +670,15 @@ public class TopologyAssign implements Runnable {
rtn.add(task);
}
} else {
- rtn.addAll(worker.getTasks());
+ if (worker.getTasks() != null) {
+ rtn.addAll(worker.getTasks());
+ }
}
}
return rtn;
}
- public static Set<Integer> getRemovedTaskIds(
- Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
+ public static Set<Integer> getRemovedTaskIds(Set<ResourceWorkerSlot> oldWorkers, Set<ResourceWorkerSlot> workers) {
Set<Integer> rtn = new HashSet<Integer>();
Set<Integer> oldTasks = getTaskSetFromWorkerSet(oldWorkers);
@@ -711,8 +691,7 @@ public class TopologyAssign implements Runnable {
return rtn;
}
- private static Set<Integer> getTaskSetFromWorkerSet(
- Set<ResourceWorkerSlot> workers) {
+ private static Set<Integer> getTaskSetFromWorkerSet(Set<ResourceWorkerSlot> workers) {
Set<Integer> rtn = new HashSet<Integer>();
for (ResourceWorkerSlot worker : workers) {
rtn.addAll(worker.getTasks());
@@ -720,10 +699,8 @@ public class TopologyAssign implements Runnable {
return rtn;
}
- private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap(
- Set<ResourceWorkerSlot> workers) {
- HashMap<String, ResourceWorkerSlot> rtn =
- new HashMap<String, ResourceWorkerSlot>();
+ private static HashMap<String, ResourceWorkerSlot> HostPortToWorkerMap(Set<ResourceWorkerSlot> workers) {
+ HashMap<String, ResourceWorkerSlot> rtn = new HashMap<String, ResourceWorkerSlot>();
for (ResourceWorkerSlot worker : workers) {
rtn.put(worker.getHostPort(), worker);
}
@@ -731,17 +708,14 @@ public class TopologyAssign implements Runnable {
}
/**
- * sort slots, the purpose is to ensure that the tasks are assigned in
- * balancing
+ * sort slots, the purpose is to ensure that the tasks are assigned in balancing
*
* @param allSlots
* @return List<WorkerSlot>
*/
- public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots,
- int needSlotNum) {
+ public static List<WorkerSlot> sortSlots(Set<WorkerSlot> allSlots, int needSlotNum) {
- Map<String, List<WorkerSlot>> nodeMap =
- new HashMap<String, List<WorkerSlot>>();
+ Map<String, List<WorkerSlot>> nodeMap = new HashMap<String, List<WorkerSlot>>();
// group by first
for (WorkerSlot np : allSlots) {
@@ -778,8 +752,7 @@ public class TopologyAssign implements Runnable {
}
// interleave
- List<List<WorkerSlot>> splitup =
- new ArrayList<List<WorkerSlot>>(nodeMap.values());
+ List<List<WorkerSlot>> splitup = new ArrayList<List<WorkerSlot>>(nodeMap.values());
Collections.sort(splitup, new Comparator<List<WorkerSlot>>() {
@@ -801,13 +774,8 @@ public class TopologyAssign implements Runnable {
/**
* Get unstopped slots from alive task list
- *
- * @param aliveAssigned
- * @param supInfos
- * @return
*/
- public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks,
- Map<String, SupervisorInfo> supInfos, Assignment existAssignment) {
+ public Set<Integer> getUnstoppedSlots(Set<Integer> aliveTasks, Map<String, SupervisorInfo> supInfos, Assignment existAssignment) {
Set<Integer> ret = new HashSet<Integer>();
Set<ResourceWorkerSlot> oldWorkers = existAssignment.getWorkers();
@@ -835,8 +803,7 @@ public class TopologyAssign implements Runnable {
}
- private Set<ResourceWorkerSlot> getUnstoppedWorkers(
- Set<Integer> aliveTasks, Assignment existAssignment) {
+ private Set<ResourceWorkerSlot> getUnstoppedWorkers(Set<Integer> aliveTasks, Assignment existAssignment) {
Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
for (ResourceWorkerSlot worker : existAssignment.getWorkers()) {
boolean alive = true;
@@ -860,12 +827,9 @@ public class TopologyAssign implements Runnable {
* @param stormClusterState
* @throws Exception
*/
- public static void getFreeSlots(
- Map<String, SupervisorInfo> supervisorInfos,
- StormClusterState stormClusterState) throws Exception {
+ public static void getFreeSlots(Map<String, SupervisorInfo> supervisorInfos, StormClusterState stormClusterState) throws Exception {
- Map<String, Assignment> assignments =
- Cluster.get_all_assignment(stormClusterState, null);
+ Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, null);
for (Entry<String, Assignment> entry : assignments.entrySet()) {
String topologyId = entry.getKey();
@@ -875,8 +839,7 @@ public class TopologyAssign implements Runnable {
for (ResourceWorkerSlot worker : workers) {
- SupervisorInfo supervisorInfo =
- supervisorInfos.get(worker.getNodeId());
+ SupervisorInfo supervisorInfo = supervisorInfos.get(worker.getNodeId());
if (supervisorInfo == null) {
// the supervisor is dead
continue;
@@ -888,31 +851,20 @@ public class TopologyAssign implements Runnable {
}
/**
- * find all alived taskid Does not assume that clocks are synchronized. Task
- * heartbeat is only used so that nimbus knows when it's received a new
- * heartbeat. All timing is done by nimbus and tracked through
- * task-heartbeat-cache
+ * find all alived taskid Does not assume that clocks are synchronized. Task heartbeat is only used so that nimbus knows when it's received a new heartbeat.
+ * All timing is done by nimbus and tracked through task-heartbeat-cache
*
- * @param conf
- * @param topologyId
- * @param stormClusterState
- * @param taskIds
- * @param taskStartTimes
- * @param taskHeartbeatsCache --Map<topologyId, Map<taskid,
- * Map<tkHbCacheTime, time>>>
* @return Set<Integer> : taskid
* @throws Exception
*/
- public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds)
- throws Exception {
+ public Set<Integer> getAliveTasks(String topologyId, Set<Integer> taskIds) throws Exception {
Set<Integer> aliveTasks = new HashSet<Integer>();
// taskIds is the list from ZK /ZK-DIR/tasks/topologyId
for (int taskId : taskIds) {
- boolean isDead =
- NimbusUtils.isTaskDead(nimbusData, topologyId, taskId);
+ boolean isDead = NimbusUtils.isTaskDead(nimbusData, topologyId, taskId);
if (isDead == false) {
aliveTasks.add(taskId);
}
@@ -925,24 +877,20 @@ public class TopologyAssign implements Runnable {
/**
* Backup the toplogy's Assignment to ZK
*
- * @@@ Question Do we need to do backup operation every time?
* @param assignment
* @param event
+ * @@@ Question Do we need to do backup operation every time?
*/
- public void backupAssignment(Assignment assignment,
- TopologyAssignEvent event) {
+ public void backupAssignment(Assignment assignment, TopologyAssignEvent event) {
String topologyId = event.getTopologyId();
String topologyName = event.getTopologyName();
try {
- StormClusterState zkClusterState =
- nimbusData.getStormClusterState();
+ StormClusterState zkClusterState = nimbusData.getStormClusterState();
// one little problem, get tasks twice when assign one topology
- Map<Integer, String> tasks =
- Cluster.get_all_task_component(zkClusterState, topologyId, null);
+ Map<Integer, String> tasks = Cluster.get_all_task_component(zkClusterState, topologyId, null);
- Map<String, List<Integer>> componentTasks =
- JStormUtils.reverse_map(tasks);
+ Map<String, List<Integer>> componentTasks = JStormUtils.reverse_map(tasks);
for (Entry<String, List<Integer>> entry : componentTasks.entrySet()) {
List<Integer> keys = entry.getValue();
@@ -951,31 +899,24 @@ public class TopologyAssign implements Runnable {
}
- AssignmentBak assignmentBak =
- new AssignmentBak(componentTasks, assignment);
+ AssignmentBak assignmentBak = new AssignmentBak(componentTasks, assignment);
zkClusterState.backup_assignment(topologyName, assignmentBak);
} catch (Exception e) {
- LOG.warn("Failed to backup " + topologyId + " assignment "
- + assignment, e);
+ LOG.warn("Failed to backup " + topologyId + " assignment " + assignment, e);
}
}
- private void getAliveSupervsByHb(
- Map<String, SupervisorInfo> supervisorInfos, Map conf) {
+ private void getAliveSupervsByHb(Map<String, SupervisorInfo> supervisorInfos, Map conf) {
int currentTime = TimeUtils.current_time_secs();
- int hbTimeout =
- JStormUtils.parseInt(
- conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS),
- (JStormUtils.MIN_1 * 3));
+ int hbTimeout = JStormUtils.parseInt(conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS), (JStormUtils.MIN_1 * 3));
Set<String> supervisorTobeRemoved = new HashSet<String>();
for (Entry<String, SupervisorInfo> entry : supervisorInfos.entrySet()) {
SupervisorInfo supInfo = entry.getValue();
int lastReportTime = supInfo.getTimeSecs();
if ((currentTime - lastReportTime) > hbTimeout) {
- LOG.warn("Supervisor-" + supInfo.getHostName()
- + " is dead. lastReportTime=" + lastReportTime);
+ LOG.warn("Supervisor-" + supInfo.getHostName() + " is dead. lastReportTime=" + lastReportTime);
supervisorTobeRemoved.add(entry.getKey());
}
}
@@ -989,8 +930,6 @@ public class TopologyAssign implements Runnable {
* @param args
*/
public static void main(String[] args) {
- // TODO Auto-generated method stub
-
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
index 8725918..a0bf9b9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyAssignEvent.java
@@ -25,7 +25,7 @@ import com.alibaba.jstorm.cluster.StormStatus;
public class TopologyAssignEvent {
// unit is minutes
- private static final int DEFAULT_WAIT_TIME = 2;
+ private static final int DEFAULT_WAIT_TIME = 5;
private String topologyId;
private String topologyName; // if this field has been set, it is create
private String group;
@@ -37,6 +37,14 @@ public class TopologyAssignEvent {
private CountDownLatch latch = new CountDownLatch(1);
private boolean isSuccess = false;
private String errorMsg;
+ private boolean isScaleTopology = false;
+
+ public void setScaleTopology(boolean isScaleTopology){
+ this.isScaleTopology = isScaleTopology;
+ }
+ public boolean isScaleTopology(){
+ return isScaleTopology;
+ }
public String getTopologyId() {
return topologyId;