You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:48 UTC

[20/23] storm git commit: revisions based on comments

revisions based on comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc6d0f79
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc6d0f79
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc6d0f79

Branch: refs/heads/master
Commit: dc6d0f797bd0da8d2da361378ae836cdcf61528e
Parents: 0f80d06
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Dec 18 13:11:04 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 18 13:11:04 2015 -0600

----------------------------------------------------------------------
 .../resource/ResourceAwareScheduler.java        | 30 ++++++++++----------
 .../backtype/storm/scheduler/resource/User.java |  3 +-
 .../eviction/DefaultEvictionStrategy.java       | 19 ++++++++++++-
 3 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d0f79/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 116f1b5..f97e6fe 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -83,13 +83,13 @@ public class ResourceAwareScheduler implements IScheduler {
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
         //initialize data structures
-        this.initialize(topologies, cluster);
+        initialize(topologies, cluster);
         //logs everything that is currently scheduled and the location at which they are scheduled
         LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
         //logs the resources available/used for every node
         LOG.info("Nodes:\n{}", this.nodes);
         //logs the detailed info about each user
-        for (User user : this.getUserMap().values()) {
+        for (User user : getUserMap().values()) {
             LOG.info(user.getDetailedInfo());
         }
 
@@ -128,14 +128,14 @@ public class ResourceAwareScheduler implements IScheduler {
         if (cluster.getUnassignedExecutors(td).size() > 0) {
             LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
 
-            SchedulingState schedulingState = this.checkpointSchedulingState();
+            SchedulingState schedulingState = checkpointSchedulingState();
             IStrategy rasStrategy = null;
             try {
                 rasStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
             } catch (RuntimeException e) {
                 LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
                         td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
-                topologySubmitter = this.cleanup(schedulingState, td);
+                topologySubmitter = cleanup(schedulingState, td);
                 topologySubmitter.moveTopoFromPendingToInvalid(td);
                 this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
                         + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
@@ -151,7 +151,7 @@ public class ResourceAwareScheduler implements IScheduler {
                 } catch (Exception ex) {
                     LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
                             , rasStrategy.getClass().getName(), td.getName()), ex);
-                    topologySubmitter = this.cleanup(schedulingState, td);
+                    topologySubmitter = cleanup(schedulingState, td);
                     topologySubmitter.moveTopoFromPendingToInvalid(td);
                     this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
                             + rasStrategy.getClass().getName() + ". Please check logs for details");
@@ -170,7 +170,7 @@ public class ResourceAwareScheduler implements IScheduler {
                             }
                         } catch (IllegalStateException ex) {
                             LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
-                            topologySubmitter = this.cleanup(schedulingState, td);
+                            topologySubmitter = cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToAttempted(td);
                             this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
                         }
@@ -195,31 +195,31 @@ public class ResourceAwareScheduler implements IScheduler {
                             } catch (Exception ex) {
                                 LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
                                         , evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
-                                topologySubmitter = this.cleanup(schedulingState, td);
+                                topologySubmitter = cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 break;
                             }
                             if (!madeSpace) {
                                 LOG.debug("Could not make space for topo {} will move to attempted", td);
-                                topologySubmitter = this.cleanup(schedulingState, td);
+                                topologySubmitter = cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
                                 break;
                             }
                             continue;
                         } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
-                            topologySubmitter = this.cleanup(schedulingState, td);
+                            topologySubmitter = cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
                             break;
                         } else {
-                            topologySubmitter = this.cleanup(schedulingState, td);
+                            topologySubmitter = cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
                             break;
                         }
                     }
                 } else {
                     LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
-                    topologySubmitter = this.cleanup(schedulingState, td);
+                    topologySubmitter = cleanup(schedulingState, td);
                     topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
                     break;
                 }
@@ -234,7 +234,7 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
-        this.restoreCheckpointSchedulingState(schedulingState);
+        restoreCheckpointSchedulingState(schedulingState);
         //since state is restored need the update User topologySubmitter to the new User object in userMap
         return this.userMap.get(td.getTopologySubmitter());
     }
@@ -310,7 +310,7 @@ public class ResourceAwareScheduler implements IScheduler {
      */
     private void initUsers(Topologies topologies, Cluster cluster) {
         this.userMap = new HashMap<String, User>();
-        Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
+        Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
         LOG.debug("userResourcePools: {}", userResourcePools);
 
         for (TopologyDetails td : topologies.getTopologies()) {
@@ -380,7 +380,7 @@ public class ResourceAwareScheduler implements IScheduler {
 
     private SchedulingState checkpointSchedulingState() {
         LOG.debug("/*********Checkpoint scheduling state************/");
-        for (User user : this.getUserMap().values()) {
+        for (User user : getUserMap().values()) {
             LOG.debug(user.getDetailedInfo());
         }
         LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
@@ -404,7 +404,7 @@ public class ResourceAwareScheduler implements IScheduler {
         this.userMap = schedulingState.userMap;
         this.nodes = schedulingState.nodes;
 
-        for (User user : this.getUserMap().values()) {
+        for (User user : getUserMap().values()) {
             LOG.debug(user.getDetailedInfo());
         }
         LOG.debug(ResourceUtils.printScheduling(cluster, topologies));

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d0f79/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index f1d53c6..f3434ac 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -38,9 +38,10 @@ public class User {
     //Topologies yet to be scheduled sorted by priority for each user
     private TreeSet<TopologyDetails> runningQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
-    //Topologies that was attempted to be scheduled but wasn't successull
+    //Topologies that was attempted to be scheduled but wasn't successful
     private TreeSet<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
+    //Topologies that was deemed to be invalid
     private TreeSet<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
     private Map<String, Double> resourcePool = new HashMap<String, Double>();

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d0f79/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index d54ec43..1f446bd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -62,6 +62,12 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
         if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
             if (evictUser != null) {
                 TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+                LOG.debug("Running Topology {} from user {} is still within user's resource guarantee thus, POTENTIALLY evicting Topology {} from user {} since:" +
+                                "\n(1.0 - submitter.getCPUResourcePoolUtilization()) = {} >= cpuNeeded = {}" +
+                                "\nand" +
+                                "\n(1.0 - submitter.getMemoryResourcePoolUtilization()) = {} >= memoryNeeded = {}"
+                        ,td, submitter, topologyEvict, evictUser, (1.0 - submitter.getCPUResourcePoolUtilization())
+                        , cpuNeeded, (1.0 - submitter.getMemoryResourcePoolUtilization()), memoryNeeded);
                 evictTopology(topologyEvict);
                 return true;
             }
@@ -69,16 +75,27 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
             if (evictUser != null) {
                 if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
                     TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+                    LOG.debug("POTENTIALLY Evicting Topology {} from user {} since:" +
+                                    "\n((evictUser.getResourcePoolAverageUtilization() - 1.0) = {}" +
+                                    "\n(cpuNeeded + memoryNeeded) / 2) = {} and (submitter.getResourcePoolAverageUtilization() - 1.0)) = {} Thus," +
+                                    "\n(evictUser.getResourcePoolAverageUtilization() - 1.0) = {} > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)) = {}"
+                            ,topologyEvict, evictUser, (evictUser.getResourcePoolAverageUtilization() - 1.0), ((cpuNeeded + memoryNeeded) / 2)
+                            , (submitter.getResourcePoolAverageUtilization() - 1.0), (evictUser.getResourcePoolAverageUtilization() - 1.0)
+                            , (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)));
                     evictTopology(topologyEvict);
                     return true;
                 }
             }
         }
         //See if there is a lower priority topology that can be evicted from the current user
+        //topologies should already be sorted in order of increasing priority.
+        //Thus, topology at the front of the queue has the lowest priority
         for (TopologyDetails topo : submitter.getTopologiesRunning()) {
             //check to if there is a topology with a lower priority we can evict
             if (topo.getTopologyPriority() > td.getTopologyPriority()) {
-                evictTopology(topo);
+                LOG.debug("POTENTIALLY Evicting Topology {} from user {} (itself) since topology {} has a lower priority than topology {}"
+                        , topo, submitter, topo, td);
+                        evictTopology(topo);
                 return true;
             }
         }