You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by je...@apache.org on 2016/01/19 17:52:58 UTC
[4/5] storm git commit: edits based on comments
edits based on comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05294108
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05294108
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05294108
Branch: refs/heads/master
Commit: 052941084bc7fcaee3d433d39d2bcc7a9953c69a
Parents: 4722c6f
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Jan 14 23:33:28 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 23:33:28 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 2 +-
.../org/apache/storm/scheduler/WorkerSlot.java | 2 +-
.../storm/scheduler/resource/RAS_Node.java | 5 +-
.../DefaultResourceAwareStrategy.java | 57 ++++++++------------
.../strategies/scheduling/IStrategy.java | 4 +-
5 files changed, 29 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0555ed9..58a2a22 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -701,7 +701,7 @@
;; making a map from node+port to WorkerSlot with allocated resources
node+port->slot (into {} (for [[[node port] [mem-on-heap mem-off-heap cpu]] worker->resources]
{[node port]
- (doto (WorkerSlot. node port mem-on-heap mem-off-heap cpu))}))
+ (WorkerSlot. node port mem-on-heap mem-off-heap cpu)}))
executor->slot (into {} (for [[executor [node port]] executor->node+port]
;; filter out the dead executors
(if (contains? alive-executors executor)
http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 423764d..308af85 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -48,7 +48,7 @@ public class WorkerSlot {
}
public String getId() {
- return this.getNodeId() + ":" + this.getPort();
+ return getNodeId() + ":" + getPort();
}
public double getAllocatedMemOnHeap() {
http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index e33122b..8d37805 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -99,7 +99,7 @@ public class RAS_Node {
}
/**
- * intializes resource usages on node
+ * initializes resource usages on node
*/
private void intializeResources() {
for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry : _topIdToUsedSlots.entrySet()) {
@@ -204,7 +204,7 @@ public class RAS_Node {
/**
* Free all slots on this node. This will update the Cluster too.
*/
- public void freeAllSlots() {
+ public void freeAllSlots() {
if (!_isAlive) {
LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
}
@@ -269,7 +269,6 @@ public class RAS_Node {
return 0.0;
}
Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
- LOG.info("getMemoryUsedByWorker executors: {}", execs);
double totalMemoryUsed = 0.0;
for (ExecutorDetails exec : execs) {
totalMemoryUsed += topo.getTotalMemReqTask(exec);
http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 86d12b0..9ecba47 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -114,23 +114,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
new Object[] { exec, td.getExecutorToComponent().get(exec),
td.getTaskResourceReqList(exec), entry.getKey() });
- WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
- if (targetSlot != null) {
- NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
- if(!schedulerAssignmentMap.containsKey(targetSlot)) {
- schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
- }
-
- schedulerAssignmentMap.get(targetSlot).add(exec);
- targetNode.consumeResourcesforTask(exec, td);
- scheduledTasks.add(exec);
- LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
- targetNode, targetNode.getAvailableMemoryResources(),
- targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
- targetNode.getTotalCpuResources(), targetSlot);
- } else {
- LOG.error("Not Enough Resources to schedule Task {}", exec);
- }
+ scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
it.remove();
}
}
@@ -140,23 +124,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
// schedule left over system tasks
for (ExecutorDetails exec : executorsNotScheduled) {
- WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
- if (targetSlot != null) {
- NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
- if(!schedulerAssignmentMap.containsKey(targetSlot)) {
- schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
- }
-
- schedulerAssignmentMap.get(targetSlot).add(exec);
- targetNode.consumeResourcesforTask(exec, td);
- scheduledTasks.add(exec);
- LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
- targetNode, targetNode.getAvailableMemoryResources(),
- targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
- targetNode.getTotalCpuResources(), targetSlot);
- } else {
- LOG.error("Not Enough Resources to schedule Task {}", exec);
- }
+ scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
}
SchedulingResult result;
@@ -177,6 +145,27 @@ public class DefaultResourceAwareStrategy implements IStrategy {
return result;
}
+ private void scheduleExecutor(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot,
+ Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
+ WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+ if (targetSlot != null) {
+ NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
+ if (!schedulerAssignmentMap.containsKey(targetSlot)) {
+ schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+ }
+
+ schedulerAssignmentMap.get(targetSlot).add(exec);
+ targetNode.consumeResourcesforTask(exec, td);
+ scheduledTasks.add(exec);
+ LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+ targetNode, targetNode.getAvailableMemoryResources(),
+ targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+ targetNode.getTotalCpuResources(), targetSlot);
+ } else {
+ LOG.error("Not Enough Resources to schedule Task {}", exec);
+ }
+ }
+
private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
WorkerSlot ws;
// first scheduling
http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 58eb236..4a1180a 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -31,7 +31,7 @@ public interface IStrategy {
/**
* initialize prior to scheduling
*/
- public void prepare(ClusterStateData clusterStateData);
+ void prepare(ClusterStateData clusterStateData);
/**
* This method is invoked to calcuate a scheduling for topology td
@@ -41,5 +41,5 @@ public interface IStrategy {
* this map is the worker slot that the value (collection of executors) should be assigned to.
* if a scheduling is calculated successfully, put the scheduling map in the SchedulingResult object.
*/
- public SchedulingResult schedule(TopologyDetails td);
+ SchedulingResult schedule(TopologyDetails td);
}