You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/09/29 17:10:59 UTC

storm git commit: Merge branch '1.x-STORM-1913' of https://github.com/knusbaum/incubator-storm into STORM-1913-1.x

Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch c07befa9b -> df17f5445


Merge branch '1.x-STORM-1913' of https://github.com/knusbaum/incubator-storm into STORM-1913-1.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df17f544
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df17f544
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df17f544

Branch: refs/heads/1.0.x-branch
Commit: df17f5445ca89a9001589a829c89eccae9d076fd
Parents: c07befa
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:20:47 2016 +0900
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 29 12:08:10 2016 -0500

----------------------------------------------------------------------
 .../topology/BaseConfigurationDeclarer.java     | 14 ++--
 .../apache/storm/trident/TridentTopology.java   | 82 ++++++++++++++------
 .../org/apache/storm/trident/graph/Group.java   | 74 ++++++++++++++----
 .../operation/DefaultResourceDeclarer.java      | 12 +--
 .../org/apache/storm/trident/planner/Node.java  |  9 ++-
 .../storm/trident/planner/ProcessorNode.java    |  5 ++
 .../topology/TridentTopologyBuilder.java        | 29 +++++--
 .../apache/storm/trident/integration_test.clj   |  3 +
 8 files changed, 171 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
index 9f597b7..2604d1d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
@@ -46,7 +46,7 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration
         if(val!=null) val = val.intValue();
         return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val);
     }
-    
+
     @Override
     public T setNumTasks(Number val) {
         if (val != null) val = val.intValue();
@@ -55,16 +55,18 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration
 
     @Override
     public T setMemoryLoad(Number onHeap) {
-        return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+        if (onHeap != null) {
+            onHeap = onHeap.doubleValue();
+            return addConfiguration(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+        }
+        return null;
     }
 
     @Override
     public T setMemoryLoad(Number onHeap, Number offHeap) {
         T ret = null;
-        if (onHeap != null) {
-            onHeap = onHeap.doubleValue();
-            ret = addConfiguration(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
-        }
+        ret = setMemoryLoad(onHeap);
+
         if (offHeap!=null) {
             offHeap = offHeap.doubleValue();
             ret = addConfiguration(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index 06e1576..e87b1d1 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -26,6 +26,7 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.SpoutDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
 
@@ -43,6 +44,7 @@ import org.apache.storm.trident.fluent.IAggregatableStream;
 import org.apache.storm.trident.fluent.UniqueIdGen;
 import org.apache.storm.trident.graph.GraphGrouper;
 import org.apache.storm.trident.graph.Group;
+import org.apache.storm.trident.operation.DefaultResourceDeclarer;
 import org.apache.storm.trident.operation.GroupedMultiReducer;
 import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.operation.MultiReducer;
@@ -76,7 +78,6 @@ import org.apache.storm.trident.util.ErrorEdgeFactory;
 import org.apache.storm.trident.util.IndexedEdge;
 import org.apache.storm.trident.util.TridentUtils;
 
-
 // graph with 3 kinds of nodes:
 // operation, partition, or spout
 // all operations have finishBatch and can optionally be committers
@@ -84,10 +85,12 @@ public class TridentTopology {
     
     //TODO: add a method for drpc stream, needs to know how to automatically do return results, etc
     // is it too expensive to do a batch per drpc request?
-    
+
     final DefaultDirectedGraph<Node, IndexedEdge> _graph;
     final Map<String, List<Node>> _colocate;
     final UniqueIdGen _gen;
+    Map<String, Number> _resourceDefaults = new HashMap<>();
+    Map<String, Number> _masterCoordResources = new HashMap<>();
 
     public TridentTopology() {
         this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
@@ -272,8 +275,18 @@ public class TridentTopology {
               groupedStreams(streams, joinFields),
               new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
               outFields);
-    }    
-        
+    }
+
+    public TridentTopology setResourceDefaults(DefaultResourceDeclarer defaults) {
+        _resourceDefaults = defaults.getResources();
+        return this;
+    }
+
+    public TridentTopology setMasterCoordResources(DefaultResourceDeclarer resources) {
+        _masterCoordResources = resources.getResources();
+        return this;
+    }
+
     public StormTopology build() {
         DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
         
@@ -389,29 +402,28 @@ public class TridentTopology {
 //        System.out.println(graph);
         
         Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
-        
+
         TridentTopologyBuilder builder = new TridentTopologyBuilder();
-        
+
         Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
         Map<Group, String> boltIds = genBoltIds(mergedGroups);
-        
-        Map defaults = Utils.readDefaultConfig();
 
         for(SpoutNode sn: spoutNodes) {
             Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
 
-            Map<String, Number> spoutRes = null;
-            spoutRes = mergeDefaultResources(sn.getResources(), defaults);
+            Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
+            spoutRes.putAll(sn.getResources());
+
             Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
             Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
             Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
+            SpoutDeclarer spoutDeclarer = null;
+
             if(sn.type == SpoutNode.SpoutType.DRPC) {
 
-                builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
-                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn))
-                    .setMemoryLoad(onHeap, offHeap)
-                    .setCPULoad(cpuLoad);
+                spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
+                                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
             } else {
                 ITridentSpout s;
                 if(sn.spout instanceof IBatchSpout) {
@@ -422,9 +434,20 @@ public class TridentTopology {
                     throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                     // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                 }
-                builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn))
-                    .setMemoryLoad(onHeap, offHeap)
-                    .setCPULoad(cpuLoad);
+                spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
+            }
+
+            if(onHeap != null) {
+                if(offHeap != null) {
+                    spoutDeclarer.setMemoryLoad(onHeap, offHeap);
+                }
+                else {
+                    spoutDeclarer.setMemoryLoad(onHeap);
+                }
+            }
+
+            if(cpuLoad != null) {
+                spoutDeclarer.setCPULoad(cpuLoad);
             }
         }
 
@@ -432,16 +455,28 @@ public class TridentTopology {
             if(!isSpoutGroup(g)) {
                 Integer p = parallelisms.get(g);
                 Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
-                Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults);
+                Map<String, Number> groupRes = g.getResources(_resourceDefaults);
 
                 Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
                 Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
                 Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
                 BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
-                                                 committerBatches(g, batchGroupMap), streamToGroup)
-                    .setMemoryLoad(onHeap, offHeap)
-                    .setCPULoad(cpuLoad);
+                                                 committerBatches(g, batchGroupMap), streamToGroup);
+
+                if(onHeap != null) {
+                    if(offHeap != null) {
+                        d.setMemoryLoad(onHeap, offHeap);
+                    }
+                    else {
+                        d.setMemoryLoad(onHeap);
+                    }
+                }
+
+                if(cpuLoad != null) {
+                    d.setCPULoad(cpuLoad);
+                }
+
                 Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                 for(PartitionNode n: inputs) {
                     Node parent = TridentUtils.getParent(graph, n);
@@ -451,8 +486,9 @@ public class TridentTopology {
                 }
             }
         }
-        
-        return builder.buildTopology();
+        HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
+        combinedMasterCoordResources.putAll(_masterCoordResources);
+        return builder.buildTopology(combinedMasterCoordResources);
     }
 
     private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index 2c92304..69deb0e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -20,18 +20,17 @@ package org.apache.storm.trident.graph;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.jgrapht.DirectedGraph;
-import org.apache.storm.trident.operation.ITridentResource;
 import org.apache.storm.trident.planner.Node;
 import org.apache.storm.trident.util.IndexedEdge;
 import org.apache.storm.trident.util.TridentUtils;
 
-
-public class Group implements ITridentResource {
+public class Group {
     public final Set<Node> nodes = new HashSet<>();
     private final DirectedGraph<Node, IndexedEdge> graph;
     private final String id = UUID.randomUUID().toString();
@@ -67,21 +66,68 @@ public class Group implements ITridentResource {
         return ret;        
     }
 
-    @Override
-    public Map<String, Number> getResources() {
-        Map<String, Number> ret = new HashMap<>();
+    /**
+     * In case no resources are specified, returns empty map.
+     * In case differing types of resources are specified, throw.
+     * Otherwise, add all the resources for a group.
+     */
+    public Map<String, Number> getResources(Map<String, Number> defaults) {
+        if(defaults == null) {
+            defaults = new HashMap<>();
+        }
+
+        Map<String, Number> resources = null;
         for(Node n: nodes) {
-            Map<String, Number> res = n.getResources();
-            for(Map.Entry<String, Number> kv : res.entrySet()) {
-                String key = kv.getKey();
-                Number val = kv.getValue();
-                if(ret.containsKey(key)) {
-                    val = new Double(val.doubleValue() + ret.get(key).doubleValue());
+            if(resources == null) {
+                // After this, resources should contain all the kinds of resources
+                // we can count for the group. If we see a kind of resource in another
+                // node not in resources.keySet(), we'll throw.
+                resources = new HashMap<>(defaults);
+                resources.putAll(n.getResources());
+            }
+            else {
+                Map<String, Number> node_res = new HashMap<>(defaults);
+                node_res.putAll(n.getResources());
+                
+                if(!node_res.keySet().equals(resources.keySet())) {
+                    StringBuilder ops = new StringBuilder();
+                    
+                    for(Node nod : nodes) {
+                        Set<String> resource_keys = new HashSet<>(defaults.keySet());
+                        resource_keys.addAll(nod.getResources().keySet());
+                        ops.append("\t[ " + nod.shortString() + ", Resources Set: " + resource_keys + " ]\n");
+                    }
+
+                    if(node_res.keySet().containsAll(resources.keySet())) {
+                        Set<String> diffset = new HashSet<>(node_res.keySet());
+                        diffset.removeAll(resources.keySet());
+                        throw new RuntimeException("Found an operation with resources set which are not set in other operations in the group:\n" +
+                                                   "\t[ " + n.shortString() + " ]: " + diffset + "\n" +
+                                                   "Either set these resources in all other operations in the group, add a default setting, or remove the setting from this operation.\n" +
+                                                   "The group at fault:\n" +
+                                                   ops);
+                    }
+                    else if(resources.keySet().containsAll(node_res.keySet())) {
+                        Set<String> diffset = new HashSet<>(resources.keySet());
+                        diffset.removeAll(node_res.keySet());
+                        throw new RuntimeException("Found an operation with resources unset which are set in other operations in the group:\n" +
+                                                   "\t[ " + n.shortString() + " ]: " + diffset + "\n" +
+                                                   "Either set these resources in all other operations in the group, add a default setting, or remove the setting from all other operations.\n" +
+                                                   "The group at fault:\n" +
+                                                   ops);
+                    }
+                }
+
+                for(Map.Entry<String, Number> kv : node_res.entrySet()) {
+                    String key = kv.getKey();
+                    Number val = kv.getValue();
+
+                    Number newval = new Double(val.doubleValue() + resources.get(key).doubleValue());
+                    resources.put(key, newval);
                 }
-                ret.put(key, val);
             }
         }
-        return ret;
+        return resources;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
index d49011a..0b4bdcd 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -34,15 +34,17 @@ public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implemen
 
     @Override
     public T setMemoryLoad(Number onHeap) {
-        return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
-    }
-
-    @Override
-    public T setMemoryLoad(Number onHeap, Number offHeap) {
         if (onHeap != null) {
             onHeap = onHeap.doubleValue();
             resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
         }
+        return (T)this;
+    }
+
+    @Override
+    public T setMemoryLoad(Number onHeap, Number offHeap) {
+        setMemoryLoad(onHeap);
+
         if (offHeap!=null) {
             offHeap = offHeap.doubleValue();
             resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index b2466e6..c59b9a9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -28,16 +28,15 @@ import org.apache.commons.lang.builder.ToStringStyle;
 
 public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
     private static final AtomicInteger INDEX = new AtomicInteger(0);
-    
     private String nodeId;
-    
+
     public String name = null;
     public Fields allOutputFields;
     public String streamId;
     public Integer parallelismHint = null;
     public NodeStateInfo stateInfo = null;
     public int creationIndex;
-    
+
     public Node(String streamId, String name, Fields allOutputFields) {
         this.nodeId = UUID.randomUUID().toString();
         this.allOutputFields = allOutputFields;
@@ -63,4 +62,8 @@ public class Node extends DefaultResourceDeclarer<Node> implements Serializable
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
     }
+
+    public String shortString() {
+        return "nodeId: " + nodeId + ", allOutputFields: " + allOutputFields;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java b/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
index 4aef341..49a9208 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
@@ -30,4 +30,9 @@ public class ProcessorNode extends Node {
         this.processor = processor;
         this.selfOutFields = selfOutFields;
     }
+
+    @Override
+    public String shortString() {
+        return super.shortString() + ", processor: " + processor + ", selfOutFields: " + selfOutFields;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
index 21f4377..1e366ab 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.trident.topology;
 
+import org.apache.storm.Config;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.StormTopology;
@@ -129,7 +130,7 @@ public class TridentTopologyBuilder {
         return ret;
     }
     
-    public StormTopology buildTopology() {        
+    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
         TopologyBuilder builder = new TopologyBuilder();
         Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
         Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
@@ -195,11 +196,27 @@ public class TridentTopologyBuilder {
                 d.addConfigurations(conf);
             }
         }
-        
-        for(Map.Entry<String, List<String>> entry: batchesToCommitIds.entrySet()) {
-            String batch = entry.getKey();
-            List<String> commitIds = entry.getValue();
-            builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
+
+        Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+        for(String batch: batchesToCommitIds.keySet()) {
+            List<String> commitIds = batchesToCommitIds.get(batch);
+            SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
+
+            if(onHeap != null) {
+                if(offHeap != null) {
+                    masterCoord.setMemoryLoad(onHeap, offHeap);
+                }
+                else {
+                    masterCoord.setMemoryLoad(onHeap);
+                }
+            }
+
+            if(cpuLoad != null) {
+                masterCoord.setCPULoad(cpuLoad);
+            }
         }
                 
         for(String id: _bolts.keySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/df17f544/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 7f81c4b..72cebd4 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -285,6 +285,9 @@
                            (. collector emit (str (. tuple getString 0) "!")))))
         (bind word-counts
           (.. topo
+              (setResourceDefaults (doto (org.apache.storm.trident.operation.DefaultResourceDeclarer.)
+                                     (.setMemoryLoad 0 0)
+                                     (.setCPULoad 0)))
               (newStream "words" feeder)
               (parallelismHint 5)
               (setCPULoad 20)