Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/11/14 15:29:12 UTC

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2419

    STORM-2805: Clean up confs in TopologyBuilders

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2805

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2419
    
----
commit dffe76eba24a903e64a7a62d2b3409ae8bf8b8ac
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-11-13T13:58:06Z

    STORM-2805: Clean up confs in TopologyBuilders

----


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r150948421
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -84,24 +84,24 @@ private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
                 p = parallelism.intValue();
             }
             Component component = new Component(bolt, p);
    -        _bolts.put(id, component);
    +        bolts.put(id, component);
             return new BoltDeclarerImpl(component);
         }
         
         public void extendTopology(TopologyBuilder builder) {
    -        BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
    -        for (InputDeclaration decl: _masterBolt.declarations) {
    +        BoltDeclarer declarer = builder.setBolt(masterId, new CoordinatedBolt(masterBolt.bolt), masterBolt.parallelism);
    +        for (InputDeclaration decl: masterBolt.declarations) {
                 decl.declare(declarer);
             }
    -        for (Map<String, Object> conf: _masterBolt.componentConfs) {
    -            declarer.addConfigurations(conf);
    +        if (!masterBolt.componentConf.isEmpty()) {
    --- End diff --
    
    Nit: addConfigurations appears to do this check too, so it seems unnecessary here.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151444469
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -446,31 +446,26 @@ public String getComponent() {
             }
             
             private void addDeclaration(InputDeclaration declaration) {
    -            _component.declarations.add(declaration);
    +            component.declarations.add(declaration);
             }
     
             @Override
             public BoltDeclarer addConfigurations(Map<String, Object> conf) {
    -            _component.componentConfs.add(conf);
    +            if (conf != null && !conf.isEmpty()) {
    +                component.componentConf.putAll(conf);
    +            }
                 return this;
             }
     
             @Override
    -        public Map getRASConfiguration() {
    -            for (Map<String, Object> conf : _component.componentConfs) {
    -                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    -                    return conf;
    -                }
    -            }
    -            Map<String, Object> newConf = new HashMap<>();
    -            newConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
    -            _component.componentConfs.add(newConf);
    -            return newConf;
    +        public Map<String, Object> getRASConfiguration() {
    +            //TODO this should not be modifiable
    --- End diff --
    
    Yes, thank you. I forgot to grep the patch for TODOs.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r150950456
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
    @@ -53,12 +53,12 @@
     // Trident subsumes the functionality provided by this class, so it's deprecated
     @Deprecated
    --- End diff --
    
    Not directly related to this PR, but this class has been deprecated for a while. Maybe we should consider removing some of the deprecated classes for 2.0.0?


---

[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2419
  
    Thanks, +1


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151697329
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java ---
    @@ -308,7 +308,7 @@ public String toString() {
                     ", _maxTransactionActive=" + _maxTransactionActive +
                     ", _coordinators=" + _coordinators +
                     ", _managedSpoutIds=" + _managedSpoutIds +
    -                ", _spouts=" + _spouts +
    +                ", spouts=" + _spouts +
    --- End diff --
    
    Do we want to get rid of the underscores on the other variables too? I wonder why only `spouts` was changed.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151444375
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -84,24 +84,24 @@ private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
                 p = parallelism.intValue();
             }
             Component component = new Component(bolt, p);
    -        _bolts.put(id, component);
    +        bolts.put(id, component);
             return new BoltDeclarerImpl(component);
         }
         
         public void extendTopology(TopologyBuilder builder) {
    -        BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
    -        for (InputDeclaration decl: _masterBolt.declarations) {
    +        BoltDeclarer declarer = builder.setBolt(masterId, new CoordinatedBolt(masterBolt.bolt), masterBolt.parallelism);
    +        for (InputDeclaration decl: masterBolt.declarations) {
                 decl.declare(declarer);
             }
    -        for (Map<String, Object> conf: _masterBolt.componentConfs) {
    -            declarer.addConfigurations(conf);
    +        if (!masterBolt.componentConf.isEmpty()) {
    --- End diff --
    
    It is probably unnecessary, and I will pull it out. I left it in initially for compatibility: why try to add an empty config, which would be a no-op?


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2419


---

[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2419
  
    Thanks, happy to help :)


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151700529
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
    @@ -187,16 +199,21 @@ public void testGenericResourceAwareStrategy() {
             rs.prepare(conf);
             rs.schedule(topologies, cluster);
     
    +        //We need to have 3 slots on 3 separate hosts to make the GPU situation work
    +
             HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
    -        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
    +        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3 - 500 MB, 50% CPU, 2 GPU
    --- End diff --
    
    This test was also wrong because it was not taking the GPUs into account. That is why I called out all of the resources in the comments for the scheduling. bolt-3 requires 2 GPUs. Each node has 2 GPUs on it. Having 2 instances of bolt-3 on the same node, which is what the scheduling was before, would go over the GPU count for the node.
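
The capacity argument above can be checked with a little arithmetic. The following is a minimal sketch; the class and method names are hypothetical and are not part of Storm's scheduler.

```java
// Hypothetical helper illustrating the GPU capacity reasoning; not Storm code.
final class GpuFitSketch {
    // True if `instances` executors, each needing `gpusEach` GPUs,
    // fit within a single node that offers `gpusPerNode` GPUs.
    static boolean fitsOnOneNode(int instances, double gpusEach, double gpusPerNode) {
        return instances * gpusEach <= gpusPerNode;
    }

    // Largest number of executors of this kind that one node can host.
    static int maxInstancesPerNode(double gpusEach, double gpusPerNode) {
        return (int) Math.floor(gpusPerNode / gpusEach);
    }
}
```

With the numbers from the comment (2 instances of bolt-3, 2 GPUs each, 2 GPUs per node), `fitsOnOneNode(2, 2.0, 2.0)` is false and `maxInstancesPerNode(2.0, 2.0)` is 1, so the two bolt-3 executors have to land on different nodes.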


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151699779
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java ---
    @@ -308,7 +308,7 @@ public String toString() {
                     ", _maxTransactionActive=" + _maxTransactionActive +
                     ", _coordinators=" + _coordinators +
                     ", _managedSpoutIds=" + _managedSpoutIds +
    -                ", _spouts=" + _spouts +
    +                ", spouts=" + _spouts +
    --- End diff --
    
    This was probably the IDE getting overly ambitious when I refactored something else.  I'll fix it.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151699527
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
    @@ -129,23 +130,34 @@ public void testGenericResourceAwareStrategySharedMemory() {
             double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
             
             SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
    -        assertEquals(1, assignment.getSlots().size());
    -        WorkerSlot ws = assignment.getSlots().iterator().next();
    -        String nodeId = ws.getNodeId();
    -        assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
    -        assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
    -        assertEquals(1, assignment.getScheduledResources().size());
    -        WorkerResources resources = assignment.getScheduledResources().get(ws);
    -        assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
    -        assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
    -        assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
    -        assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
    -        assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
    +        Set<WorkerSlot> slots = assignment.getSlots();
    +        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
    +        LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
    +        Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
    +        assertEquals(2, slots.size());
    --- End diff --
    
    I'll fix the comment.  The test was wrong because the GPU resources were not being recorded properly.  
    
    Each supervisor has a single GPU.  Each spout needs a GPU, but there are 2 spouts, so they cannot be on a single node, and hence cannot be on a single slot.  I can either up the number of GPUs per node to 2 and leave the rest of the test alone, or I can update the comment.


---

[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2419
  
    @srdo I think I have addressed all of your comments.  I also found a few places where an NPE could happen, and one where the TopologyBuilder was not setting the resource configs correctly.  I fixed those and the corresponding test failures.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151636460
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
    @@ -187,16 +199,21 @@ public void testGenericResourceAwareStrategy() {
             rs.prepare(conf);
             rs.schedule(topologies, cluster);
     
    +        //We need to have 3 slots on 3 separate hosts to make the GPU situation work
    +
             HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
    -        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
    +        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3 - 500 MB, 50% CPU, 2 GPU
    --- End diff --
    
    Why is bolt-3 now tasks 3 and 4 instead of 5 and 6 as before?


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151704811
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
    @@ -129,23 +130,34 @@ public void testGenericResourceAwareStrategySharedMemory() {
             double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
             
             SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
    -        assertEquals(1, assignment.getSlots().size());
    -        WorkerSlot ws = assignment.getSlots().iterator().next();
    -        String nodeId = ws.getNodeId();
    -        assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
    -        assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
    -        assertEquals(1, assignment.getScheduledResources().size());
    -        WorkerResources resources = assignment.getScheduledResources().get(ws);
    -        assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
    -        assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
    -        assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
    -        assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
    -        assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
    +        Set<WorkerSlot> slots = assignment.getSlots();
    +        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
    +        LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
    +        Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
    +        assertEquals(2, slots.size());
    --- End diff --
    
    Thanks for explaining.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151630914
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -562,43 +560,54 @@ private void initCommon(String id, IComponent component, Number parallelism) thr
             }
             Map<String, Object> conf = component.getComponentConfiguration();
             if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
    -        _commons.put(id, common);
    +        commons.put(id, common);
         }
     
         protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
    -        String _id;
    +        String id;
             
             public ConfigGetter(String id) {
    -            _id = id;
    +            this.id = id;
             }
             
             @SuppressWarnings("unchecked")
             @Override
             public T addConfigurations(Map<String, Object> conf) {
    -            if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
    -                throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
    +            if (conf != null) {
    +                if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
    +                    throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
    +                }
    +                if (!conf.isEmpty()) {
    +                    String currConf = commons.get(id).get_json_conf();
    +                    commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
    +                }
                 }
    -            String currConf = _commons.get(_id).get_json_conf();
    -            _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                 return (T) this;
             }
     
    -        @SuppressWarnings("unchecked")
             @Override
    -        public T addResource(String resourceName, Number resourceValue) {
    -            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().getOrDefault(
    -                    Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
    -
    -            resourcesMap.put(resourceName, resourceValue.doubleValue());
    -
    -            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    +        public T addResources(Map<String, Double> resources) {
    +            if (resources != null && !resources.isEmpty()) {
    +                String currConf = commons.get(id).get_json_conf();
    +                Map<String, Object> conf = parseJson(currConf);
    +                Map<String, Double> currentResources =
    +                    (Map<String, Double>) conf.computeIfAbsent(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
    +                currentResources.putAll(resources);
    +                commons.get(id).set_json_conf(JSONValue.toJSONString(currConf));
    --- End diff --
    
    Isn't this writing the original currConf string back to commons instead of the modified conf map?
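
A minimal sketch of the parse, modify, serialize round trip the comment is asking for. The parser and serializer are passed in as functions so the sketch does not depend on any JSON library; `RESOURCES_KEY` is a placeholder for `Config.TOPOLOGY_COMPONENT_RESOURCES_MAP`, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of the fix; not the actual TopologyBuilder code.
final class ResourceConfSketch {
    // Placeholder key; the patch uses Config.TOPOLOGY_COMPONENT_RESOURCES_MAP.
    static final String RESOURCES_KEY = "resources";

    // parse and serialize are injected stand-ins for the JSON helpers.
    @SuppressWarnings("unchecked")
    static String addResources(String currConf,
                               Map<String, Double> resources,
                               Function<String, Map<String, Object>> parse,
                               Function<Map<String, Object>, String> serialize) {
        if (resources == null || resources.isEmpty()) {
            return currConf; // nothing to merge
        }
        Map<String, Object> conf = parse.apply(currConf);
        Map<String, Double> current = (Map<String, Double>)
            conf.computeIfAbsent(RESOURCES_KEY, k -> new HashMap<String, Double>());
        current.putAll(resources);
        // Serialize the modified map, not the original currConf string.
        return serialize.apply(conf);
    }
}
```

The only behavioural difference from the quoted diff is the last statement: the merged `conf` map is what gets written back, so the added resources are not lost.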


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r150950610
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java ---
    @@ -51,8 +51,8 @@
     
     // based on transactional topologies
     public class TridentTopologyBuilder {
    -    Map<GlobalStreamId, String> _batchIds = new HashMap();
    -    Map<String, TransactionalSpoutComponent> _spouts = new HashMap();
    +    Map<GlobalStreamId, String> batchIds = new HashMap();
    --- End diff --
    
    Nit: Raw types here and in the following lines
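
For illustration, the diamond operator avoids the raw-type warning without repeating the type arguments. In this sketch the key type is simplified to `String` (the real field is keyed by `GlobalStreamId`), and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the fields flagged in the review; not Storm code.
final class BatchIdsSketch {
    // Diamond operator: type arguments are inferred from the declaration,
    // so no unchecked-conversion warning is produced.
    final Map<String, String> batchIds = new HashMap<>();

    Set<String> allBatches() {
        // Writing new HashSet(batchIds.values()) here would be a raw type.
        return new HashSet<>(batchIds.values());
    }
}
```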


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r150949260
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -446,31 +446,26 @@ public String getComponent() {
             }
             
             private void addDeclaration(InputDeclaration declaration) {
    -            _component.declarations.add(declaration);
    +            component.declarations.add(declaration);
             }
     
             @Override
             public BoltDeclarer addConfigurations(Map<String, Object> conf) {
    -            _component.componentConfs.add(conf);
    +            if (conf != null && !conf.isEmpty()) {
    +                component.componentConf.putAll(conf);
    +            }
                 return this;
             }
     
             @Override
    -        public Map getRASConfiguration() {
    -            for (Map<String, Object> conf : _component.componentConfs) {
    -                if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
    -                    return conf;
    -                }
    -            }
    -            Map<String, Object> newConf = new HashMap<>();
    -            newConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
    -            _component.componentConfs.add(newConf);
    -            return newConf;
    +        public Map<String, Object> getRASConfiguration() {
    +            //TODO this should not be modifiable
    --- End diff --
    
    Would be good to fix this TODO before merging, or at least remove the TODO and raise an issue. I think it'll be forgotten if it's left here.
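
One possible way to resolve the TODO is to hand out a read-only view of the map. This is a sketch under that assumption only; the thread does not say how the final patch handled it, and the class below is a hypothetical stand-in for the declarer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a declarer holding component configuration.
final class ReadOnlyConfSketch {
    private final Map<String, Object> componentConf = new HashMap<>();

    void put(String key, Object value) {
        componentConf.put(key, value);
    }

    // Returns a read-only view; callers that try to mutate it get
    // UnsupportedOperationException instead of silently changing state.
    Map<String, Object> getRASConfiguration() {
        return Collections.unmodifiableMap(componentConf);
    }
}
```

Note that `Collections.unmodifiableMap` is shallow: a nested map stored as a value can still be mutated through the view unless it is wrapped or copied as well.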


---

[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2419
  
    @srdo thanks for the review. You found a lot of bugs and made this a lot better.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151632384
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
    @@ -129,23 +130,34 @@ public void testGenericResourceAwareStrategySharedMemory() {
             double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
             
             SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
    -        assertEquals(1, assignment.getSlots().size());
    -        WorkerSlot ws = assignment.getSlots().iterator().next();
    -        String nodeId = ws.getNodeId();
    -        assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
    -        assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
    -        assertEquals(1, assignment.getScheduledResources().size());
    -        WorkerResources resources = assignment.getScheduledResources().get(ws);
    -        assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
    -        assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
    -        assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
    -        assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
    -        assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
    +        Set<WorkerSlot> slots = assignment.getSlots();
    +        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
    +        LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
    +        Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
    +        assertEquals(2, slots.size());
    --- End diff --
    
    The comment in line 126 says everything should fit in a single slot. Why does the assignment use two slots now?


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r150948564
  
    --- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
    @@ -446,31 +446,26 @@ public String getComponent() {
             }
             
             private void addDeclaration(InputDeclaration declaration) {
    -            _component.declarations.add(declaration);
    +            component.declarations.add(declaration);
             }
     
             @Override
             public BoltDeclarer addConfigurations(Map<String, Object> conf) {
    -            _component.componentConfs.add(conf);
    +            if (conf != null && !conf.isEmpty()) {
    --- End diff --
    
    Nit: I think putAll is a no-op when the conf is empty, so you probably don't need to check.
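
To illustrate the nit: `Map.putAll` with an empty argument leaves the target unchanged, so only the null check is needed (passing null to `HashMap.putAll` throws `NullPointerException`). The class below is a hypothetical stand-in, not Storm's `BoltDeclarerImpl`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a declarer that accumulates component configuration.
final class ConfHolder {
    private final Map<String, Object> componentConf = new HashMap<>();

    // Only null needs a guard: putAll with an empty map is already a no-op,
    // while HashMap.putAll(null) throws NullPointerException.
    ConfHolder addConfigurations(Map<String, Object> conf) {
        if (conf != null) {
            componentConf.putAll(conf);
        }
        return this;
    }

    // Defensive copy so callers cannot mutate internal state.
    Map<String, Object> snapshot() {
        return new HashMap<>(componentConf);
    }
}
```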


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151697850
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -562,43 +560,54 @@ private void initCommon(String id, IComponent component, Number parallelism) thr
             }
             Map<String, Object> conf = component.getComponentConfiguration();
             if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
    -        _commons.put(id, common);
    +        commons.put(id, common);
         }
     
         protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
    -        String _id;
    +        String id;
             
             public ConfigGetter(String id) {
    -            _id = id;
    +            this.id = id;
             }
             
             @SuppressWarnings("unchecked")
             @Override
             public T addConfigurations(Map<String, Object> conf) {
    -            if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
    -                throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
    +            if (conf != null) {
    +                if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
    +                    throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
    +                }
    +                if (!conf.isEmpty()) {
    +                    String currConf = commons.get(id).get_json_conf();
    +                    commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
    +                }
                 }
    -            String currConf = _commons.get(_id).get_json_conf();
    -            _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                 return (T) this;
             }
     
    -        @SuppressWarnings("unchecked")
             @Override
    -        public T addResource(String resourceName, Number resourceValue) {
    -            Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().getOrDefault(
    -                    Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
    -
    -            resourcesMap.put(resourceName, resourceValue.doubleValue());
    -
    -            getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
    +        public T addResources(Map<String, Double> resources) {
    +            if (resources != null && !resources.isEmpty()) {
    +                String currConf = commons.get(id).get_json_conf();
    +                Map<String, Object> conf = parseJson(currConf);
    +                Map<String, Double> currentResources =
    +                    (Map<String, Double>) conf.computeIfAbsent(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
    +                currentResources.putAll(resources);
    +                commons.get(id).set_json_conf(JSONValue.toJSONString(currConf));
    --- End diff --
    
    You are right.  Fixing it...


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151707538
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
    @@ -187,16 +199,21 @@ public void testGenericResourceAwareStrategy() {
             rs.prepare(conf);
             rs.schedule(topologies, cluster);
     
    +        //We need to have 3 slots on 3 separate hosts to make the GPU situation work
    +
             HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
    -        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
    +        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3 - 500 MB, 50% CPU, 2 GPU
    --- End diff --
    
    That makes sense. What I'm unsure about is what determines the task numbering. I don't understand why the numbering is spout - 0, bolt1 - 1, 2, bolt2 - 5, 6, bolt3 - 3, 4. I would have thought they'd be numbered in declaration order?
    
    `genExecsAndComps` enumerates the tasks by counting the parallelism hint for each bolt, but it iterates the bolts via a map entry set. I think iteration order for those is undefined except for specific map types, and it looks like TopologyBuilder is using a HashMap. That might make this test unstable, or at least inconsistent across JVMs?
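
The ordering concern can be shown with a small sketch. `numberTasks` below is a hypothetical simplification of what a helper like `genExecsAndComps` does (it is not the actual implementation): it assigns consecutive task ids in whatever order the input map iterates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of task enumeration; not Storm's test utility.
final class TaskNumberingSketch {
    // Assigns consecutive task ids to each component in the map's
    // iteration order. Returns component id -> list of task ids.
    static Map<String, List<Integer>> numberTasks(Map<String, Integer> parallelism) {
        Map<String, List<Integer>> tasks = new LinkedHashMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> e : parallelism.entrySet()) {
            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < e.getValue(); i++) {
                ids.add(next++);
            }
            tasks.put(e.getKey(), ids);
        }
        return tasks;
    }
}
```

If the input is a `LinkedHashMap` filled in declaration order (spout with parallelism 1, then bolt-1, bolt-2, bolt-3 with parallelism 2 each), the result is spout [0], bolt-1 [1, 2], bolt-2 [3, 4], bolt-3 [5, 6]. With a plain `HashMap` the iteration order depends on hashing and is not specified, so the numbering can come out differently.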


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r151444918
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
    @@ -53,12 +53,12 @@
     // Trident subsumes the functionality provided by this class, so it's deprecated
     @Deprecated
    --- End diff --
    
    I'll file some separate JIRAs to remove LinearDRPCTopologyBuilder and TransactionalTopologies, which have also been deprecated for a while.


---

[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2419
  
    @srdo I have fixed the issues you called out
    
    @Ethanlm I fixed the issue you saw.


---

[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2419#discussion_r150950920
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java ---
    @@ -104,8 +104,8 @@ public static String spoutIdFromCoordinatorId(String coordId) {
         }
         
         Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
    -        Map<GlobalStreamId, String> ret = new HashMap<>(_batchIds);
    -        Set<String> allBatches = new HashSet(_batchIds.values());
    +        Map<GlobalStreamId, String> ret = new HashMap<>(batchIds);
    +        Set<String> allBatches = new HashSet(batchIds.values());
    --- End diff --
    
    Also raw here.


---