You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 08:57:08 UTC

[6/8] storm git commit: Fixed Race

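Fixed Race: hold schedLock across the whole assignment computation (from computing the new scheduler assignments through the resource-cache reset and the loop over the new assignments) instead of releasing it after computeNewSchedulerAssignments and re-acquiring it later.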


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/984a322a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/984a322a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/984a322a

Branch: refs/heads/master
Commit: 984a322ac59afe11aa12e55080fa35d06603895a
Parents: dafeda5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Dec 2 08:46:38 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Dec 7 17:16:48 2016 -0600

----------------------------------------------------------------------
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 166 +++++++++----------
 1 file changed, 83 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/984a322a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
index 04ed7e0..719f21c 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1751,93 +1751,93 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         Map<String, SchedulerAssignment> newSchedulerAssignments = null;
         synchronized (schedLock) {
             newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
-        }
-        Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
-        for (String id: assignedTopologyIds) {
-            if (!topologyToExecutorToNodePort.containsKey(id)) {
-                topologyToExecutorToNodePort.put(id, null);
-            }
-        }
-        Map<String, Map<List<Object>, List<Double>>> newAssignedWorkerToResources = computeTopoToNodePortToResources(newSchedulerAssignments);
-        int nowSecs = Time.currentTimeSecs();
-        Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
-        //construct the final Assignments by adding start-times etc into it
-        Map<String, Assignment> newAssignments  = new HashMap<>();
-        for (Entry<String, Map<List<Long>, List<Object>>> entry: topologyToExecutorToNodePort.entrySet()) {
-            String topoId = entry.getKey();
-            Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-            Assignment existingAssignment = existingAssignments.get(topoId);
-            Set<String> allNodes = new HashSet<>();
-            if (execToNodePort != null) {
-                for (List<Object> nodePort: execToNodePort.values()) {
-                    allNodes.add((String) nodePort.get(0));
+
+            Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
+            for (String id: assignedTopologyIds) {
+                if (!topologyToExecutorToNodePort.containsKey(id)) {
+                    topologyToExecutorToNodePort.put(id, null);
                 }
             }
-            Map<String, String> allNodeHost = new HashMap<>();
-            if (existingAssignment != null) {
-                allNodeHost.putAll(existingAssignment.get_node_host());
-            }
-            for (String node: allNodes) {
-                String host = inimbus.getHostName(basicSupervisorDetailsMap, node);
-                if (host != null) {
-                    allNodeHost.put(node, host);
+            Map<String, Map<List<Object>, List<Double>>> newAssignedWorkerToResources = computeTopoToNodePortToResources(newSchedulerAssignments);
+            int nowSecs = Time.currentTimeSecs();
+            Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
+            //construct the final Assignments by adding start-times etc into it
+            Map<String, Assignment> newAssignments  = new HashMap<>();
+            for (Entry<String, Map<List<Long>, List<Object>>> entry: topologyToExecutorToNodePort.entrySet()) {
+                String topoId = entry.getKey();
+                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
+                Assignment existingAssignment = existingAssignments.get(topoId);
+                Set<String> allNodes = new HashSet<>();
+                if (execToNodePort != null) {
+                    for (List<Object> nodePort: execToNodePort.values()) {
+                        allNodes.add((String) nodePort.get(0));
+                    }
+                }
+                Map<String, String> allNodeHost = new HashMap<>();
+                if (existingAssignment != null) {
+                    allNodeHost.putAll(existingAssignment.get_node_host());
                 }
+                for (String node: allNodes) {
+                    String host = inimbus.getHostName(basicSupervisorDetailsMap, node);
+                    if (host != null) {
+                        allNodeHost.put(node, host);
+                    }
+                }
+                Map<List<Long>, NodeInfo> execNodeInfo = null;
+                if (existingAssignment != null) {
+                    execNodeInfo = existingAssignment.get_executor_node_port();
+                }
+                List<List<Long>> reassignExecutors = changedExecutors(execNodeInfo, execToNodePort);
+                Map<List<Long>, Long> startTimes = new HashMap<>();
+                if (existingAssignment != null) {
+                    startTimes.putAll(existingAssignment.get_executor_start_time_secs());
+                }
+                for (List<Long> id: reassignExecutors) {
+                    startTimes.put(id, (long)nowSecs);
+                }
+                Map<List<Object>, List<Double>> workerToResources = newAssignedWorkerToResources.get(topoId);
+                Assignment newAssignment = new Assignment((String)conf.get(Config.STORM_LOCAL_DIR));
+                Map<String, String> justAssignedKeys = new HashMap<>(allNodeHost);
+                //Modifies justAssignedKeys
+                justAssignedKeys.keySet().retainAll(allNodes);
+                newAssignment.set_node_host(justAssignedKeys);
+                //convert NodePort to NodeInfo (again!!!).
+                Map<List<Long>, NodeInfo> execToNodeInfo = new HashMap<>();
+                for (Entry<List<Long>, List<Object>> execAndNodePort: execToNodePort.entrySet()) {
+                    List<Object> nodePort = execAndNodePort.getValue();
+                    NodeInfo ni = new NodeInfo();
+                    ni.set_node((String) nodePort.get(0));
+                    ni.add_to_port((Long)nodePort.get(1));
+                    execToNodeInfo.put(execAndNodePort.getKey(), ni);
+                }
+                newAssignment.set_executor_node_port(execToNodeInfo);
+                newAssignment.set_executor_start_time_secs(startTimes);
+                //do another conversion (lets just make this all common)
+                Map<NodeInfo, WorkerResources> workerResources = new HashMap<>();
+                for (Entry<List<Object>, List<Double>> wr: workerToResources.entrySet()) {
+                    List<Object> nodePort = wr.getKey();
+                    NodeInfo ni = new NodeInfo();
+                    ni.set_node((String) nodePort.get(0));
+                    ni.add_to_port((Long) nodePort.get(1));
+                    List<Double> r = wr.getValue();
+                    WorkerResources resources = new WorkerResources();
+                    resources.set_mem_on_heap(r.get(0));
+                    resources.set_mem_off_heap(r.get(1));
+                    resources.set_cpu(r.get(2));
+                    workerResources.put(ni, resources);
+                }
+                newAssignment.set_worker_resources(workerResources);
+                newAssignments.put(topoId, newAssignment);
             }
-            Map<List<Long>, NodeInfo> execNodeInfo = null;
-            if (existingAssignment != null) {
-                execNodeInfo = existingAssignment.get_executor_node_port();
-            }
-            List<List<Long>> reassignExecutors = changedExecutors(execNodeInfo, execToNodePort);
-            Map<List<Long>, Long> startTimes = new HashMap<>();
-            if (existingAssignment != null) {
-                startTimes.putAll(existingAssignment.get_executor_start_time_secs());
-            }
-            for (List<Long> id: reassignExecutors) {
-                startTimes.put(id, (long)nowSecs);
-            }
-            Map<List<Object>, List<Double>> workerToResources = newAssignedWorkerToResources.get(topoId);
-            Assignment newAssignment = new Assignment((String)conf.get(Config.STORM_LOCAL_DIR));
-            Map<String, String> justAssignedKeys = new HashMap<>(allNodeHost);
-            //Modifies justAssignedKeys
-            justAssignedKeys.keySet().retainAll(allNodes);
-            newAssignment.set_node_host(justAssignedKeys);
-            //convert NodePort to NodeInfo (again!!!).
-            Map<List<Long>, NodeInfo> execToNodeInfo = new HashMap<>();
-            for (Entry<List<Long>, List<Object>> execAndNodePort: execToNodePort.entrySet()) {
-                List<Object> nodePort = execAndNodePort.getValue();
-                NodeInfo ni = new NodeInfo();
-                ni.set_node((String) nodePort.get(0));
-                ni.add_to_port((Long)nodePort.get(1));
-                execToNodeInfo.put(execAndNodePort.getKey(), ni);
-            }
-            newAssignment.set_executor_node_port(execToNodeInfo);
-            newAssignment.set_executor_start_time_secs(startTimes);
-            //do another conversion (lets just make this all common)
-            Map<NodeInfo, WorkerResources> workerResources = new HashMap<>();
-            for (Entry<List<Object>, List<Double>> wr: workerToResources.entrySet()) {
-                List<Object> nodePort = wr.getKey();
-                NodeInfo ni = new NodeInfo();
-                ni.set_node((String) nodePort.get(0));
-                ni.add_to_port((Long) nodePort.get(1));
-                List<Double> r = wr.getValue();
-                WorkerResources resources = new WorkerResources();
-                resources.set_mem_on_heap(r.get(0));
-                resources.set_mem_off_heap(r.get(1));
-                resources.set_cpu(r.get(2));
-                workerResources.put(ni, resources);
-            }
-            newAssignment.set_worker_resources(workerResources);
-            newAssignments.put(topoId, newAssignment);
-        }
-
-        if (!newAssignments.equals(existingAssignments)) {
-            LOG.debug("RESETTING id->resources and id->worker-resources cache!");
-            idToResources.set(new HashMap<>());
-            idToWorkerResources.set(new HashMap<>());
-        }
-        //tasks figure out what tasks to talk to by looking at topology at runtime
-        // only log/set when there's been a change to the assignment
-        synchronized (schedLock) {
+
+            if (!newAssignments.equals(existingAssignments)) {
+                LOG.debug("RESETTING id->resources and id->worker-resources cache!");
+                idToResources.set(new HashMap<>());
+                idToWorkerResources.set(new HashMap<>());
+            }
+
+            //tasks figure out what tasks to talk to by looking at topology at runtime
+            // only log/set when there's been a change to the assignment
             for (Entry<String, Assignment> entry: newAssignments.entrySet()) {
                 String topoId = entry.getKey();
                 Assignment assignment = entry.getValue();