You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/03/28 21:07:24 UTC

[4/8] storm git commit: Merge two functions

Merge two functions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f1161c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f1161c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f1161c

Branch: refs/heads/master
Commit: 58f1161cb077f90fb64e8a68f9da9c9aedf4f7dd
Parents: 6340601
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Mar 24 17:31:13 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Mar 24 17:31:13 2016 -0500

----------------------------------------------------------------------
 .../resource/TestResourceAwareScheduler.java    | 10 +++--
 .../TestUtilsForResourceAwareScheduler.java     | 44 ++++----------------
 2 files changed, 13 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 28fd491..9cfdc6e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -616,8 +616,9 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
         Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
 
-        Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
-        Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+        TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
 
         final Double EPSILON = 0.0001;
         for (SupervisorDetails supervisor : supMap.values()) {
@@ -658,8 +659,9 @@ public class TestResourceAwareScheduler {
         topologies = new Topologies(topoMap);
         rs.prepare(config1);
         rs.schedule(topologies, cluster);
-        superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
-        superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        superToCpu = new HashMap<>();
+        superToMem = new HashMap<>();
+        TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
         for (SupervisorDetails supervisor : supMap.values()) {
             Double cpuAvailable = supervisor.getTotalCPU();
             Double memAvailable = supervisor.getTotalMemory();

http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7cd21ce..8b7f9c8 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -289,46 +289,14 @@ public class TestUtilsForResourceAwareScheduler {
         return ret;
     }
 
-    public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
-        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
-        Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
-        Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
-        for (SupervisorDetails supervisor : supervisors) {
-            superToMem.put(supervisor, 0.0);
-        }
-
-        for (SchedulerAssignment assignment : assignments) {
-            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
-            Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
-            TopologyDetails topology = topologies.getById(assignment.getTopologyId());
-            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
-                executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
-            }
-            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
-                List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
-                if (executorsOnSupervisor == null) {
-                    executorsOnSupervisor = new ArrayList<>();
-                    supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
-                }
-                executorsOnSupervisor.add(entry.getKey());
-            }
-            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
-                Double supervisorUsedMemory = 0.0;
-                for (ExecutorDetails executor: entry.getValue()) {
-                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
-                }
-                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
-            }
-        }
-        return superToMem;
-    }
-
-    public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
-        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+    public static void getSupervisorToResourceUsage(Cluster cluster, Topologies topologies,
+                                                    Map<SupervisorDetails, Double> superToCpu,
+                                                    Map<SupervisorDetails, Double> superToMem) {
         Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
         Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
         for (SupervisorDetails supervisor : supervisors) {
             superToCpu.put(supervisor, 0.0);
+            superToMem.put(supervisor, 0.0);
         }
 
         for (SchedulerAssignment assignment : assignments) {
@@ -348,12 +316,14 @@ public class TestUtilsForResourceAwareScheduler {
             }
             for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
                 Double supervisorUsedCpu = 0.0;
+                Double supervisorUsedMemory = 0.0;
                 for (ExecutorDetails executor: entry.getValue()) {
                     supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
                 }
                 superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
             }
         }
-        return superToCpu;
     }
 }