You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by je...@apache.org on 2016/01/19 17:52:58 UTC

[4/5] storm git commit: edits based on comments

edits based on comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05294108
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05294108
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05294108

Branch: refs/heads/master
Commit: 052941084bc7fcaee3d433d39d2bcc7a9953c69a
Parents: 4722c6f
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Jan 14 23:33:28 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 23:33:28 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  2 +-
 .../org/apache/storm/scheduler/WorkerSlot.java  |  2 +-
 .../storm/scheduler/resource/RAS_Node.java      |  5 +-
 .../DefaultResourceAwareStrategy.java           | 57 ++++++++------------
 .../strategies/scheduling/IStrategy.java        |  4 +-
 5 files changed, 29 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0555ed9..58a2a22 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -701,7 +701,7 @@
                        ;; making a map from node+port to WorkerSlot with allocated resources
                        node+port->slot (into {} (for [[[node port] [mem-on-heap mem-off-heap cpu]] worker->resources]
                                                   {[node port]
-                                                   (doto (WorkerSlot. node port mem-on-heap mem-off-heap cpu))}))
+                                                   (WorkerSlot. node port mem-on-heap mem-off-heap cpu)}))
                        executor->slot (into {} (for [[executor [node port]] executor->node+port]
                                                  ;; filter out the dead executors
                                                  (if (contains? alive-executors executor)

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 423764d..308af85 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -48,7 +48,7 @@ public class WorkerSlot {
     }
 
     public String getId() {
-        return this.getNodeId() + ":" + this.getPort();
+        return getNodeId() + ":" + getPort();
     }
 
     public double getAllocatedMemOnHeap() {

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index e33122b..8d37805 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -99,7 +99,7 @@ public class RAS_Node {
     }
 
     /**
-     * intializes resource usages on node
+     * initializes resource usages on node
      */
     private void intializeResources() {
         for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry : _topIdToUsedSlots.entrySet()) {
@@ -204,7 +204,7 @@ public class RAS_Node {
     /**
      * Free all slots on this node.  This will update the Cluster too.
      */
-     public void freeAllSlots() {
+    public void freeAllSlots() {
         if (!_isAlive) {
             LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
         }
@@ -269,7 +269,6 @@ public class RAS_Node {
             return 0.0;
         }
         Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
-        LOG.info("getMemoryUsedByWorker executors: {}", execs);
         double totalMemoryUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalMemoryUsed += topo.getTotalMemReqTask(exec);

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 86d12b0..9ecba47 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -114,23 +114,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
                     LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
                             new Object[] { exec, td.getExecutorToComponent().get(exec),
                     td.getTaskResourceReqList(exec), entry.getKey() });
-                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
-                    if (targetSlot != null) {
-                        NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
-                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
-                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
-                        }
-                       
-                        schedulerAssignmentMap.get(targetSlot).add(exec);
-                        targetNode.consumeResourcesforTask(exec, td);
-                        scheduledTasks.add(exec);
-                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                                targetNode, targetNode.getAvailableMemoryResources(),
-                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                                targetNode.getTotalCpuResources(), targetSlot);
-                    } else {
-                        LOG.error("Not Enough Resources to schedule Task {}", exec);
-                    }
+                    scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
                     it.remove();
                 }
             }
@@ -140,23 +124,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
         // schedule left over system tasks
         for (ExecutorDetails exec : executorsNotScheduled) {
-            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
-            if (targetSlot != null) {
-                NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
-                if(!schedulerAssignmentMap.containsKey(targetSlot)) {
-                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
-                }
-               
-                schedulerAssignmentMap.get(targetSlot).add(exec);
-                targetNode.consumeResourcesforTask(exec, td);
-                scheduledTasks.add(exec);
-                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                        targetNode, targetNode.getAvailableMemoryResources(),
-                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                        targetNode.getTotalCpuResources(), targetSlot);
-            } else {
-                LOG.error("Not Enough Resources to schedule Task {}", exec);
-            }
+            scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
         }
 
         SchedulingResult result;
@@ -177,6 +145,27 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return result;
     }
 
+    private void scheduleExecutor(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot,
+            Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
+        WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+        if (targetSlot != null) {
+            NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
+            if (!schedulerAssignmentMap.containsKey(targetSlot)) {
+                schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+            }
+
+            schedulerAssignmentMap.get(targetSlot).add(exec);
+            targetNode.consumeResourcesforTask(exec, td);
+            scheduledTasks.add(exec);
+            LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                    targetNode, targetNode.getAvailableMemoryResources(),
+                    targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                    targetNode.getTotalCpuResources(), targetSlot);
+        } else {
+            LOG.error("Not Enough Resources to schedule Task {}", exec);
+        }
+    }
+
     private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
       WorkerSlot ws;
       // first scheduling

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 58eb236..4a1180a 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -31,7 +31,7 @@ public interface IStrategy {
     /**
      * initialize prior to scheduling
      */
-    public void prepare(ClusterStateData clusterStateData);
+    void prepare(ClusterStateData clusterStateData);
 
     /**
      * This method is invoked to calcuate a scheduling for topology td
@@ -41,5 +41,5 @@ public interface IStrategy {
      * this map is the worker slot that the value (collection of executors) should be assigned to.
      * if a scheduling is calculated successfully, put the scheduling map in the SchedulingResult object.
      */
-    public SchedulingResult schedule(TopologyDetails td);
+    SchedulingResult schedule(TopologyDetails td);
 }