You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:48 UTC
[20/23] storm git commit: revisions based on comments
revisions based on comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc6d0f79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc6d0f79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc6d0f79
Branch: refs/heads/master
Commit: dc6d0f797bd0da8d2da361378ae836cdcf61528e
Parents: 0f80d06
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Dec 18 13:11:04 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 18 13:11:04 2015 -0600
----------------------------------------------------------------------
.../resource/ResourceAwareScheduler.java | 30 ++++++++++----------
.../backtype/storm/scheduler/resource/User.java | 3 +-
.../eviction/DefaultEvictionStrategy.java | 19 ++++++++++++-
3 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d0f79/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 116f1b5..f97e6fe 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -83,13 +83,13 @@ public class ResourceAwareScheduler implements IScheduler {
public void schedule(Topologies topologies, Cluster cluster) {
LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
//initialize data structures
- this.initialize(topologies, cluster);
+ initialize(topologies, cluster);
//logs everything that is currently scheduled and the location at which they are scheduled
LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
//logs the resources available/used for every node
LOG.info("Nodes:\n{}", this.nodes);
//logs the detailed info about each user
- for (User user : this.getUserMap().values()) {
+ for (User user : getUserMap().values()) {
LOG.info(user.getDetailedInfo());
}
@@ -128,14 +128,14 @@ public class ResourceAwareScheduler implements IScheduler {
if (cluster.getUnassignedExecutors(td).size() > 0) {
LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
- SchedulingState schedulingState = this.checkpointSchedulingState();
+ SchedulingState schedulingState = checkpointSchedulingState();
IStrategy rasStrategy = null;
try {
rasStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
} catch (RuntimeException e) {
LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
@@ -151,7 +151,7 @@ public class ResourceAwareScheduler implements IScheduler {
} catch (Exception ex) {
LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
, rasStrategy.getClass().getName(), td.getName()), ex);
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+ rasStrategy.getClass().getName() + ". Please check logs for details");
@@ -170,7 +170,7 @@ public class ResourceAwareScheduler implements IScheduler {
}
} catch (IllegalStateException ex) {
LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
}
@@ -195,31 +195,31 @@ public class ResourceAwareScheduler implements IScheduler {
} catch (Exception ex) {
LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
, evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
break;
}
if (!madeSpace) {
LOG.debug("Could not make space for topo {} will move to attempted", td);
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
break;
}
continue;
} else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
break;
} else {
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
break;
}
}
} else {
LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
- topologySubmitter = this.cleanup(schedulingState, td);
+ topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
break;
}
@@ -234,7 +234,7 @@ public class ResourceAwareScheduler implements IScheduler {
}
private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
- this.restoreCheckpointSchedulingState(schedulingState);
+ restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
return this.userMap.get(td.getTopologySubmitter());
}
@@ -310,7 +310,7 @@ public class ResourceAwareScheduler implements IScheduler {
*/
private void initUsers(Topologies topologies, Cluster cluster) {
this.userMap = new HashMap<String, User>();
- Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
+ Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
LOG.debug("userResourcePools: {}", userResourcePools);
for (TopologyDetails td : topologies.getTopologies()) {
@@ -380,7 +380,7 @@ public class ResourceAwareScheduler implements IScheduler {
private SchedulingState checkpointSchedulingState() {
LOG.debug("/*********Checkpoint scheduling state************/");
- for (User user : this.getUserMap().values()) {
+ for (User user : getUserMap().values()) {
LOG.debug(user.getDetailedInfo());
}
LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
@@ -404,7 +404,7 @@ public class ResourceAwareScheduler implements IScheduler {
this.userMap = schedulingState.userMap;
this.nodes = schedulingState.nodes;
- for (User user : this.getUserMap().values()) {
+ for (User user : getUserMap().values()) {
LOG.debug(user.getDetailedInfo());
}
LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d0f79/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index f1d53c6..f3434ac 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -38,9 +38,10 @@ public class User {
//Topologies yet to be scheduled sorted by priority for each user
private TreeSet<TopologyDetails> runningQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
- //Topologies that was attempted to be scheduled but wasn't successull
+ //Topologies that were attempted to be scheduled but weren't successful
private TreeSet<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+ //Topologies that were deemed to be invalid
private TreeSet<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
private Map<String, Double> resourcePool = new HashMap<String, Double>();
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d0f79/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index d54ec43..1f446bd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -62,6 +62,12 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
if (evictUser != null) {
TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+ LOG.debug("Running Topology {} from user {} is still within user's resource guarantee thus, POTENTIALLY evicting Topology {} from user {} since:" +
+ "\n(1.0 - submitter.getCPUResourcePoolUtilization()) = {} >= cpuNeeded = {}" +
+ "\nand" +
+ "\n(1.0 - submitter.getMemoryResourcePoolUtilization()) = {} >= memoryNeeded = {}"
+ ,td, submitter, topologyEvict, evictUser, (1.0 - submitter.getCPUResourcePoolUtilization())
+ , cpuNeeded, (1.0 - submitter.getMemoryResourcePoolUtilization()), memoryNeeded);
evictTopology(topologyEvict);
return true;
}
@@ -69,16 +75,27 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
if (evictUser != null) {
if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+ LOG.debug("POTENTIALLY Evicting Topology {} from user {} since:" +
+ "\n((evictUser.getResourcePoolAverageUtilization() - 1.0) = {}" +
+ "\n(cpuNeeded + memoryNeeded) / 2) = {} and (submitter.getResourcePoolAverageUtilization() - 1.0)) = {} Thus," +
+ "\n(evictUser.getResourcePoolAverageUtilization() - 1.0) = {} > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)) = {}"
+ ,topologyEvict, evictUser, (evictUser.getResourcePoolAverageUtilization() - 1.0), ((cpuNeeded + memoryNeeded) / 2)
+ , (submitter.getResourcePoolAverageUtilization() - 1.0), (evictUser.getResourcePoolAverageUtilization() - 1.0)
+ , (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)));
evictTopology(topologyEvict);
return true;
}
}
}
//See if there is a lower priority topology that can be evicted from the current user
+ //topologies should already be sorted in order of increasing priority.
+ //Thus, topology at the front of the queue has the lowest priority
for (TopologyDetails topo : submitter.getTopologiesRunning()) {
//check to if there is a topology with a lower priority we can evict
if (topo.getTopologyPriority() > td.getTopologyPriority()) {
- evictTopology(topo);
+ LOG.debug("POTENTIALLY Evicting Topology {} from user {} (itself) since topology {} has a lower priority than topology {}"
+ , topo, submitter, topo, td);
+ evictTopology(topo);
return true;
}
}