You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/14 14:23:46 UTC
[1/3] storm git commit: Improvements for Trident RAS API.
Repository: storm
Updated Branches:
refs/heads/1.x-branch 8b1deb977 -> 66f5f68aa
Improvements for Trident RAS API.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5cf73b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5cf73b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5cf73b9
Branch: refs/heads/1.x-branch
Commit: c5cf73b9078176482d273907c79ca8e9a1e85498
Parents: 9e9ec43
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Fri Jun 17 12:59:06 2016 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Aug 11 10:19:50 2016 -0500
----------------------------------------------------------------------
.../topology/BaseConfigurationDeclarer.java | 14 ++--
.../apache/storm/trident/TridentTopology.java | 82 ++++++++++++++------
.../org/apache/storm/trident/graph/Group.java | 74 ++++++++++++++----
.../operation/DefaultResourceDeclarer.java | 12 +--
.../org/apache/storm/trident/planner/Node.java | 9 ++-
.../storm/trident/planner/ProcessorNode.java | 5 ++
.../topology/TridentTopologyBuilder.java | 29 +++++--
.../apache/storm/trident/integration_test.clj | 3 +
8 files changed, 171 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java b/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
index 7c090a7..8b12574 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java
@@ -47,7 +47,7 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration
if(val!=null) val = val.intValue();
return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val);
}
-
+
@Override
public T setNumTasks(Number val) {
if (val != null) val = val.intValue();
@@ -56,16 +56,18 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration
@Override
public T setMemoryLoad(Number onHeap) {
- return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
+ if (onHeap != null) {
+ onHeap = onHeap.doubleValue();
+ return addConfiguration(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
+ }
+ return null;
}
@Override
public T setMemoryLoad(Number onHeap, Number offHeap) {
T ret = null;
- if (onHeap != null) {
- onHeap = onHeap.doubleValue();
- ret = addConfiguration(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
- }
+ ret = setMemoryLoad(onHeap);
+
if (offHeap!=null) {
offHeap = offHeap.doubleValue();
ret = addConfiguration(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
index 06e1576..e87b1d1 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -26,6 +26,7 @@ import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
@@ -43,6 +44,7 @@ import org.apache.storm.trident.fluent.IAggregatableStream;
import org.apache.storm.trident.fluent.UniqueIdGen;
import org.apache.storm.trident.graph.GraphGrouper;
import org.apache.storm.trident.graph.Group;
+import org.apache.storm.trident.operation.DefaultResourceDeclarer;
import org.apache.storm.trident.operation.GroupedMultiReducer;
import org.apache.storm.trident.operation.ITridentResource;
import org.apache.storm.trident.operation.MultiReducer;
@@ -76,7 +78,6 @@ import org.apache.storm.trident.util.ErrorEdgeFactory;
import org.apache.storm.trident.util.IndexedEdge;
import org.apache.storm.trident.util.TridentUtils;
-
// graph with 3 kinds of nodes:
// operation, partition, or spout
// all operations have finishBatch and can optionally be committers
@@ -84,10 +85,12 @@ public class TridentTopology {
//TODO: add a method for drpc stream, needs to know how to automatically do return results, etc
// is it too expensive to do a batch per drpc request?
-
+
final DefaultDirectedGraph<Node, IndexedEdge> _graph;
final Map<String, List<Node>> _colocate;
final UniqueIdGen _gen;
+ Map<String, Number> _resourceDefaults = new HashMap<>();
+ Map<String, Number> _masterCoordResources = new HashMap<>();
public TridentTopology() {
this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
@@ -272,8 +275,18 @@ public class TridentTopology {
groupedStreams(streams, joinFields),
new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
outFields);
- }
-
+ }
+
+ public TridentTopology setResourceDefaults(DefaultResourceDeclarer defaults) {
+ _resourceDefaults = defaults.getResources();
+ return this;
+ }
+
+ public TridentTopology setMasterCoordResources(DefaultResourceDeclarer resources) {
+ _masterCoordResources = resources.getResources();
+ return this;
+ }
+
public StormTopology build() {
DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
@@ -389,29 +402,28 @@ public class TridentTopology {
// System.out.println(graph);
Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
-
+
TridentTopologyBuilder builder = new TridentTopologyBuilder();
-
+
Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
Map<Group, String> boltIds = genBoltIds(mergedGroups);
-
- Map defaults = Utils.readDefaultConfig();
for(SpoutNode sn: spoutNodes) {
Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
- Map<String, Number> spoutRes = null;
- spoutRes = mergeDefaultResources(sn.getResources(), defaults);
+ Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
+ spoutRes.putAll(sn.getResources());
+
Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ SpoutDeclarer spoutDeclarer = null;
+
if(sn.type == SpoutNode.SpoutType.DRPC) {
- builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
- (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn))
- .setMemoryLoad(onHeap, offHeap)
- .setCPULoad(cpuLoad);
+ spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
+ (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
} else {
ITridentSpout s;
if(sn.spout instanceof IBatchSpout) {
@@ -422,9 +434,20 @@ public class TridentTopology {
throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
// TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
}
- builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn))
- .setMemoryLoad(onHeap, offHeap)
- .setCPULoad(cpuLoad);
+ spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
+ }
+
+ if(onHeap != null) {
+ if(offHeap != null) {
+ spoutDeclarer.setMemoryLoad(onHeap, offHeap);
+ }
+ else {
+ spoutDeclarer.setMemoryLoad(onHeap);
+ }
+ }
+
+ if(cpuLoad != null) {
+ spoutDeclarer.setCPULoad(cpuLoad);
}
}
@@ -432,16 +455,28 @@ public class TridentTopology {
if(!isSpoutGroup(g)) {
Integer p = parallelisms.get(g);
Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
- Map<String, Number> groupRes = mergeDefaultResources(g.getResources(), defaults);
+ Map<String, Number> groupRes = g.getResources(_resourceDefaults);
Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
- committerBatches(g, batchGroupMap), streamToGroup)
- .setMemoryLoad(onHeap, offHeap)
- .setCPULoad(cpuLoad);
+ committerBatches(g, batchGroupMap), streamToGroup);
+
+ if(onHeap != null) {
+ if(offHeap != null) {
+ d.setMemoryLoad(onHeap, offHeap);
+ }
+ else {
+ d.setMemoryLoad(onHeap);
+ }
+ }
+
+ if(cpuLoad != null) {
+ d.setCPULoad(cpuLoad);
+ }
+
Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
for(PartitionNode n: inputs) {
Node parent = TridentUtils.getParent(graph, n);
@@ -451,8 +486,9 @@ public class TridentTopology {
}
}
}
-
- return builder.buildTopology();
+ HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
+ combinedMasterCoordResources.putAll(_masterCoordResources);
+ return builder.buildTopology(combinedMasterCoordResources);
}
private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
index 2c92304..69deb0e 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/graph/Group.java
@@ -20,18 +20,17 @@ package org.apache.storm.trident.graph;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.jgrapht.DirectedGraph;
-import org.apache.storm.trident.operation.ITridentResource;
import org.apache.storm.trident.planner.Node;
import org.apache.storm.trident.util.IndexedEdge;
import org.apache.storm.trident.util.TridentUtils;
-
-public class Group implements ITridentResource {
+public class Group {
public final Set<Node> nodes = new HashSet<>();
private final DirectedGraph<Node, IndexedEdge> graph;
private final String id = UUID.randomUUID().toString();
@@ -67,21 +66,68 @@ public class Group implements ITridentResource {
return ret;
}
- @Override
- public Map<String, Number> getResources() {
- Map<String, Number> ret = new HashMap<>();
+ /**
+ * In case no resources are specified, returns empty map.
+ * In case differing types of resources are specified, throw.
+ * Otherwise, add all the resources for a group.
+ */
+ public Map<String, Number> getResources(Map<String, Number> defaults) {
+ if(defaults == null) {
+ defaults = new HashMap<>();
+ }
+
+ Map<String, Number> resources = null;
for(Node n: nodes) {
- Map<String, Number> res = n.getResources();
- for(Map.Entry<String, Number> kv : res.entrySet()) {
- String key = kv.getKey();
- Number val = kv.getValue();
- if(ret.containsKey(key)) {
- val = new Double(val.doubleValue() + ret.get(key).doubleValue());
+ if(resources == null) {
+ // After this, resources should contain all the kinds of resources
+ // we can count for the group. If we see a kind of resource in another
+ // node not in resources.keySet(), we'll throw.
+ resources = new HashMap<>(defaults);
+ resources.putAll(n.getResources());
+ }
+ else {
+ Map<String, Number> node_res = new HashMap<>(defaults);
+ node_res.putAll(n.getResources());
+
+ if(!node_res.keySet().equals(resources.keySet())) {
+ StringBuilder ops = new StringBuilder();
+
+ for(Node nod : nodes) {
+ Set<String> resource_keys = new HashSet<>(defaults.keySet());
+ resource_keys.addAll(nod.getResources().keySet());
+ ops.append("\t[ " + nod.shortString() + ", Resources Set: " + resource_keys + " ]\n");
+ }
+
+ if(node_res.keySet().containsAll(resources.keySet())) {
+ Set<String> diffset = new HashSet<>(node_res.keySet());
+ diffset.removeAll(resources.keySet());
+ throw new RuntimeException("Found an operation with resources set which are not set in other operations in the group:\n" +
+ "\t[ " + n.shortString() + " ]: " + diffset + "\n" +
+ "Either set these resources in all other operations in the group, add a default setting, or remove the setting from this operation.\n" +
+ "The group at fault:\n" +
+ ops);
+ }
+ else if(resources.keySet().containsAll(node_res.keySet())) {
+ Set<String> diffset = new HashSet<>(resources.keySet());
+ diffset.removeAll(node_res.keySet());
+ throw new RuntimeException("Found an operation with resources unset which are set in other operations in the group:\n" +
+ "\t[ " + n.shortString() + " ]: " + diffset + "\n" +
+ "Either set these resources in all other operations in the group, add a default setting, or remove the setting from all other operations.\n" +
+ "The group at fault:\n" +
+ ops);
+ }
+ }
+
+ for(Map.Entry<String, Number> kv : node_res.entrySet()) {
+ String key = kv.getKey();
+ Number val = kv.getValue();
+
+ Number newval = new Double(val.doubleValue() + resources.get(key).doubleValue());
+ resources.put(key, newval);
}
- ret.put(key, val);
}
}
- return ret;
+ return resources;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
index d49011a..0b4bdcd 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java
@@ -34,15 +34,17 @@ public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implemen
@Override
public T setMemoryLoad(Number onHeap) {
- return setMemoryLoad(onHeap, Utils.getDouble(conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)));
- }
-
- @Override
- public T setMemoryLoad(Number onHeap, Number offHeap) {
if (onHeap != null) {
onHeap = onHeap.doubleValue();
resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeap);
}
+ return (T)this;
+ }
+
+ @Override
+ public T setMemoryLoad(Number onHeap, Number offHeap) {
+ setMemoryLoad(onHeap);
+
if (offHeap!=null) {
offHeap = offHeap.doubleValue();
resources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, offHeap);
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
index b2466e6..c59b9a9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/Node.java
@@ -28,16 +28,15 @@ import org.apache.commons.lang.builder.ToStringStyle;
public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
private static final AtomicInteger INDEX = new AtomicInteger(0);
-
private String nodeId;
-
+
public String name = null;
public Fields allOutputFields;
public String streamId;
public Integer parallelismHint = null;
public NodeStateInfo stateInfo = null;
public int creationIndex;
-
+
public Node(String streamId, String name, Fields allOutputFields) {
this.nodeId = UUID.randomUUID().toString();
this.allOutputFields = allOutputFields;
@@ -63,4 +62,8 @@ public class Node extends DefaultResourceDeclarer<Node> implements Serializable
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
+
+ public String shortString() {
+ return "nodeId: " + nodeId + ", allOutputFields: " + allOutputFields;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java b/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
index 4aef341..49a9208 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/planner/ProcessorNode.java
@@ -30,4 +30,9 @@ public class ProcessorNode extends Node {
this.processor = processor;
this.selfOutFields = selfOutFields;
}
+
+ @Override
+ public String shortString() {
+ return super.shortString() + ", processor: " + processor + ", selfOutFields: " + selfOutFields;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
index 21f4377..1e366ab 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/TridentTopologyBuilder.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.trident.topology;
+import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.StormTopology;
@@ -129,7 +130,7 @@ public class TridentTopologyBuilder {
return ret;
}
- public StormTopology buildTopology() {
+ public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
TopologyBuilder builder = new TopologyBuilder();
Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
@@ -195,11 +196,27 @@ public class TridentTopologyBuilder {
d.addConfigurations(conf);
}
}
-
- for(Map.Entry<String, List<String>> entry: batchesToCommitIds.entrySet()) {
- String batch = entry.getKey();
- List<String> commitIds = entry.getValue();
- builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
+
+ Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+
+ for(String batch: batchesToCommitIds.keySet()) {
+ List<String> commitIds = batchesToCommitIds.get(batch);
+ SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
+
+ if(onHeap != null) {
+ if(offHeap != null) {
+ masterCoord.setMemoryLoad(onHeap, offHeap);
+ }
+ else {
+ masterCoord.setMemoryLoad(onHeap);
+ }
+ }
+
+ if(cpuLoad != null) {
+ masterCoord.setCPULoad(cpuLoad);
+ }
}
for(String id: _bolts.keySet()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c5cf73b9/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
index 7f81c4b..72cebd4 100644
--- a/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/trident/integration_test.clj
@@ -285,6 +285,9 @@
(. collector emit (str (. tuple getString 0) "!")))))
(bind word-counts
(.. topo
+ (setResourceDefaults (doto (org.apache.storm.trident.operation.DefaultResourceDeclarer.)
+ (.setMemoryLoad 0 0)
+ (.setCPULoad 0)))
(newStream "words" feeder)
(parallelismHint 5)
(setCPULoad 20)
[3/3] storm git commit: add STORM-1913 to CHANGELOG
Posted by ka...@apache.org.
add STORM-1913 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66f5f68a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66f5f68a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66f5f68a
Branch: refs/heads/1.x-branch
Commit: 66f5f68aa3a24a5f482ccea17f7eccd9d30d6953
Parents: 00dfe7f
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:23:20 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Aug 14 23:23:20 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/66f5f68a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b2d9cb..b190cc3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1913: Additions and Improvements for Trident RAS API
* STORM-2037: debug operation should be whitelisted in SimpleAclAuthorizer.
* STORM-2023: Add calcite-core to dependency of storm-sql-runtime
* STORM-2036: Fix minor bug in RAS Tests
[2/3] storm git commit: Merge branch '1.x-STORM-1913' of
https://github.com/knusbaum/incubator-storm into STORM-1913-1.x
Posted by ka...@apache.org.
Merge branch '1.x-STORM-1913' of https://github.com/knusbaum/incubator-storm into STORM-1913-1.x
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/00dfe7f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/00dfe7f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/00dfe7f8
Branch: refs/heads/1.x-branch
Commit: 00dfe7f859ee519f8ba95e4c50fc480b011c16fe
Parents: 8b1deb9 c5cf73b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:20:47 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Aug 14 23:20:47 2016 +0900
----------------------------------------------------------------------
.../topology/BaseConfigurationDeclarer.java | 14 ++--
.../apache/storm/trident/TridentTopology.java | 82 ++++++++++++++------
.../org/apache/storm/trident/graph/Group.java | 74 ++++++++++++++----
.../operation/DefaultResourceDeclarer.java | 12 +--
.../org/apache/storm/trident/planner/Node.java | 9 ++-
.../storm/trident/planner/ProcessorNode.java | 5 ++
.../topology/TridentTopologyBuilder.java | 29 +++++--
.../apache/storm/trident/integration_test.clj | 3 +
8 files changed, 171 insertions(+), 57 deletions(-)
----------------------------------------------------------------------