You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 08:57:08 UTC
[6/8] storm git commit: Fixed Race
Fixed Race
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/984a322a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/984a322a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/984a322a
Branch: refs/heads/master
Commit: 984a322ac59afe11aa12e55080fa35d06603895a
Parents: dafeda5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 08:46:38 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Dec 7 17:16:48 2016 -0600
----------------------------------------------------------------------
.../org/apache/storm/daemon/nimbus/Nimbus.java | 166 +++++++++----------
1 file changed, 83 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/984a322a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
index 04ed7e0..719f21c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1751,93 +1751,93 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Map<String, SchedulerAssignment> newSchedulerAssignments = null;
synchronized (schedLock) {
newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
- }
- Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
- for (String id: assignedTopologyIds) {
- if (!topologyToExecutorToNodePort.containsKey(id)) {
- topologyToExecutorToNodePort.put(id, null);
- }
- }
- Map<String, Map<List<Object>, List<Double>>> newAssignedWorkerToResources = computeTopoToNodePortToResources(newSchedulerAssignments);
- int nowSecs = Time.currentTimeSecs();
- Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
- //construct the final Assignments by adding start-times etc into it
- Map<String, Assignment> newAssignments = new HashMap<>();
- for (Entry<String, Map<List<Long>, List<Object>>> entry: topologyToExecutorToNodePort.entrySet()) {
- String topoId = entry.getKey();
- Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
- Assignment existingAssignment = existingAssignments.get(topoId);
- Set<String> allNodes = new HashSet<>();
- if (execToNodePort != null) {
- for (List<Object> nodePort: execToNodePort.values()) {
- allNodes.add((String) nodePort.get(0));
+
+ Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
+ for (String id: assignedTopologyIds) {
+ if (!topologyToExecutorToNodePort.containsKey(id)) {
+ topologyToExecutorToNodePort.put(id, null);
}
}
- Map<String, String> allNodeHost = new HashMap<>();
- if (existingAssignment != null) {
- allNodeHost.putAll(existingAssignment.get_node_host());
- }
- for (String node: allNodes) {
- String host = inimbus.getHostName(basicSupervisorDetailsMap, node);
- if (host != null) {
- allNodeHost.put(node, host);
+ Map<String, Map<List<Object>, List<Double>>> newAssignedWorkerToResources = computeTopoToNodePortToResources(newSchedulerAssignments);
+ int nowSecs = Time.currentTimeSecs();
+ Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
+ //construct the final Assignments by adding start-times etc into it
+ Map<String, Assignment> newAssignments = new HashMap<>();
+ for (Entry<String, Map<List<Long>, List<Object>>> entry: topologyToExecutorToNodePort.entrySet()) {
+ String topoId = entry.getKey();
+ Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
+ Assignment existingAssignment = existingAssignments.get(topoId);
+ Set<String> allNodes = new HashSet<>();
+ if (execToNodePort != null) {
+ for (List<Object> nodePort: execToNodePort.values()) {
+ allNodes.add((String) nodePort.get(0));
+ }
+ }
+ Map<String, String> allNodeHost = new HashMap<>();
+ if (existingAssignment != null) {
+ allNodeHost.putAll(existingAssignment.get_node_host());
}
+ for (String node: allNodes) {
+ String host = inimbus.getHostName(basicSupervisorDetailsMap, node);
+ if (host != null) {
+ allNodeHost.put(node, host);
+ }
+ }
+ Map<List<Long>, NodeInfo> execNodeInfo = null;
+ if (existingAssignment != null) {
+ execNodeInfo = existingAssignment.get_executor_node_port();
+ }
+ List<List<Long>> reassignExecutors = changedExecutors(execNodeInfo, execToNodePort);
+ Map<List<Long>, Long> startTimes = new HashMap<>();
+ if (existingAssignment != null) {
+ startTimes.putAll(existingAssignment.get_executor_start_time_secs());
+ }
+ for (List<Long> id: reassignExecutors) {
+ startTimes.put(id, (long)nowSecs);
+ }
+ Map<List<Object>, List<Double>> workerToResources = newAssignedWorkerToResources.get(topoId);
+ Assignment newAssignment = new Assignment((String)conf.get(Config.STORM_LOCAL_DIR));
+ Map<String, String> justAssignedKeys = new HashMap<>(allNodeHost);
+ //Modifies justAssignedKeys
+ justAssignedKeys.keySet().retainAll(allNodes);
+ newAssignment.set_node_host(justAssignedKeys);
+ //convert NodePort to NodeInfo (again!!!).
+ Map<List<Long>, NodeInfo> execToNodeInfo = new HashMap<>();
+ for (Entry<List<Long>, List<Object>> execAndNodePort: execToNodePort.entrySet()) {
+ List<Object> nodePort = execAndNodePort.getValue();
+ NodeInfo ni = new NodeInfo();
+ ni.set_node((String) nodePort.get(0));
+ ni.add_to_port((Long)nodePort.get(1));
+ execToNodeInfo.put(execAndNodePort.getKey(), ni);
+ }
+ newAssignment.set_executor_node_port(execToNodeInfo);
+ newAssignment.set_executor_start_time_secs(startTimes);
+ //do another conversion (lets just make this all common)
+ Map<NodeInfo, WorkerResources> workerResources = new HashMap<>();
+ for (Entry<List<Object>, List<Double>> wr: workerToResources.entrySet()) {
+ List<Object> nodePort = wr.getKey();
+ NodeInfo ni = new NodeInfo();
+ ni.set_node((String) nodePort.get(0));
+ ni.add_to_port((Long) nodePort.get(1));
+ List<Double> r = wr.getValue();
+ WorkerResources resources = new WorkerResources();
+ resources.set_mem_on_heap(r.get(0));
+ resources.set_mem_off_heap(r.get(1));
+ resources.set_cpu(r.get(2));
+ workerResources.put(ni, resources);
+ }
+ newAssignment.set_worker_resources(workerResources);
+ newAssignments.put(topoId, newAssignment);
}
- Map<List<Long>, NodeInfo> execNodeInfo = null;
- if (existingAssignment != null) {
- execNodeInfo = existingAssignment.get_executor_node_port();
- }
- List<List<Long>> reassignExecutors = changedExecutors(execNodeInfo, execToNodePort);
- Map<List<Long>, Long> startTimes = new HashMap<>();
- if (existingAssignment != null) {
- startTimes.putAll(existingAssignment.get_executor_start_time_secs());
- }
- for (List<Long> id: reassignExecutors) {
- startTimes.put(id, (long)nowSecs);
- }
- Map<List<Object>, List<Double>> workerToResources = newAssignedWorkerToResources.get(topoId);
- Assignment newAssignment = new Assignment((String)conf.get(Config.STORM_LOCAL_DIR));
- Map<String, String> justAssignedKeys = new HashMap<>(allNodeHost);
- //Modifies justAssignedKeys
- justAssignedKeys.keySet().retainAll(allNodes);
- newAssignment.set_node_host(justAssignedKeys);
- //convert NodePort to NodeInfo (again!!!).
- Map<List<Long>, NodeInfo> execToNodeInfo = new HashMap<>();
- for (Entry<List<Long>, List<Object>> execAndNodePort: execToNodePort.entrySet()) {
- List<Object> nodePort = execAndNodePort.getValue();
- NodeInfo ni = new NodeInfo();
- ni.set_node((String) nodePort.get(0));
- ni.add_to_port((Long)nodePort.get(1));
- execToNodeInfo.put(execAndNodePort.getKey(), ni);
- }
- newAssignment.set_executor_node_port(execToNodeInfo);
- newAssignment.set_executor_start_time_secs(startTimes);
- //do another conversion (lets just make this all common)
- Map<NodeInfo, WorkerResources> workerResources = new HashMap<>();
- for (Entry<List<Object>, List<Double>> wr: workerToResources.entrySet()) {
- List<Object> nodePort = wr.getKey();
- NodeInfo ni = new NodeInfo();
- ni.set_node((String) nodePort.get(0));
- ni.add_to_port((Long) nodePort.get(1));
- List<Double> r = wr.getValue();
- WorkerResources resources = new WorkerResources();
- resources.set_mem_on_heap(r.get(0));
- resources.set_mem_off_heap(r.get(1));
- resources.set_cpu(r.get(2));
- workerResources.put(ni, resources);
- }
- newAssignment.set_worker_resources(workerResources);
- newAssignments.put(topoId, newAssignment);
- }
-
- if (!newAssignments.equals(existingAssignments)) {
- LOG.debug("RESETTING id->resources and id->worker-resources cache!");
- idToResources.set(new HashMap<>());
- idToWorkerResources.set(new HashMap<>());
- }
- //tasks figure out what tasks to talk to by looking at topology at runtime
- // only log/set when there's been a change to the assignment
- synchronized (schedLock) {
+
+ if (!newAssignments.equals(existingAssignments)) {
+ LOG.debug("RESETTING id->resources and id->worker-resources cache!");
+ idToResources.set(new HashMap<>());
+ idToWorkerResources.set(new HashMap<>());
+ }
+
+ //tasks figure out what tasks to talk to by looking at topology at runtime
+ // only log/set when there's been a change to the assignment
for (Entry<String, Assignment> entry: newAssignments.entrySet()) {
String topoId = entry.getKey();
Assignment assignment = entry.getValue();