You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:47 UTC

[19/23] storm git commit: edits based on review comments

edits based on review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f80d067
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f80d067
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f80d067

Branch: refs/heads/master
Commit: 0f80d0678c243a7bd1b6aa169272741ef7e2950a
Parents: 068a8c2
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Dec 18 01:09:15 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 18 01:21:41 2015 -0600

----------------------------------------------------------------------
 .../jvm/backtype/storm/scheduler/Cluster.java   |   2 +-
 .../resource/ResourceAwareScheduler.java        |  88 +++----
 .../storm/scheduler/resource/ResourceUtils.java |  11 +-
 .../scheduler/resource/SchedulingResult.java    |   2 +-
 .../backtype/storm/scheduler/resource/User.java |   3 +-
 .../eviction/DefaultEvictionStrategy.java       |  10 +-
 .../DefaultSchedulingPriorityStrategy.java      |   4 -
 .../strategies/scheduling/IStrategy.java        |   4 -
 .../jvm/backtype/storm/TestConfigValidate.java  |  13 +-
 .../resource/TestResourceAwareScheduler.java    | 229 ++++++++++++-------
 .../TestUtilsForResourceAwareScheduler.java     |  26 +--
 11 files changed, 208 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 53fdaa4..e3297c8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -458,7 +458,7 @@ public class Cluster {
      * set assignments for cluster
      */
     public void setAssignments(Map<String, SchedulerAssignment> newAssignments) {
-        this.assignments = new HashMap<String, SchedulerAssignmentImpl>();
+        this.assignments = new HashMap<String, SchedulerAssignmentImpl>(newAssignments.size());
         for (Map.Entry<String, SchedulerAssignment> entry : newAssignments.entrySet()) {
             this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 0558e12..116f1b5 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -99,9 +99,9 @@ public class ResourceAwareScheduler implements IScheduler {
             if (schedulingPrioritystrategy == null) {
                 try {
                     schedulingPrioritystrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
-                } catch (RuntimeException e) {
-                    LOG.error("failed to create instance of priority strategy: {} with error: {}! No topologies will be scheduled.",
-                            this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage());
+                } catch (RuntimeException ex) {
+                    LOG.error(String.format("failed to create instance of priority strategy: %s with error: %s! No topologies will be scheduled.",
+                                    this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), ex.getMessage()), ex);
                     break;
                 }
             }
@@ -111,9 +111,9 @@ public class ResourceAwareScheduler implements IScheduler {
                 schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
                 //Call scheduling priority strategy
                 td = schedulingPrioritystrategy.getNextTopologyToSchedule();
-            } catch (Exception e) {
-                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled! Error: {} StackTrace: {}"
-                        , schedulingPrioritystrategy.getClass().getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+            } catch (Exception ex) {
+                LOG.error(String.format("Exception thrown when running priority strategy %s. No topologies will be scheduled! Error: %s"
+                        , schedulingPrioritystrategy.getClass().getName(), ex.getMessage()), ex.getStackTrace());
                 break;
             }
             if (td == null) {
@@ -135,9 +135,7 @@ public class ResourceAwareScheduler implements IScheduler {
             } catch (RuntimeException e) {
                 LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
                         td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
-                this.restoreCheckpointSchedulingState(schedulingState);
-                //since state is restored need the update User topologySubmitter to the new User object in userMap
-                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                topologySubmitter = this.cleanup(schedulingState, td);
                 topologySubmitter.moveTopoFromPendingToInvalid(td);
                 this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
                         + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
@@ -150,12 +148,10 @@ public class ResourceAwareScheduler implements IScheduler {
                     //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
                     rasStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
                     result = rasStrategy.schedule(td);
-                } catch (Exception e) {
-                    LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrace: {}"
-                            , rasStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
-                    this.restoreCheckpointSchedulingState(schedulingState);
-                    //since state is restored need the update User topologySubmitter to the new User object in userMap
-                    topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                } catch (Exception ex) {
+                    LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
+                            , rasStrategy.getClass().getName(), td.getName()), ex);
+                    topologySubmitter = this.cleanup(schedulingState, td);
                     topologySubmitter.moveTopoFromPendingToInvalid(td);
                     this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
                             + rasStrategy.getClass().getName() + ". Please check logs for details");
@@ -168,19 +164,13 @@ public class ResourceAwareScheduler implements IScheduler {
                                 topologySubmitter.moveTopoFromPendingToRunning(td);
                                 this.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
                             } else {
-                                this.restoreCheckpointSchedulingState(schedulingState);
-                                //since state is restored need the update User topologySubmitter to the new User object in userMap
-                                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                                topologySubmitter = this.cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
                             }
                         } catch (IllegalStateException ex) {
-                            LOG.error(ex.toString());
-                            LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Error: {} StackTrace: {}",
-                                    ex.getClass().getName(), Arrays.toString(ex.getStackTrace()));
-                            this.restoreCheckpointSchedulingState(schedulingState);
-                            //since state is restored need the update User topologySubmitter to the new User object in userMap
-                            topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                            LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
+                            topologySubmitter = this.cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToAttempted(td);
                             this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
                         }
@@ -202,44 +192,34 @@ public class ResourceAwareScheduler implements IScheduler {
                                 //need to re prepare since scheduling state might have been restored
                                 evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
                                 madeSpace = evictionStrategy.makeSpaceForTopo(td);
-                            } catch (Exception e) {
-                                LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}. No evictions will be done! Error: {} StackTrace: {}"
-                                        , evictionStrategy.getClass().getName(), td.getName(), e.getClass().getName(), Arrays.toString(e.getStackTrace()));
-                                this.restoreCheckpointSchedulingState(schedulingState);
-                                //since state is restored need the update User topologySubmitter to the new User object in userMap
-                                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                            } catch (Exception ex) {
+                                LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
+                                        , evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
+                                topologySubmitter = this.cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 break;
                             }
                             if (!madeSpace) {
                                 LOG.debug("Could not make space for topo {} will move to attempted", td);
-                                this.restoreCheckpointSchedulingState(schedulingState);
-                                //since state is restored need the update User topologySubmitter to the new User object in userMap
-                                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                                topologySubmitter = this.cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
                                 break;
                             }
                             continue;
                         } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
-                            this.restoreCheckpointSchedulingState(schedulingState);
-                            //since state is restored need the update User topologySubmitter to the new User object in userMap
-                            topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                            topologySubmitter = this.cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
                             break;
                         } else {
-                            this.restoreCheckpointSchedulingState(schedulingState);
-                            //since state is restored need the update User topologySubmitter to the new User object in userMap
-                            topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                            topologySubmitter = this.cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
                             break;
                         }
                     }
                 } else {
                     LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
-                    this.restoreCheckpointSchedulingState(schedulingState);
-                    //since state is restored need the update User topologySubmitter to the new User object in userMap
-                    topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                    topologySubmitter = this.cleanup(schedulingState, td);
                     topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
                     break;
                 }
@@ -253,6 +233,12 @@ public class ResourceAwareScheduler implements IScheduler {
         }
     }
 
+    private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
+        this.restoreCheckpointSchedulingState(schedulingState);
+        //since state is restored, the caller needs to update its User topologySubmitter to the new User object in userMap (returned here)
+        return this.userMap.get(td.getTopologySubmitter());
+    }
+
     private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
         if (schedulerAssignmentMap != null) {
             double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
@@ -328,8 +314,7 @@ public class ResourceAwareScheduler implements IScheduler {
         LOG.debug("userResourcePools: {}", userResourcePools);
 
         for (TopologyDetails td : topologies.getTopologies()) {
-            //Get user that submitted topology.  If topology submitter is null or empty string, the topologySubmitter
-            //will be set to anonymous
+
             String topologySubmitter = td.getTopologySubmitter();
             //additional safety check to make sure that topologySubmitter is going to be a valid value
             if (topologySubmitter == null || topologySubmitter.equals("")) {
@@ -362,17 +347,18 @@ public class ResourceAwareScheduler implements IScheduler {
     /**
      * Get resource guarantee configs
      *
-     * @return
+     * @return a map that contains the resource guarantees of every user, in the following format:
+     * {userid->{resourceType->amountGuaranteed}}
      */
     private Map<String, Map<String, Double>> getUserResourcePools() {
         Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
         Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
 
         if (raw != null) {
-            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
-                String user = UserPoolEntry.getKey();
+            for (Map.Entry<String, Map<String, Number>> userPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+                String user = userPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
-                for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+                for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
                     ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
                 }
             }
@@ -381,10 +367,10 @@ public class ResourceAwareScheduler implements IScheduler {
         Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
         Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
         if (tmp != null) {
-            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : tmp.entrySet()) {
-                String user = UserPoolEntry.getKey();
+            for (Map.Entry<String, Map<String, Number>> userPoolEntry : tmp.entrySet()) {
+                String user = userPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
-                for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+                for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
                     ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
index 02d48e1..870b6c2 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
@@ -152,7 +152,7 @@ public class ResourceUtils {
                     WorkerSlot slot = entry.getValue();
                     String nodeId = slot.getNodeId();
                     ExecutorDetails exec = entry.getKey();
-                    if (schedulingMap.containsKey(nodeId) == false) {
+                    if (!schedulingMap.containsKey(nodeId)) {
                         schedulingMap.put(nodeId, new HashMap<String, Map<WorkerSlot, Collection<ExecutorDetails>>>());
                     }
                     if (schedulingMap.get(nodeId).containsKey(topo.getId()) == false) {
@@ -181,13 +181,4 @@ public class ResourceUtils {
         }
         return str.toString();
     }
-
-    public static String printScheduling(RAS_Nodes nodes) {
-        String ret="";
-        for (RAS_Node node : nodes.getNodes()) {
-            ret += "Node: " + node.getHostname() + "\n";
-            ret += "-> " + node.getTopoIdTousedSlots() + "\n";
-        }
-        return ret;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
index 13ed8ad..3ea1b2a 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
@@ -80,7 +80,7 @@ public class SchedulingResult {
     }
 
     public Map<WorkerSlot, Collection<ExecutorDetails>> getSchedulingResultMap() {
-        return schedulingResultMap;
+        return this.schedulingResultMap;
     }
 
     public boolean isSuccess() {

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 7f49446..f1d53c6 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -324,6 +322,7 @@ public class User {
 
     /**
      * Comparator that sorts topologies by priority and then by submission time
+     * Sorts first by topology priority; if there is a tie in topology priority, topology uptime is used to sort
      */
     static class PQsortByPriorityAndSubmittionTime implements Comparator<TopologyDetails> {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 1812580..d54ec43 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -34,14 +34,12 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultEvictionStrategy.class);
 
-    private Topologies topologies;
     private Cluster cluster;
     private Map<String, User> userMap;
     private RAS_Nodes nodes;
 
     @Override
     public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
-        this.topologies = topologies;
         this.cluster = cluster;
         this.userMap = userMap;
         this.nodes = nodes;
@@ -51,14 +49,16 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
     public boolean makeSpaceForTopo(TopologyDetails td) {
         LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
         User submitter = this.userMap.get(td.getTopologySubmitter());
-        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
+        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null
+                || submitter.getCPUResourceGuaranteed() == 0.0 || submitter.getMemoryResourceGuaranteed() == 0.0) {
             return false;
         }
+
         double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
         double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
 
         User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
-        //user has enough resource under his or her resource guarantee to schedule topology
+        //check if user has enough resource under his or her resource guarantee to schedule topology
         if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
             if (evictUser != null) {
                 TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
@@ -67,7 +67,7 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
             }
         } else {
             if (evictUser != null) {
-                if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (cpuNeeded + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
+                if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
                     TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
                     evictTopology(topologyEvict);
                     return true;

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 0d891ff..c168ab8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -32,17 +32,13 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultSchedulingPriorityStrategy.class);
 
-    private Topologies topologies;
     private Cluster cluster;
     private Map<String, User> userMap;
-    private RAS_Nodes nodes;
 
     @Override
     public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
-        this.topologies = topologies;
         this.cluster = cluster;
         this.userMap = userMap;
-        this.nodes = nodes;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index a7ac5c9..83d1289 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -18,15 +18,11 @@
 
 package backtype.storm.scheduler.resource.strategies.scheduling;
 
-import java.util.Collection;
 import java.util.Map;
 
 import backtype.storm.scheduler.Cluster;
 import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.ExecutorDetails;
 import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.RAS_Node;
 import backtype.storm.scheduler.resource.RAS_Nodes;
 import backtype.storm.scheduler.resource.SchedulingResult;
 import backtype.storm.scheduler.resource.User;

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 048cc92..0d317c6 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -633,7 +633,6 @@ public class TestConfigValidate {
     @Test
     public void TestResourceAwareSchedulerUserPool() {
         TestConfig config = new TestConfig();
-        Collection<Object> passCases = new LinkedList<Object>();
         Collection<Object> failCases = new LinkedList<Object>();
 
         Map<String, Map<String, Integer>> passCase1 = new HashMap<String, Map<String, Integer>>();
@@ -648,12 +647,8 @@ public class TestConfigValidate {
         passCase1.get("derek").put("cpu", 30000);
         passCase1.get("derek").put("memory", 60148);
 
-        passCases.add(passCase1);
-
-        for (Object value : passCases) {
-            config.put(TestConfig.TEST_MAP_CONFIG_7, value);
-            ConfigValidation.validateFields(config, TestConfig.class);
-        }
+        config.put(TestConfig.TEST_MAP_CONFIG_7, (Object) passCase1);
+        ConfigValidation.validateFields(config, TestConfig.class);
 
         Map<String, Map<String, Integer>> failCase1 = new HashMap<String, Map<String, Integer>>();
         failCase1.put("jerry", new HashMap<String, Integer>());
@@ -664,9 +659,11 @@ public class TestConfigValidate {
         failCase1.get("jerry").put("memory", 20148);
         failCase1.get("bobby").put("cpu", 20000);
         failCase1.get("bobby").put("memory", 40148);
+        //this will fail the test since user derek does not have an entry for memory
         failCase1.get("derek").put("cpu", 30000);
 
         Map<String, Map<String, Integer>> failCase2 = new HashMap<String, Map<String, Integer>>();
+        //this will fail since jerry doesn't have either cpu or memory entries
         failCase2.put("jerry", new HashMap<String, Integer>());
         failCase2.put("bobby", new HashMap<String, Integer>());
         failCase2.put("derek", new HashMap<String, Integer>());
@@ -700,7 +697,7 @@ public class TestConfigValidate {
             config.put(TestConfig.TEST_MAP_CONFIG_8, value);
             ConfigValidation.validateFields(config, TestConfig.class);
         }
-
+        //will fail since backtype.storm.nimbus.NimbusInfo doesn't implement or extend backtype.storm.networktopography.DNSToSwitchMapping
         failCases.add("backtype.storm.nimbus.NimbusInfo");
         failCases.add(null);
         for (Object value : failCases) {

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index f9b4cd6..f5e875a 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -22,14 +22,12 @@ import backtype.storm.Config;
 import backtype.storm.scheduler.Cluster;
 import backtype.storm.scheduler.ExecutorDetails;
 import backtype.storm.scheduler.INimbus;
-import backtype.storm.scheduler.IScheduler;
 import backtype.storm.scheduler.SchedulerAssignment;
 import backtype.storm.scheduler.SchedulerAssignmentImpl;
 import backtype.storm.scheduler.SupervisorDetails;
 import backtype.storm.scheduler.Topologies;
 import backtype.storm.scheduler.TopologyDetails;
 import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.utils.Time;
 import backtype.storm.utils.Utils;
 import backtype.storm.validation.ConfigValidation;
 import org.junit.Assert;
@@ -37,7 +35,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -45,21 +42,86 @@ import java.util.Set;
 
 public class TestResourceAwareScheduler {
 
-    private static final int NUM_SUPS = 20;
-    private static final int NUM_WORKERS_PER_SUP = 4;
     private final String TOPOLOGY_SUBMITTER = "jerry";
 
     private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
 
+    private static int currentTime = 1450418597;
+
     @Test
     public void TestReadInResourceAwareSchedulerUserPools() {
-
         Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
         LOG.info("fromFile: {}", fromFile);
         ConfigValidation.validateFields(fromFile);
     }
 
     @Test
+    public void TestSubmitUsersWithNoGuarantees() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 200.0);
+        resourceUserPool.get("jerry").put("memory", 2000.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+    }
+
+    @Test
     public void TestTopologySortedInCorrectOrder() {
         INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
         Map<String, Number> resourceMap = new HashMap<String, Number>();
@@ -89,11 +151,11 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -138,7 +200,7 @@ public class TestResourceAwareScheduler {
         LOG.info("{} - {}", topo.getName(), queue);
         Assert.assertEquals("check order", topo.getName(), "topo-2");
 
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 30, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, 10);
         topoMap.put(topo6.getId(), topo6);
 
         topologies = new Topologies(topoMap);
@@ -200,27 +262,27 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 29);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 29);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 29);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
-        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 29);
-        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 29);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20);
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29);
+        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 29);
+        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 20);
+        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 29);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
-        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 29);
-        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 29);
+        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, 20);
+        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, 29);
+        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, 29);
+        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, 20);
+        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24, 29);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -286,8 +348,8 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -313,9 +375,9 @@ public class TestResourceAwareScheduler {
     }
 
     /**
-     * The resources in the cluster is limited. In the first round of scheduling, all resources in the cluster is used.
-     * User jerry submits another toplogy.  Since user jerry has has his resource guarantees satisfied, and user bobby
-     * has exceeded his resource guarantee, topo-3 from user bobby should be eviced.
+     * The resources in the cluster are limited. In the first round of scheduling, all resources in the cluster are used.
+     * User jerry submits another topology.  Since user jerry has his resource guarantees satisfied, and user bobby
+     * has exceeded his resource guarantee, topo-3 from user bobby should be evicted.
      */
     @Test
     public void testEviction() {
@@ -350,18 +412,18 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20);
 
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -377,7 +439,7 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
 
         for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
@@ -385,7 +447,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -393,7 +455,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -407,7 +469,7 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
 
         for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
@@ -415,7 +477,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -423,16 +485,17 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
 
         for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
-            Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertFalse("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("correct topology to evict", "topo-3", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName());
     }
 
     @Test
@@ -464,17 +527,17 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
@@ -490,7 +553,7 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
 
         for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -498,7 +561,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -511,7 +574,7 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
 
         for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
@@ -519,7 +582,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -527,7 +590,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
-            Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertFalse("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -571,19 +634,19 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
@@ -751,19 +814,18 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -874,8 +936,7 @@ public class TestResourceAwareScheduler {
     }
 
     /**
-     * If topologies from other users cannot be evicted to make space
-     * check if there is a topology with lower priority that can be evicted from the current user
+     * If users are above their resource guarantees, check that topology eviction works correctly
      */
     @Test
     public void TestOverGuaranteeEviction() {
@@ -910,18 +971,18 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
@@ -961,7 +1022,8 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
 
         //user derek submits another topology into a full cluster
-        // topo6 should not be able to scheduled
+        //topo6 should not be able to be scheduled initially, but since topo6 has higher priority than topo5
+        //topo5 will be evicted so that topo6 can be scheduled
         topoMap.put(topo6.getId(), topo6);
         topologies = new Topologies(topoMap);
         rs.schedule(topologies, cluster);
@@ -1051,7 +1113,6 @@ public class TestResourceAwareScheduler {
         resourceUserPool.get("bobby").put("cpu", 200.0);
         resourceUserPool.get("bobby").put("memory", 2000.0);
 
-
         resourceUserPool.put("derek", new HashMap<String, Number>());
         resourceUserPool.get("derek").put("cpu", 100.0);
         resourceUserPool.get("derek").put("memory", 1000.0);
@@ -1061,18 +1122,18 @@ public class TestResourceAwareScheduler {
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);

http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index dcd487f..1aa010b 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -22,7 +22,6 @@ import backtype.storm.Config;
 import backtype.storm.generated.Bolt;
 import backtype.storm.generated.SpoutSpec;
 import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.TopologySummary;
 import backtype.storm.scheduler.ExecutorDetails;
 import backtype.storm.scheduler.INimbus;
 import backtype.storm.scheduler.IScheduler;
@@ -57,26 +56,25 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class TestUtilsForResourceAwareScheduler {
+    private static int currentTime = 1450418597;
+
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
     public static List<TopologyDetails> getListOfTopologies(Config config) {
 
         List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
 
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 0));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 15));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 8));
-        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 9));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9));
         return topos;
     }