You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/02 21:41:57 UTC

[1/2] storm git commit: STORM-2872: Fix for wouldFit and rebalance as part of GenericResourceAwareScheduling changes

Repository: storm
Updated Branches:
  refs/heads/master 41d6cdcc9 -> 2846417c8


STORM-2872: Fix for wouldFit and rebalance as part of GenericResourceAwareScheduling changes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fd02caa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fd02caa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fd02caa

Branch: refs/heads/master
Commit: 4fd02caabe2415481c4a0c52d51638a022730f73
Parents: 41d6cdc
Author: Govind Menon <go...@gmail.com>
Authored: Wed Dec 13 17:05:51 2017 -0600
Committer: Govind Menon <go...@gmail.com>
Committed: Tue Jan 2 14:58:32 2018 -0600

----------------------------------------------------------------------
 .../org/apache/storm/scheduler/Cluster.java     |  2 +-
 .../scheduler/resource/NormalizedResources.java | 26 ++++++------
 .../storm/scheduler/resource/ResourceUtils.java | 43 +++++++++++++-------
 .../java/org/apache/storm/TestRebalance.java    | 12 ++++--
 .../resource/TestResourceAwareScheduler.java    | 40 +++++++++++-------
 5 files changed, 78 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4fd02caa/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 39afa9f..9ab0c93 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -468,7 +468,7 @@ public class Cluster implements ISchedulingState {
         double maxHeap) {
 
         NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-        if (!resourcesAvailable.couldHoldIgnoringMemory(requestedResources)) {
+        if (!resourcesAvailable.couldHoldIgnoringSharedMemory(requestedResources)) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fd02caa/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index f332da5..eea38cf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -18,8 +18,6 @@
 
 package org.apache.storm.scheduler.resource;
 
-import static org.apache.storm.Constants.*;
-
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,11 +42,11 @@ public abstract class NormalizedResources {
 
     static {
         Map<String, String> tmp = new HashMap<>();
-        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
         RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
     }
 
@@ -56,10 +54,10 @@ public abstract class NormalizedResources {
         //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
         for (String key : normalizedResources.keySet()) {
             //We are going to skip over CPU and Memory, because they are captured elsewhere
-            if (!COMMON_CPU_RESOURCE_NAME.equals(key)
-                && !COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
-                && !COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
-                && !COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
                 resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement());
             }
         }
@@ -231,7 +229,7 @@ public abstract class NormalizedResources {
      * @param other the resources that we want to check if they would fit in this.
      * @return true if it might fit, else false if it could not possibly fit.
      */
-    public boolean couldHoldIgnoringMemory(NormalizedResources other) {
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other) {
         if (this.cpu < other.getTotalCpu()) {
             return false;
         }
@@ -241,6 +239,10 @@ public abstract class NormalizedResources {
                 return false;
             }
         }
+
+        if (this.getTotalMemoryMb() < other.getTotalMemoryMb()) {
+            return false;
+        }
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fd02caa/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index 83ee4cb..db9dbfa 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
@@ -32,6 +33,8 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
+
 public class ResourceUtils {
     private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
 
@@ -92,7 +95,7 @@ public class ResourceUtils {
 
                 if (resourceUpdatesMap.containsKey(spoutName)) {
                     ComponentCommon spoutCommon = spoutSpec.get_common();
-                    Map<String, Double> resourcesUpdate = resourceUpdatesMap.get(spoutName);
+                    Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(spoutName));
                     String newJsonConf = getJsonWithUpdatedResources(spoutCommon.get_json_conf(), resourcesUpdate);
                     spoutCommon.set_json_conf(newJsonConf);
                     componentsUpdated.put(spoutName, resourcesUpdate);
@@ -105,9 +108,9 @@ public class ResourceUtils {
                 Bolt boltObj = bolt.getValue();
                 String boltName = bolt.getKey();
 
-                if(resourceUpdatesMap.containsKey(boltName)) {
+                if (resourceUpdatesMap.containsKey(boltName)) {
                     ComponentCommon boltCommon = boltObj.get_common();
-                    Map<String, Double> resourcesUpdate = resourceUpdatesMap.get(boltName);
+                    Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(boltName));
                     String newJsonConf = getJsonWithUpdatedResources(boltCommon.get_json_conf(), resourceUpdatesMap.get(boltName));
                     boltCommon.set_json_conf(newJsonConf);
                     componentsUpdated.put(boltName, resourcesUpdate);
@@ -124,24 +127,36 @@ public class ResourceUtils {
         LOG.info("Component resource updates ignored: {}", notUpdated);
     }
 
+    public static String getCorrespondingLegacyResourceName(String normalizedResourceName) {
+        for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_MAPPING.entrySet()) {
+            if (entry.getValue().equals(normalizedResourceName)) {
+                return entry.getKey();
+            }
+        }
+        return normalizedResourceName;
+    }
+
     public static String getJsonWithUpdatedResources(String jsonConf, Map<String, Double> resourceUpdates) {
         try {
             JSONParser parser = new JSONParser();
             Object obj = parser.parse(jsonConf);
             JSONObject jsonObject = (JSONObject) obj;
 
-            if (resourceUpdates.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
-                Double topoMemOnHeap = resourceUpdates.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-                jsonObject.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
-            }
-            if (resourceUpdates.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
-                Double topoMemOffHeap = resourceUpdates.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
-                jsonObject.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
-            }
-            if (resourceUpdates.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-                Double topoCPU = resourceUpdates.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
-                jsonObject.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCPU);
+            Map<String, Double> componentResourceMap =
+                    (Map<String, Double>) jsonObject.getOrDefault(
+                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<String, Double>()
+                    );
+
+            for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) {
+                if (NormalizedResources.RESOURCE_NAME_MAPPING.containsValue(resourceUpdateEntry.getKey())) {
+                    // Legacy-named values normally live in the outer conf; remove them from both maps so only the normalized entry remains
+                    jsonObject.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
+                    componentResourceMap.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
+                }
+                componentResourceMap.put(resourceUpdateEntry.getKey(), resourceUpdateEntry.getValue());
             }
+            jsonObject.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, componentResourceMap);
+
             return jsonObject.toJSONString();
         } catch (ParseException ex) {
             throw new RuntimeException("Failed to parse component resources with json: " +  jsonConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/4fd02caa/storm-server/src/test/java/org/apache/storm/TestRebalance.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TestRebalance.java b/storm-server/src/test/java/org/apache/storm/TestRebalance.java
index 1f2e8b1..26a8f7f 100644
--- a/storm-server/src/test/java/org/apache/storm/TestRebalance.java
+++ b/storm-server/src/test/java/org/apache/storm/TestRebalance.java
@@ -75,7 +75,7 @@ public class TestRebalance {
 
             StormTopology stormTopology = builder.createTopology();
 
-            LOG.info("submitting topologies...");
+            LOG.info("submitting topologies....");
             String topoName = "topo1";
             cluster.submitTopology(topoName, new HashMap<>(), stormTopology);
 
@@ -87,6 +87,7 @@ public class TestRebalance {
             resources.put("spout-1", new HashMap<String, Double>());
             resources.get("spout-1").put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 120.0);
             resources.get("spout-1").put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 25.0);
+            resources.get("spout-1").put("gpu.count", 5.0);
 
             opts.set_topology_resources_overrides(resources);
             opts.set_wait_secs(0);
@@ -96,7 +97,7 @@ public class TestRebalance {
 
             opts.set_topology_conf_overrides(jsonObject.toJSONString());
 
-            LOG.info("rebalancing...");
+            LOG.info("rebalancing....");
             cluster.rebalance("topo1", opts);
 
             waitTopologyScheduled(topoName, cluster, 10);
@@ -113,8 +114,11 @@ public class TestRebalance {
 
             JSONObject readTopologyConf = (JSONObject) parser.parse(componentConfRaw);
 
-            assertEquals("Updated CPU correct", 25.0, (double) readTopologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), 0.001);
-            assertEquals("Updated Memory correct", 120.0, (double) readTopologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), 0.001);
+            Map<String, Double> componentResources = (Map<String, Double>) readTopologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);
+
+            assertEquals("Updated CPU correct", 25.0, componentResources.get(Constants.COMMON_CPU_RESOURCE_NAME), 0.001);
+            assertEquals("Updated Memory correct", 120.0, componentResources.get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME), 0.001);
+            assertEquals("Updated Generic resource correct", 5.0, componentResources.get("gpu.count"), 0.001);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fd02caa/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 38b68c0..eb5f70f 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -42,6 +42,7 @@ import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
+import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
 import org.apache.storm.testing.TestWordCounter;
 import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.TopologyBuilder;
@@ -499,8 +500,7 @@ public class TestResourceAwareScheduler {
         assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology2.getId()));
     }
 
-    @Test
-    public void testHeterogeneousCluster() {
+    public void testHeterogeneousCluster(Config topologyConf, String strategyName) {
         Map<String, Double> test = new HashMap<>();
         test.put("gpu.count", 0.0);
         new NormalizedResourceOffer(test);
@@ -532,7 +532,7 @@ public class TestResourceAwareScheduler {
         builder1.setSpout("wordSpout1", new TestWordSpout(), 1).setCPULoad(300.0).setMemoryLoad(2000.0, 48.0);
         StormTopology stormTopology1 = builder1.createTopology();
         Config config1 = new Config();
-        config1.putAll(defaultTopologyConf);
+        config1.putAll(topologyConf);
         Map<ExecutorDetails, String> executorMap1 = genExecsAndComps(stormTopology1);
         TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0, "user");
 
@@ -541,7 +541,7 @@ public class TestResourceAwareScheduler {
         builder2.setSpout("wordSpout2", new TestWordSpout(), 4).setCPULoad(100.0).setMemoryLoad(500.0, 12.0);
         StormTopology stormTopology2 = builder2.createTopology();
         Config config2 = new Config();
-        config2.putAll(defaultTopologyConf);
+        config2.putAll(topologyConf);
         Map<ExecutorDetails, String> executorMap2 = genExecsAndComps(stormTopology2);
         TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0, "user");
 
@@ -550,7 +550,7 @@ public class TestResourceAwareScheduler {
         builder3.setSpout("wordSpout3", new TestWordSpout(), 4).setCPULoad(20.0).setMemoryLoad(200.0, 56.0);
         StormTopology stormTopology3 = builder3.createTopology();
         Config config3 = new Config();
-        config3.putAll(defaultTopologyConf);
+        config3.putAll(topologyConf);
         Map<ExecutorDetails, String> executorMap3 = genExecsAndComps(stormTopology3);
         TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0, "user");
 
@@ -559,7 +559,7 @@ public class TestResourceAwareScheduler {
         builder4.setSpout("wordSpout4", new TestWordSpout(), 12).setCPULoad(30.0).setMemoryLoad(100.0, 0.0);
         StormTopology stormTopology4 = builder4.createTopology();
         Config config4 = new Config();
-        config4.putAll(defaultTopologyConf);
+        config4.putAll(topologyConf);
         Map<ExecutorDetails, String> executorMap4 = genExecsAndComps(stormTopology4);
         TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0, "user");
 
@@ -568,7 +568,7 @@ public class TestResourceAwareScheduler {
         builder5.setSpout("wordSpout5", new TestWordSpout(), 40).setCPULoad(25.0).setMemoryLoad(100.0, 28.0);
         StormTopology stormTopology5 = builder5.createTopology();
         Config config5 = new Config();
-        config5.putAll(defaultTopologyConf);
+        config5.putAll(topologyConf);
         Map<ExecutorDetails, String> executorMap5 = genExecsAndComps(stormTopology5);
         TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0, "user");
 
@@ -580,9 +580,9 @@ public class TestResourceAwareScheduler {
         rs.prepare(config1);
         rs.schedule(topologies, cluster);
 
-        assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
-        assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
-        assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
+        assertEquals("Running - Fully Scheduled by " + strategyName, cluster.getStatusMap().get(topology1.getId()));
+        assertEquals("Running - Fully Scheduled by " + strategyName, cluster.getStatusMap().get(topology2.getId()));
+        assertEquals("Running - Fully Scheduled by " + strategyName, cluster.getStatusMap().get(topology3.getId()));
 
         Map<SupervisorDetails, Double> superToCpu = getSupervisorToCpuUsage(cluster, topologies);
         Map<SupervisorDetails, Double> superToMem = getSupervisorToMemoryUsage(cluster, topologies);
@@ -595,7 +595,7 @@ public class TestResourceAwareScheduler {
             Double memUsed = superToMem.get(supervisor);
 
             assertTrue(supervisor.getId() + " MEM: "+ memAvailable + " == " + memUsed + " OR CPU: " + cpuAvailable + " == " + cpuUsed,
-                (Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON));
+                    (Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON));
         }
         // end of Test1
 
@@ -606,15 +606,15 @@ public class TestResourceAwareScheduler {
         rs.prepare(config1);
         rs.schedule(topologies, cluster);
         int numTopologiesAssigned = 0;
-        if (cluster.getStatusMap().get(topology1.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+        if (cluster.getStatusMap().get(topology1.getId()).equals("Running - Fully Scheduled by " + strategyName)) {
             LOG.info("TOPO 1 scheduled");
             numTopologiesAssigned++;
         }
-        if (cluster.getStatusMap().get(topology2.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+        if (cluster.getStatusMap().get(topology2.getId()).equals("Running - Fully Scheduled by " + strategyName)) {
             LOG.info("TOPO 2 scheduled");
             numTopologiesAssigned++;
         }
-        if (cluster.getStatusMap().get(topology4.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+        if (cluster.getStatusMap().get(topology4.getId()).equals("Running - Fully Scheduled by " + strategyName)) {
             LOG.info("TOPO 3 scheduled");
             numTopologiesAssigned++;
         }
@@ -641,6 +641,18 @@ public class TestResourceAwareScheduler {
     }
 
     @Test
+    public void testHeterogeneousClusterwithDefaultRas() {
+        testHeterogeneousCluster(defaultTopologyConf, DefaultResourceAwareStrategy.class.getSimpleName());
+    }
+
+    @Test
+    public void testHeterogeneousClusterwithGras() {
+        Config grasClusterConfig = (Config) defaultTopologyConf.clone();
+        grasClusterConfig.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, GenericResourceAwareStrategy.class.getName());
+        testHeterogeneousCluster(grasClusterConfig, GenericResourceAwareStrategy.class.getSimpleName());
+    }
+
+    @Test
     public void testTopologyWorkerMaxHeapSize() {
         // Test1: If RAS spreads executors across multiple workers based on the set limit for a worker used by the topology
         INimbus iNimbus = new INimbusTest();


[2/2] storm git commit: Merge branch 'YSTORM-4457-II' of https://github.com/govind-menon/storm into STORM-2872

Posted by bo...@apache.org.
Merge branch 'YSTORM-4457-II' of https://github.com/govind-menon/storm into STORM-2872

STORM-2872: Fix for wouldFit and rebalance as part of
GenericResourceAwareScheduling changes

This closes #2456


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2846417c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2846417c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2846417c

Branch: refs/heads/master
Commit: 2846417c8a2b2d990146063bc30aef5acbb6933c
Parents: 41d6cdc 4fd02ca
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Jan 2 15:19:07 2018 -0600
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Jan 2 15:19:07 2018 -0600

----------------------------------------------------------------------
 .../org/apache/storm/scheduler/Cluster.java     |  2 +-
 .../scheduler/resource/NormalizedResources.java | 26 ++++++------
 .../storm/scheduler/resource/ResourceUtils.java | 43 +++++++++++++-------
 .../java/org/apache/storm/TestRebalance.java    | 12 ++++--
 .../resource/TestResourceAwareScheduler.java    | 40 +++++++++++-------
 5 files changed, 78 insertions(+), 45 deletions(-)
----------------------------------------------------------------------