You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/03/28 21:07:24 UTC
[4/8] storm git commit: Merge two functions
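Merge two functions: combine the test helpers getSupervisorToCpuUsage and getSupervisorToMemoryUsage in TestUtilsForResourceAwareScheduler into a single getSupervisorToResourceUsage, which fills caller-supplied CPU and memory usage maps.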
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f1161c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f1161c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f1161c
Branch: refs/heads/master
Commit: 58f1161cb077f90fb64e8a68f9da9c9aedf4f7dd
Parents: 6340601
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Mar 24 17:31:13 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Mar 24 17:31:13 2016 -0500
----------------------------------------------------------------------
.../resource/TestResourceAwareScheduler.java | 10 +++--
.../TestUtilsForResourceAwareScheduler.java | 44 ++++----------------
2 files changed, 13 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 28fd491..9cfdc6e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -616,8 +616,9 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
- Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
- Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+ Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+ Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+ TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
final Double EPSILON = 0.0001;
for (SupervisorDetails supervisor : supMap.values()) {
@@ -658,8 +659,9 @@ public class TestResourceAwareScheduler {
topologies = new Topologies(topoMap);
rs.prepare(config1);
rs.schedule(topologies, cluster);
- superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
- superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+ superToCpu = new HashMap<>();
+ superToMem = new HashMap<>();
+ TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
for (SupervisorDetails supervisor : supMap.values()) {
Double cpuAvailable = supervisor.getTotalCPU();
Double memAvailable = supervisor.getTotalMemory();
http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7cd21ce..8b7f9c8 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -289,46 +289,14 @@ public class TestUtilsForResourceAwareScheduler {
return ret;
}
- public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
- Map<SupervisorDetails, Double> superToMem = new HashMap<>();
- Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
- Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
- for (SupervisorDetails supervisor : supervisors) {
- superToMem.put(supervisor, 0.0);
- }
-
- for (SchedulerAssignment assignment : assignments) {
- Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
- Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
- TopologyDetails topology = topologies.getById(assignment.getTopologyId());
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
- executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
- }
- for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
- List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
- if (executorsOnSupervisor == null) {
- executorsOnSupervisor = new ArrayList<>();
- supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
- }
- executorsOnSupervisor.add(entry.getKey());
- }
- for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
- Double supervisorUsedMemory = 0.0;
- for (ExecutorDetails executor: entry.getValue()) {
- supervisorUsedMemory += topology.getTotalMemReqTask(executor);
- }
- superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
- }
- }
- return superToMem;
- }
-
- public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
- Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+ public static void getSupervisorToResourceUsage(Cluster cluster, Topologies topologies,
+ Map<SupervisorDetails, Double> superToCpu,
+ Map<SupervisorDetails, Double> superToMem) {
Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
for (SupervisorDetails supervisor : supervisors) {
superToCpu.put(supervisor, 0.0);
+ superToMem.put(supervisor, 0.0);
}
for (SchedulerAssignment assignment : assignments) {
@@ -348,12 +316,14 @@ public class TestUtilsForResourceAwareScheduler {
}
for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
Double supervisorUsedCpu = 0.0;
+ Double supervisorUsedMemory = 0.0;
for (ExecutorDetails executor: entry.getValue()) {
supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+ supervisorUsedMemory += topology.getTotalMemReqTask(executor);
}
superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+ superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
}
}
- return superToCpu;
}
}