You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:47 UTC
[19/23] storm git commit: edits based on review comments
edits based on review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f80d067
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f80d067
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f80d067
Branch: refs/heads/master
Commit: 0f80d0678c243a7bd1b6aa169272741ef7e2950a
Parents: 068a8c2
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Dec 18 01:09:15 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 18 01:21:41 2015 -0600
----------------------------------------------------------------------
.../jvm/backtype/storm/scheduler/Cluster.java | 2 +-
.../resource/ResourceAwareScheduler.java | 88 +++----
.../storm/scheduler/resource/ResourceUtils.java | 11 +-
.../scheduler/resource/SchedulingResult.java | 2 +-
.../backtype/storm/scheduler/resource/User.java | 3 +-
.../eviction/DefaultEvictionStrategy.java | 10 +-
.../DefaultSchedulingPriorityStrategy.java | 4 -
.../strategies/scheduling/IStrategy.java | 4 -
.../jvm/backtype/storm/TestConfigValidate.java | 13 +-
.../resource/TestResourceAwareScheduler.java | 229 ++++++++++++-------
.../TestUtilsForResourceAwareScheduler.java | 26 +--
11 files changed, 208 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 53fdaa4..e3297c8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -458,7 +458,7 @@ public class Cluster {
* set assignments for cluster
*/
public void setAssignments(Map<String, SchedulerAssignment> newAssignments) {
- this.assignments = new HashMap<String, SchedulerAssignmentImpl>();
+ this.assignments = new HashMap<String, SchedulerAssignmentImpl>(newAssignments.size());
for (Map.Entry<String, SchedulerAssignment> entry : newAssignments.entrySet()) {
this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 0558e12..116f1b5 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -99,9 +99,9 @@ public class ResourceAwareScheduler implements IScheduler {
if (schedulingPrioritystrategy == null) {
try {
schedulingPrioritystrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
- } catch (RuntimeException e) {
- LOG.error("failed to create instance of priority strategy: {} with error: {}! No topologies will be scheduled.",
- this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage());
+ } catch (RuntimeException ex) {
+ LOG.error(String.format("failed to create instance of priority strategy: %s with error: %s! No topologies will be scheduled.",
+ this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), ex.getMessage()), ex);
break;
}
}
@@ -111,9 +111,9 @@ public class ResourceAwareScheduler implements IScheduler {
schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
//Call scheduling priority strategy
td = schedulingPrioritystrategy.getNextTopologyToSchedule();
- } catch (Exception e) {
- LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled! Error: {} StackTrace: {}"
- , schedulingPrioritystrategy.getClass().getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+ } catch (Exception ex) {
+ LOG.error(String.format("Exception thrown when running priority strategy %s. No topologies will be scheduled! Error: %s"
+ , schedulingPrioritystrategy.getClass().getName(), ex.getMessage()), ex.getStackTrace());
break;
}
if (td == null) {
@@ -135,9 +135,7 @@ public class ResourceAwareScheduler implements IScheduler {
} catch (RuntimeException e) {
LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
@@ -150,12 +148,10 @@ public class ResourceAwareScheduler implements IScheduler {
//Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
rasStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
result = rasStrategy.schedule(td);
- } catch (Exception e) {
- LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrace: {}"
- , rasStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ } catch (Exception ex) {
+ LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
+ , rasStrategy.getClass().getName(), td.getName()), ex);
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+ rasStrategy.getClass().getName() + ". Please check logs for details");
@@ -168,19 +164,13 @@ public class ResourceAwareScheduler implements IScheduler {
topologySubmitter.moveTopoFromPendingToRunning(td);
this.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
} else {
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
}
} catch (IllegalStateException ex) {
- LOG.error(ex.toString());
- LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Error: {} StackTrace: {}",
- ex.getClass().getName(), Arrays.toString(ex.getStackTrace()));
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
}
@@ -202,44 +192,34 @@ public class ResourceAwareScheduler implements IScheduler {
//need to re prepare since scheduling state might have been restored
evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
madeSpace = evictionStrategy.makeSpaceForTopo(td);
- } catch (Exception e) {
- LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}. No evictions will be done! Error: {} StackTrace: {}"
- , evictionStrategy.getClass().getName(), td.getName(), e.getClass().getName(), Arrays.toString(e.getStackTrace()));
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ } catch (Exception ex) {
+ LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
+ , evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex);
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
break;
}
if (!madeSpace) {
LOG.debug("Could not make space for topo {} will move to attempted", td);
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
break;
}
continue;
} else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
break;
} else {
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
break;
}
}
} else {
LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
- this.restoreCheckpointSchedulingState(schedulingState);
- //since state is restored need the update User topologySubmitter to the new User object in userMap
- topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+ topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
break;
}
@@ -253,6 +233,12 @@ public class ResourceAwareScheduler implements IScheduler {
}
}
+ private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
+ this.restoreCheckpointSchedulingState(schedulingState);
+ //since state is restored need the update User topologySubmitter to the new User object in userMap
+ return this.userMap.get(td.getTopologySubmitter());
+ }
+
private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
if (schedulerAssignmentMap != null) {
double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
@@ -328,8 +314,7 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.debug("userResourcePools: {}", userResourcePools);
for (TopologyDetails td : topologies.getTopologies()) {
- //Get user that submitted topology. If topology submitter is null or empty string, the topologySubmitter
- //will be set to anonymous
+
String topologySubmitter = td.getTopologySubmitter();
//additional safety check to make sure that topologySubmitter is going to be a valid value
if (topologySubmitter == null || topologySubmitter.equals("")) {
@@ -362,17 +347,18 @@ public class ResourceAwareScheduler implements IScheduler {
/**
* Get resource guarantee configs
*
- * @return
+ * @return a map that contains resource guarantees of every user of the following format
+ * {userid->{resourceType->amountGuaranteed}}
*/
private Map<String, Map<String, Double>> getUserResourcePools() {
Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
if (raw != null) {
- for (Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
- String user = UserPoolEntry.getKey();
+ for (Map.Entry<String, Map<String, Number>> userPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+ String user = userPoolEntry.getKey();
ret.put(user, new HashMap<String, Double>());
- for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+ for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
}
}
@@ -381,10 +367,10 @@ public class ResourceAwareScheduler implements IScheduler {
Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
if (tmp != null) {
- for (Map.Entry<String, Map<String, Number>> UserPoolEntry : tmp.entrySet()) {
- String user = UserPoolEntry.getKey();
+ for (Map.Entry<String, Map<String, Number>> userPoolEntry : tmp.entrySet()) {
+ String user = userPoolEntry.getKey();
ret.put(user, new HashMap<String, Double>());
- for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+ for (Map.Entry<String, Number> resourceEntry : userPoolEntry.getValue().entrySet()) {
ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
index 02d48e1..870b6c2 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
@@ -152,7 +152,7 @@ public class ResourceUtils {
WorkerSlot slot = entry.getValue();
String nodeId = slot.getNodeId();
ExecutorDetails exec = entry.getKey();
- if (schedulingMap.containsKey(nodeId) == false) {
+ if (!schedulingMap.containsKey(nodeId)) {
schedulingMap.put(nodeId, new HashMap<String, Map<WorkerSlot, Collection<ExecutorDetails>>>());
}
if (schedulingMap.get(nodeId).containsKey(topo.getId()) == false) {
@@ -181,13 +181,4 @@ public class ResourceUtils {
}
return str.toString();
}
-
- public static String printScheduling(RAS_Nodes nodes) {
- String ret="";
- for (RAS_Node node : nodes.getNodes()) {
- ret += "Node: " + node.getHostname() + "\n";
- ret += "-> " + node.getTopoIdTousedSlots() + "\n";
- }
- return ret;
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
index 13ed8ad..3ea1b2a 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
@@ -80,7 +80,7 @@ public class SchedulingResult {
}
public Map<WorkerSlot, Collection<ExecutorDetails>> getSchedulingResultMap() {
- return schedulingResultMap;
+ return this.schedulingResultMap;
}
public boolean isSuccess() {
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 7f49446..f1d53c6 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -324,6 +322,7 @@ public class User {
/**
* Comparator that sorts topologies by priority and then by submission time
+ * First sort by Topology Priority, if there is a tie for topology priority, topology uptime is used to sort
*/
static class PQsortByPriorityAndSubmittionTime implements Comparator<TopologyDetails> {
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 1812580..d54ec43 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -34,14 +34,12 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultEvictionStrategy.class);
- private Topologies topologies;
private Cluster cluster;
private Map<String, User> userMap;
private RAS_Nodes nodes;
@Override
public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
- this.topologies = topologies;
this.cluster = cluster;
this.userMap = userMap;
this.nodes = nodes;
@@ -51,14 +49,16 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
public boolean makeSpaceForTopo(TopologyDetails td) {
LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
User submitter = this.userMap.get(td.getTopologySubmitter());
- if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
+ if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null
+ || submitter.getCPUResourceGuaranteed() == 0.0 || submitter.getMemoryResourceGuaranteed() == 0.0) {
return false;
}
+
double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
- //user has enough resource under his or her resource guarantee to schedule topology
+ //check if user has enough resource under his or her resource guarantee to schedule topology
if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
if (evictUser != null) {
TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
@@ -67,7 +67,7 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
}
} else {
if (evictUser != null) {
- if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (cpuNeeded + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
+ if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
evictTopology(topologyEvict);
return true;
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 0d891ff..c168ab8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -32,17 +32,13 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
private static final Logger LOG = LoggerFactory
.getLogger(DefaultSchedulingPriorityStrategy.class);
- private Topologies topologies;
private Cluster cluster;
private Map<String, User> userMap;
- private RAS_Nodes nodes;
@Override
public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
- this.topologies = topologies;
this.cluster = cluster;
this.userMap = userMap;
- this.nodes = nodes;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index a7ac5c9..83d1289 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -18,15 +18,11 @@
package backtype.storm.scheduler.resource.strategies.scheduling;
-import java.util.Collection;
import java.util.Map;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.RAS_Node;
import backtype.storm.scheduler.resource.RAS_Nodes;
import backtype.storm.scheduler.resource.SchedulingResult;
import backtype.storm.scheduler.resource.User;
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 048cc92..0d317c6 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -633,7 +633,6 @@ public class TestConfigValidate {
@Test
public void TestResourceAwareSchedulerUserPool() {
TestConfig config = new TestConfig();
- Collection<Object> passCases = new LinkedList<Object>();
Collection<Object> failCases = new LinkedList<Object>();
Map<String, Map<String, Integer>> passCase1 = new HashMap<String, Map<String, Integer>>();
@@ -648,12 +647,8 @@ public class TestConfigValidate {
passCase1.get("derek").put("cpu", 30000);
passCase1.get("derek").put("memory", 60148);
- passCases.add(passCase1);
-
- for (Object value : passCases) {
- config.put(TestConfig.TEST_MAP_CONFIG_7, value);
- ConfigValidation.validateFields(config, TestConfig.class);
- }
+ config.put(TestConfig.TEST_MAP_CONFIG_7, (Object) passCase1);
+ ConfigValidation.validateFields(config, TestConfig.class);
Map<String, Map<String, Integer>> failCase1 = new HashMap<String, Map<String, Integer>>();
failCase1.put("jerry", new HashMap<String, Integer>());
@@ -664,9 +659,11 @@ public class TestConfigValidate {
failCase1.get("jerry").put("memory", 20148);
failCase1.get("bobby").put("cpu", 20000);
failCase1.get("bobby").put("memory", 40148);
+ //this will fail the test since user derek does not have an entry for memory
failCase1.get("derek").put("cpu", 30000);
Map<String, Map<String, Integer>> failCase2 = new HashMap<String, Map<String, Integer>>();
+ //this will fail since jerry doesn't have either cpu or memory entries
failCase2.put("jerry", new HashMap<String, Integer>());
failCase2.put("bobby", new HashMap<String, Integer>());
failCase2.put("derek", new HashMap<String, Integer>());
@@ -700,7 +697,7 @@ public class TestConfigValidate {
config.put(TestConfig.TEST_MAP_CONFIG_8, value);
ConfigValidation.validateFields(config, TestConfig.class);
}
-
+ //will fail since backtype.storm.nimbus.NimbusInfo doesn't implement or extend backtype.storm.networktopography.DNSToSwitchMapping
failCases.add("backtype.storm.nimbus.NimbusInfo");
failCases.add(null);
for (Object value : failCases) {
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index f9b4cd6..f5e875a 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -22,14 +22,12 @@ import backtype.storm.Config;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.INimbus;
-import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.SchedulerAssignmentImpl;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.utils.Time;
import backtype.storm.utils.Utils;
import backtype.storm.validation.ConfigValidation;
import org.junit.Assert;
@@ -37,7 +35,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -45,21 +42,86 @@ import java.util.Set;
public class TestResourceAwareScheduler {
- private static final int NUM_SUPS = 20;
- private static final int NUM_WORKERS_PER_SUP = 4;
private final String TOPOLOGY_SUBMITTER = "jerry";
private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
+ private static int currentTime = 1450418597;
+
@Test
public void TestReadInResourceAwareSchedulerUserPools() {
-
Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
LOG.info("fromFile: {}", fromFile);
ConfigValidation.validateFields(fromFile);
}
@Test
+ public void TestSubmitUsersWithNoGuarantees() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 200.0);
+ resourceUserPool.get("jerry").put("memory", 2000.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 20);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ }
+
+ @Test
public void TestTopologySortedInCorrectOrder() {
INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
Map<String, Number> resourceMap = new HashMap<String, Number>();
@@ -89,11 +151,11 @@ public class TestResourceAwareScheduler {
config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
@@ -138,7 +200,7 @@ public class TestResourceAwareScheduler {
LOG.info("{} - {}", topo.getName(), queue);
Assert.assertEquals("check order", topo.getName(), "topo-2");
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 30, 10);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 30, 10);
topoMap.put(topo6.getId(), topo6);
topologies = new Topologies(topoMap);
@@ -200,27 +262,27 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 29);
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 29);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 29);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 29);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
- TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 29);
- TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
- TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 29);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 20);
+ TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 29);
+ TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 29);
+ TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 20);
+ TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 29);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
- TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 29);
- TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
- TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 29);
+ TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, currentTime - 2, 20);
+ TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, currentTime - 8, 29);
+ TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, currentTime - 16, 29);
+ TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, currentTime - 16, 20);
+ TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, currentTime - 24, 29);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
@@ -286,8 +348,8 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 29);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 29);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
@@ -313,9 +375,9 @@ public class TestResourceAwareScheduler {
}
/**
- * The resources in the cluster is limited. In the first round of scheduling, all resources in the cluster is used.
- * User jerry submits another toplogy. Since user jerry has has his resource guarantees satisfied, and user bobby
- * has exceeded his resource guarantee, topo-3 from user bobby should be eviced.
+ * The resources in the cluster are limited. In the first round of scheduling, all resources in the cluster is used.
+ * User jerry submits another toploogy. Since user jerry has his resource guarantees satisfied, and user bobby
+ * has exceeded his resource guarantee, topo-3 from user bobby should be evicted.
*/
@Test
public void testEviction() {
@@ -350,18 +412,18 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 20);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
@@ -377,7 +439,7 @@ public class TestResourceAwareScheduler {
rs.schedule(topologies, cluster);
for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
@@ -385,7 +447,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -393,7 +455,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -407,7 +469,7 @@ public class TestResourceAwareScheduler {
rs.schedule(topologies, cluster);
for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
@@ -415,7 +477,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -423,16 +485,17 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
- Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertFalse("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("correct topology to evict", "topo-3", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName());
}
@Test
@@ -464,17 +527,17 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, currentTime - 2, 10);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo2.getId(), topo2);
@@ -490,7 +553,7 @@ public class TestResourceAwareScheduler {
rs.schedule(topologies, cluster);
for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -498,7 +561,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -511,7 +574,7 @@ public class TestResourceAwareScheduler {
rs.schedule(topologies, cluster);
for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
@@ -519,7 +582,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertTrue("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
@@ -527,7 +590,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
- Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertFalse("assert topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -571,19 +634,19 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, currentTime - 2, 10);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 20);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 29);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 29);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 15, 29);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo2.getId(), topo2);
@@ -751,19 +814,18 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 29);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 10);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 29);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
@@ -874,8 +936,7 @@ public class TestResourceAwareScheduler {
}
/**
- * If topologies from other users cannot be evicted to make space
- * check if there is a topology with lower priority that can be evicted from the current user
+ * If users are above his or her guarantee, check if topology eviction works correct
*/
@Test
public void TestOverGuaranteeEviction() {
@@ -910,18 +971,18 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
@@ -961,7 +1022,8 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
//user derek submits another topology into a full cluster
- // topo6 should not be able to scheduled
+ //topo6 should not be able to scheduled intially, but since topo6 has higher priority than topo5
+ //topo5 will be evicted so that topo6 can be scheduled
topoMap.put(topo6.getId(), topo6);
topologies = new Topologies(topoMap);
rs.schedule(topologies, cluster);
@@ -1051,7 +1113,6 @@ public class TestResourceAwareScheduler {
resourceUserPool.get("bobby").put("cpu", 200.0);
resourceUserPool.get("bobby").put("memory", 2000.0);
-
resourceUserPool.put("derek", new HashMap<String, Number>());
resourceUserPool.get("derek").put("cpu", 100.0);
resourceUserPool.get("derek").put("memory", 1000.0);
@@ -1061,18 +1122,18 @@ public class TestResourceAwareScheduler {
config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, currentTime - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, currentTime - 2, 20);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, currentTime - 2, 10);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, currentTime - 2, 10);
config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, currentTime - 2, 29);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, currentTime - 2, 10);
Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
topoMap.put(topo1.getId(), topo1);
http://git-wip-us.apache.org/repos/asf/storm/blob/0f80d067/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index dcd487f..1aa010b 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -22,7 +22,6 @@ import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.TopologySummary;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.INimbus;
import backtype.storm.scheduler.IScheduler;
@@ -57,26 +56,25 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class TestUtilsForResourceAwareScheduler {
+ private static int currentTime = 1450418597;
+
private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
public static List<TopologyDetails> getListOfTopologies(Config config) {
List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 0));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 0));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 15));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 8));
- topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 9));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, currentTime - 2, 20));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, currentTime - 8, 30));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, currentTime - 16, 30));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, currentTime - 16, 20));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, currentTime - 24, 30));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, currentTime - 2, 0));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, currentTime - 8, 0));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, currentTime - 16, 15));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, currentTime - 16, 8));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, currentTime - 24, 9));
return topos;
}