You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/11/14 15:29:12 UTC
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
GitHub user revans2 opened a pull request:
https://github.com/apache/storm/pull/2419
STORM-2805: Clean up confs in TopologyBuilders
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/revans2/incubator-storm STORM-2805
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2419.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2419
----
commit dffe76eba24a903e64a7a62d2b3409ae8bf8b8ac
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date: 2017-11-13T13:58:06Z
STORM-2805: Clean up confs in TopologyBuilders
----
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r150948421
--- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
@@ -84,24 +84,24 @@ private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
p = parallelism.intValue();
}
Component component = new Component(bolt, p);
- _bolts.put(id, component);
+ bolts.put(id, component);
return new BoltDeclarerImpl(component);
}
public void extendTopology(TopologyBuilder builder) {
- BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
- for (InputDeclaration decl: _masterBolt.declarations) {
+ BoltDeclarer declarer = builder.setBolt(masterId, new CoordinatedBolt(masterBolt.bolt), masterBolt.parallelism);
+ for (InputDeclaration decl: masterBolt.declarations) {
decl.declare(declarer);
}
- for (Map<String, Object> conf: _masterBolt.componentConfs) {
- declarer.addConfigurations(conf);
+ if (!masterBolt.componentConf.isEmpty()) {
--- End diff --
Nit: addConfigurations seems like it does this check too, so it seems unnecessary here
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151444469
--- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
@@ -446,31 +446,26 @@ public String getComponent() {
}
private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
+ component.declarations.add(declaration);
}
@Override
public BoltDeclarer addConfigurations(Map<String, Object> conf) {
- _component.componentConfs.add(conf);
+ if (conf != null && !conf.isEmpty()) {
+ component.componentConf.putAll(conf);
+ }
return this;
}
@Override
- public Map getRASConfiguration() {
- for (Map<String, Object> conf : _component.componentConfs) {
- if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
- return conf;
- }
- }
- Map<String, Object> newConf = new HashMap<>();
- newConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
- _component.componentConfs.add(newConf);
- return newConf;
+ public Map<String, Object> getRASConfiguration() {
+ //TODO this should not be modifiable
--- End diff --
Yes thank you I forgot to grep the patch for TODOs.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r150950456
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
@@ -53,12 +53,12 @@
// Trident subsumes the functionality provided by this class, so it's deprecated
@Deprecated
--- End diff --
Not directly related to this PR, but this class has been deprecated for a while. Maybe we should consider removing some of the deprecated classes for 2.0.0?
---
[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:
https://github.com/apache/storm/pull/2419
Thanks, +1
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151697329
--- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java ---
@@ -308,7 +308,7 @@ public String toString() {
", _maxTransactionActive=" + _maxTransactionActive +
", _coordinators=" + _coordinators +
", _managedSpoutIds=" + _managedSpoutIds +
- ", _spouts=" + _spouts +
+ ", spouts=" + _spouts +
--- End diff --
Do we want to get rid of the underscores of other variables? I wonder why only `spouts`
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151444375
--- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
@@ -84,24 +84,24 @@ private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
p = parallelism.intValue();
}
Component component = new Component(bolt, p);
- _bolts.put(id, component);
+ bolts.put(id, component);
return new BoltDeclarerImpl(component);
}
public void extendTopology(TopologyBuilder builder) {
- BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
- for (InputDeclaration decl: _masterBolt.declarations) {
+ BoltDeclarer declarer = builder.setBolt(masterId, new CoordinatedBolt(masterBolt.bolt), masterBolt.parallelism);
+ for (InputDeclaration decl: masterBolt.declarations) {
decl.declare(declarer);
}
- for (Map<String, Object> conf: _masterBolt.componentConfs) {
- declarer.addConfigurations(conf);
+ if (!masterBolt.componentConf.isEmpty()) {
--- End diff --
It is probably unnecessary and I will pull it out. I left it in initially for compatibility, why try to add an empty config, which will be a no-op.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/storm/pull/2419
---
[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:
https://github.com/apache/storm/pull/2419
Thanks, happy to help :)
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151700529
--- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
@@ -187,16 +199,21 @@ public void testGenericResourceAwareStrategy() {
rs.prepare(conf);
rs.schedule(topologies, cluster);
+ //We need to have 3 slots on 3 separate hosts to make the GPU situation work
+
HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
- expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
+ expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3 - 500 MB, 50% CPU, 2 GPU
--- End diff --
This test was also wrong because it was not taking the GPUs into account. That is why I called out all of the resources in the comments for the scheduling. bolt-3 requires 2 GPUs. Each node has 2 GPUs on it. Having 2 instances of a bolt-3 on the same node, which is what the scheduling was before, would go over the GPU count for the node.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151699779
--- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java ---
@@ -308,7 +308,7 @@ public String toString() {
", _maxTransactionActive=" + _maxTransactionActive +
", _coordinators=" + _coordinators +
", _managedSpoutIds=" + _managedSpoutIds +
- ", _spouts=" + _spouts +
+ ", spouts=" + _spouts +
--- End diff --
This was probably the IDE getting overly ambitious when I refactored something else. I'll fix it.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151699527
--- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
@@ -129,23 +130,34 @@ public void testGenericResourceAwareStrategySharedMemory() {
double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
- assertEquals(1, assignment.getSlots().size());
- WorkerSlot ws = assignment.getSlots().iterator().next();
- String nodeId = ws.getNodeId();
- assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
- assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
- assertEquals(1, assignment.getScheduledResources().size());
- WorkerResources resources = assignment.getScheduledResources().get(ws);
- assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
- assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
- assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
- assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
- assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+ Set<WorkerSlot> slots = assignment.getSlots();
+ Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
+ Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
+ assertEquals(2, slots.size());
--- End diff --
I'll fix the comment. The test was wrong because the GPU resources were not being recorded properly.
Each supervisor has a single GPU. Each spout needs a GPU, but there are 2 spouts, so it cannot be on a single node, and hence cannot be on a single slot. I can either up the number of CPUs per node to 2 and leave the rest of the test alone, or I can update the comment.
---
[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2419
@srdo I think I have addressed all of your comments. I also found a few places were an NPE could happen, and one where the TopologyBuilder was not setting the resource configs correctly. I fixed it and the corresponding test failures.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151636460
--- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
@@ -187,16 +199,21 @@ public void testGenericResourceAwareStrategy() {
rs.prepare(conf);
rs.schedule(topologies, cluster);
+ //We need to have 3 slots on 3 separate hosts to make the GPU situation work
+
HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
- expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
+ expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3 - 500 MB, 50% CPU, 2 GPU
--- End diff --
Why is bolt 3 task 3 and 4 instead of 5 and 6 as before?
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151704811
--- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
@@ -129,23 +130,34 @@ public void testGenericResourceAwareStrategySharedMemory() {
double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
- assertEquals(1, assignment.getSlots().size());
- WorkerSlot ws = assignment.getSlots().iterator().next();
- String nodeId = ws.getNodeId();
- assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
- assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
- assertEquals(1, assignment.getScheduledResources().size());
- WorkerResources resources = assignment.getScheduledResources().get(ws);
- assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
- assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
- assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
- assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
- assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+ Set<WorkerSlot> slots = assignment.getSlots();
+ Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
+ Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
+ assertEquals(2, slots.size());
--- End diff --
Thanks for explaining.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151630914
--- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
@@ -562,43 +560,54 @@ private void initCommon(String id, IComponent component, Number parallelism) thr
}
Map<String, Object> conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
- _commons.put(id, common);
+ commons.put(id, common);
}
protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
- String _id;
+ String id;
public ConfigGetter(String id) {
- _id = id;
+ this.id = id;
}
@SuppressWarnings("unchecked")
@Override
public T addConfigurations(Map<String, Object> conf) {
- if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
- throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
+ if (conf != null) {
+ if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
+ throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
+ }
+ if (!conf.isEmpty()) {
+ String currConf = commons.get(id).get_json_conf();
+ commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
+ }
}
- String currConf = _commons.get(_id).get_json_conf();
- _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
return (T) this;
}
- @SuppressWarnings("unchecked")
@Override
- public T addResource(String resourceName, Number resourceValue) {
- Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().getOrDefault(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
-
- resourcesMap.put(resourceName, resourceValue.doubleValue());
-
- getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
+ public T addResources(Map<String, Double> resources) {
+ if (resources != null && !resources.isEmpty()) {
+ String currConf = commons.get(id).get_json_conf();
+ Map<String, Object> conf = parseJson(currConf);
+ Map<String, Double> currentResources =
+ (Map<String, Double>) conf.computeIfAbsent(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+ currentResources.putAll(resources);
+ commons.get(id).set_json_conf(JSONValue.toJSONString(currConf));
--- End diff --
Isn't this writing the original currConf string back to commons instead of the modified conf map?
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r150950610
--- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java ---
@@ -51,8 +51,8 @@
// based on transactional topologies
public class TridentTopologyBuilder {
- Map<GlobalStreamId, String> _batchIds = new HashMap();
- Map<String, TransactionalSpoutComponent> _spouts = new HashMap();
+ Map<GlobalStreamId, String> batchIds = new HashMap();
--- End diff --
Nit: Raw types here and in the following lines
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r150949260
--- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
@@ -446,31 +446,26 @@ public String getComponent() {
}
private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
+ component.declarations.add(declaration);
}
@Override
public BoltDeclarer addConfigurations(Map<String, Object> conf) {
- _component.componentConfs.add(conf);
+ if (conf != null && !conf.isEmpty()) {
+ component.componentConf.putAll(conf);
+ }
return this;
}
@Override
- public Map getRASConfiguration() {
- for (Map<String, Object> conf : _component.componentConfs) {
- if (conf.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
- return conf;
- }
- }
- Map<String, Object> newConf = new HashMap<>();
- newConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
- _component.componentConfs.add(newConf);
- return newConf;
+ public Map<String, Object> getRASConfiguration() {
+ //TODO this should not be modifiable
--- End diff --
Would be good to fix this TODO before merging, or at least remove the TODO and raise an issue. I think it'll be forgotten if it's left here.
---
[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2419
@srdo thanks for the review you found a lot of bugs and made this a lot better.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151632384
--- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
@@ -129,23 +130,34 @@ public void testGenericResourceAwareStrategySharedMemory() {
double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
- assertEquals(1, assignment.getSlots().size());
- WorkerSlot ws = assignment.getSlots().iterator().next();
- String nodeId = ws.getNodeId();
- assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
- assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
- assertEquals(1, assignment.getScheduledResources().size());
- WorkerResources resources = assignment.getScheduledResources().get(ws);
- assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
- assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
- assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
- assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
- assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+ Set<WorkerSlot> slots = assignment.getSlots();
+ Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
+ Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
+ assertEquals(2, slots.size());
--- End diff --
The comment in line 126 says everything should fit in a single slot. Why does the assignment use two slots now?
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r150948564
--- Diff: storm-client/src/jvm/org/apache/storm/coordination/BatchSubtopologyBuilder.java ---
@@ -446,31 +446,26 @@ public String getComponent() {
}
private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
+ component.declarations.add(declaration);
}
@Override
public BoltDeclarer addConfigurations(Map<String, Object> conf) {
- _component.componentConfs.add(conf);
+ if (conf != null && !conf.isEmpty()) {
--- End diff --
Nit: I think putAll is a no-op when the conf is empty, so you probably don't need to check.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151697850
--- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
@@ -562,43 +560,54 @@ private void initCommon(String id, IComponent component, Number parallelism) thr
}
Map<String, Object> conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
- _commons.put(id, common);
+ commons.put(id, common);
}
protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
- String _id;
+ String id;
public ConfigGetter(String id) {
- _id = id;
+ this.id = id;
}
@SuppressWarnings("unchecked")
@Override
public T addConfigurations(Map<String, Object> conf) {
- if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
- throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
+ if (conf != null) {
+ if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
+ throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
+ }
+ if (!conf.isEmpty()) {
+ String currConf = commons.get(id).get_json_conf();
+ commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
+ }
}
- String currConf = _commons.get(_id).get_json_conf();
- _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
return (T) this;
}
- @SuppressWarnings("unchecked")
@Override
- public T addResource(String resourceName, Number resourceValue) {
- Map<String, Double> resourcesMap = (Map<String, Double>) getRASConfiguration().getOrDefault(
- Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap());
-
- resourcesMap.put(resourceName, resourceValue.doubleValue());
-
- getRASConfiguration().put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
+ public T addResources(Map<String, Double> resources) {
+ if (resources != null && !resources.isEmpty()) {
+ String currConf = commons.get(id).get_json_conf();
+ Map<String, Object> conf = parseJson(currConf);
+ Map<String, Double> currentResources =
+ (Map<String, Double>) conf.computeIfAbsent(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+ currentResources.putAll(resources);
+ commons.get(id).set_json_conf(JSONValue.toJSONString(currConf));
--- End diff --
You are right. Fixing it...
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151707538
--- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java ---
@@ -187,16 +199,21 @@ public void testGenericResourceAwareStrategy() {
rs.prepare(conf);
rs.schedule(topologies, cluster);
+ //We need to have 3 slots on 3 separate hosts to make the GPU situation work
+
HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
- expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(0, 0)))); //Spout
+ expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3 - 500 MB, 50% CPU, 2 GPU
--- End diff --
That makes sense. What I'm unsure about is what determines the task numbering, I'm not understanding why the numbering is spout - 0, bolt1 - 1, 2, bolt2 - 5, 6, bolt3 - 3, 4. I would have thought they'd be numbered in declaration order?
`genExecsAndComps` enumerates the tasks by counting the parallelism hint for each bolt but it iterates the bolts via a map entryset. I think iteration order for those is undefined except for specific map types, and it looks like TopologyBuilder is using a HashMap. It might make this test unstable, or at least be inconsistent for different JVMs?
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r151444918
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
@@ -53,12 +53,12 @@
// Trident subsumes the functionality provided by this class, so it's deprecated
@Deprecated
--- End diff --
I'll file some separate JIRAs to remove LinerarDRPCTopologyBuilder and TransactionalTopologies that too has been deprecated for a while too.
---
[GitHub] storm issue #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2419
@srdo I have fixed the issues you called out
@Ethanlm I fixed the issue you saw.
---
[GitHub] storm pull request #2419: STORM-2805: Clean up confs in TopologyBuilders
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2419#discussion_r150950920
--- Diff: storm-client/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java ---
@@ -104,8 +104,8 @@ public static String spoutIdFromCoordinatorId(String coordId) {
}
Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
- Map<GlobalStreamId, String> ret = new HashMap<>(_batchIds);
- Set<String> allBatches = new HashSet(_batchIds.values());
+ Map<GlobalStreamId, String> ret = new HashMap<>(batchIds);
+ Set<String> allBatches = new HashSet(batchIds.values());
--- End diff --
Also raw here.
---