You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sa...@apache.org on 2017/02/22 00:14:49 UTC

[1/3] storm git commit: [STORM-2367] Documentation for streams API

Repository: storm
Updated Branches:
  refs/heads/master b5053a622 -> 4986373c5


[STORM-2367] Documentation for streams API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7903f9d5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7903f9d5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7903f9d5

Branch: refs/heads/master
Commit: 7903f9d5e00341930e88e254e6355df03df3a16e
Parents: 9bcd566
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Feb 9 19:10:08 2017 +0530
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Tue Feb 21 21:43:07 2017 +0530

----------------------------------------------------------------------
 docs/Stream-API.md | 475 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 475 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7903f9d5/docs/Stream-API.md
----------------------------------------------------------------------
diff --git a/docs/Stream-API.md b/docs/Stream-API.md
new file mode 100644
index 0000000..f2e4cb6
--- /dev/null
+++ b/docs/Stream-API.md
@@ -0,0 +1,475 @@
+---
+title: Stream API Overview
+layout: documentation
+documentation: true
+---
+
+* [Concepts](#concepts)
+    * [Stream Builder](#streambuilder)
+    * [Value mapper](#valuemapper)
+* [Stream APIs](#streamapis)
+    * [Basic transformations](#basictransformations)
+        * [filter](#filter)
+        * [map](#map)
+        * [flatmap](#flatmap)
+    * [Windowing](#windowing)
+    * [Transformation to key-value pairs](#keyvaluepairs)
+        * [mapToPair](#mapflatmaptopair)
+        * [flatMapToPair](#mapflatmaptopair)
+    * [Aggregations](#aggregations)
+        * [aggregate](#aggregatereduce)
+        * [reduce](#aggregatereduce)
+        * [aggregateByKey](#aggregatereducebykey)
+        * [reduceByKey](#aggregatereducebykey)
+        * [groupByKey](#groupbykey)
+        * [countByKey](#countbykey)
+    * [Repartition](#repartition)
+    * [Output operations](#outputoperations)
+        * [print](#print)
+        * [peek](#peek)
+        * [forEach](#foreach)
+        * [to](#to)
+    * [Branch](#branching)
+    * [Joins](#joins)
+    * [State](#state)
+        * [updateStateByKey](#updatestatebykey)
+        * [stateQuery](#statequery)
+* [Guarantees](#guarantees)        
+* [Example](#example)
+
+Historically Storm provided Spout and Bolt apis for expressing streaming computations. Though these apis are fairly simple to use, 
+there are no reusable constructs for expressing common streaming operations like filtering, transformations, windowing, joins, 
+aggregations and so on.
+
+Stream APIs build on top of the Storm's spouts and bolts to provide a typed API for expressing streaming computations and supports functional style operations such as map-reduce. 
+
+# <a name="concepts"></a> Concepts 
+
+Conceptually a `Stream` can be thought of as a stream of messages flowing through a pipeline. A `Stream` may be generated by reading messages out of a source like spout, or by transforming other streams. For example,
+
+```java
+// imports
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+...
+
+StreamBuilder builder = new StreamBuilder();
+
+// a stream of sentences obtained from a source spout
+Stream<String> sentences = builder.newStream(new RandomSentenceSpout()).map(tuple -> tuple.getString(0));
+
+// a stream of words obtained by transforming (splitting) the stream of sentences
+Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
+
+// output operation that prints the words to console
+words.forEach(w -> System.out.println(w));
+```
+
+
+Most stream operations accept parameters that describe user-specified behavior typically via lambda expressions like `s -> Arrays.asList(s.split(" "))` as in the above example.
+
+A `Stream` supports two kinds of operations, 
+
+1. **Transformations** that produce another stream from the current stream  (like the `flatMap` operation in the example above) 
+1. **Output operations** that produce a result. (like the `forEach` operation in the example above).
+
+## <a name="streambuilder"></a> Stream Builder
+
+`StreamBuilder` provides the builder apis to create a new stream. Typically a spout forms the source of a stream.
+
+```java
+StreamBuilder builder = new StreamBuilder();
+Stream<Tuple> sentences = builder.newStream(new TestSentenceSpout());
+```
+
+The `StreamBuilder` tracks the overall pipeline of operations expressed via the Stream. One can then create the Storm topology
+via `build()` and submit it like a normal storm topology via `StormSubmitter`.
+
+```java
+StormSubmitter.submitTopologyWithProgressBar("test", new Config(), streamBuilder.build());
+```
+
+## <a name="valuemapper"></a> Value mapper
+
+Value mappers can be used to extract specific fields from the tuples emitted from a spout to produce a typed stream of values. Value mappers are passed as arguments to the `StreamBuilder.newStream`.
+
+```java
+StreamBuilder builder = new StreamBuilder();
+
+// extract the first field from the tuple to get a Stream<String> of sentences
+Stream<String> sentences = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0));
+```
+
+Storm provides strongly typed tuples via the `Pair` and Tuple classes (Tuple3 upto Tuple10). One can use a `TupleValueMapper` to produce a stream of typed tuples as shown below.
+
+```java
+// extract first three fields of the tuple emitted by the spout to produce a stream of typed tuples.
+Stream<Tuple3<String, Integer, Long>> stream = builder.newStream(new TestSpout(), TupleValueMappers.of(0, 1, 2));
+```
+
+# <a name="streamapis"></a> Stream APIs 
+
+Storm's streaming apis (defined in [Stream](../storm-core/src/jvm/org/apache/storm/streams/Stream.java) and [PairStream](../storm-core/src/jvm/org/apache/storm/streams/PairStream.java)) currently support a wide range of operations such as transformations, filters, windowing, aggregations, branching, joins, stateful, output and debugging operations.
+
+## <a name="basictransformations"></a> Basic transformations 
+
+### <a name="filter"></a> filter
+
+`filter` returns a stream consisting of the elements of the stream that matches the given `Predicate` (for which the predicate returns true). 
+
+```java
+Stream<String> logs = ...
+Stream<String> errors = logs.filter(line -> line.contains("ERROR"));
+```
+
+In the above example log lines with 'ERROR' are filtered into an error stream which can be then be further processed.
+
+### <a name="map"></a> map 
+
+`map` returns a stream consisting of the result of applying the given mapping function to the values of the stream.
+
+```java
+Stream<String> words = ...
+Stream<Integer> wordLengths = words.map(String::length);
+```
+
+The example generates a stream of word lengths from a stream of words by applying the String.length function on each value. Note that the type of the resultant stream of a map operation can be different from that of the original stream.  
+
+### <a name="flatmap"></a> flatMap
+
+`flatMap` returns a stream consisting of the results of replacing each value of the stream with the contents produced by applying the provided mapping function to each value. This is similar to map but each value can be mapped to 0 or more values.
+
+```java
+Stream<String> sentences = ...
+Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
+```
+
+
+In the above example, the lambda function splits each value in the stream to a list of words and the flatMap function generates a flattened stream of words out of it.
+
+## <a name="windowing"></a> Windowing
+
+A `window` operation produces a windowed stream consisting of the elements that fall within the window as specified by the window parameter. All the windowing options supported in the underlying windowed bolts are supported via the Stream apis.
+
+`Stream<T> windowedStream = stream.window(Window<?, ?> windowConfig);`
+
+The windowConfig parameter specifies the windowing config like sliding or tumbling windows based on time duration or event count.
+ 
+```java 
+// time based sliding window
+stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));
+
+// count based sliding window
+stream.window(SlidingWindows.of(Count.(10), Count.of(2)));
+
+// tumbling window
+stream.window(TumblingWindows.of(Duration.seconds(10));
+
+// specifying timestamp field for event time based processing and a late tuple stream.
+stream.window(TumblingWindows.of(Duration.seconds(10)
+                     .withTimestampField("ts")
+                     .withLateTupleStream("late_events"));
+```
+                     
+A windowing operation splits the continuous stream of values into subsets and is necessary for performing operations like Joins and Aggregations.
+                     
+## <a name="keyvaluepairs"></a> Transformation to key-value pairs
+                 
+### <a name="mapflatmaptopair"></a> mapToPair and flatMapToPair
+                 
+These operations transform a Stream of values into a stream of key-value pairs.
+
+```java                  
+Stream<Integer> integers = \u2026 // 1, 2, 3, 4, ... 
+PairStream<Integer, Integer> squares = integers.mapToPair(x -> Pair.of(x, x*x)); // (1, 1), (2, 4), (3, 9), (4, 16), ...
+```
+
+A key-value pair stream is required for operations like groupByKey, aggregateByKey, joins etc.
+
+## <a name="aggregations"></a> Aggregations
+
+Aggregate operations aggregate the values (or key-values) in a stream. Typically the aggregation operations are performed on a windowed stream where the aggregate results are emitted on each window activation.
+
+### <a name="aggregatereduce"></a> aggregate and reduce
+                  
+`aggregate` and `reduce` computes global aggregation i.e. the values across all partitions are forwarded to a single task for computing the aggregate. 
+
+```java
+Stream<Long> numbers = \u2026
+// aggregate the numbers and produce a stream of last 10 sec sums.
+Stream<Long> sums = numbers.window(TumblingWindows.of(Duration.seconds(10)).aggregate(new Sum());
+
+// the last 10 sec sums computed using reduce
+Stream<Long> sums = numbers.window(...).reduce((x, y) -> x + y);
+```
+
+`aggreagate` and `reduce` differs in the way in which the aggreate results are computed. 
+
+A `reduce` operation repeatedly applies the given reducer and reduces two values to a single value until there is only one value left. This may not be feasible or easy for all kinds of aggreagations (e.g. avg).
+ 
+An `aggregate` operation does a mutable reduction. A mutable reduction accumulates results into an accumulator as it processes the values.
+    
+The aggregation operations (aggregate and reduce) automatically does a local aggregation whenever possible before doing the network shuffle to minimize the amount of messages transmitted over the network. For example to compute sum, a per-partition partial sum is computed and only the partial sums are transferred over the network to the target bolt where the partial sums are merged to produce the final sum. A `CombinerAggregator` interface is used as the argument of `aggregate` to enable this.
+
+For example the `Sum` (passed as the argument of aggregate in the example above) can be implemented as a `CombinerAggregator` as follows.
+
+```java
+public class Sum implements CombinerAggregator<Long, Long, Long> {
+
+    // The initial value of the sum
+    @Override
+    public Long init() {
+        return 0L;
+    }
+
+    // Updates the sum by adding the value (this could be a partial sum)
+    @Override
+    public Long apply(Long aggregate, Long value) {
+        return aggregate + value;
+    }
+
+    // merges the partial sums
+    @Override
+    public Long merge(Long accum1, Long accum2) {
+        return accum1 + accum2;
+    }
+
+    // extract result from the accumulator (here the accumulator and result is the same)
+    @Override
+    public Long result(Long accum) {
+        return accum;
+    }
+}
+```
+
+### <a name="aggregatereducebykey"></a> aggregateByKey and reduceByKey
+
+These are similar to the aggregate and reduce operations but does the aggregation per key.
+
+`aggregateByKey` aggregates the values for each key of the stream using the given Aggregator.
+
+```java
+Stream<String> words = ...                                              // a windowed stream of words
+Stream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w,1)     // convert to a stream of (word, 1) pairs
+                                       .aggregateByKey(new Count<>());  // compute counts per word
+```
+                                       
+`reduceByKey` performs a reduction on the values for each key of this stream by repeatedly applying the reducer.
+
+```java
+Stream<String> words = ...                                              // a windowed stream of words
+Stream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w,1)     // convert to a stream of (word, 1) pairs
+                                       .reduceByKey((x, y) -> x + y);   // compute counts per word
+```
+
+
+Like the global aggregate/reduce, per-partition local aggregate (per key) is computed and the partial results are send to the target bolts where the partial results are merged to produce the final aggregate.
+
+### <a name="groupbykey"></a> groupByKey
+
+`groupByKey` on a stream of key-value pairs returns a new stream where the values are grouped by the keys.
+ 
+```java
+// a stream of (user, score) pairs e.g. ("alice", 10), ("bob", 15), ("bob", 20), ("alice", 11), ("alice", 13)
+PairStream<String, Double> scores = ... 
+
+// list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
+PairStream<String, Iterable<Integer>> userScores =  scores.window(...).groupByKey(); 
+```
+ 
+###  <a name="countbykey"></a> countByKey
+`countByKey` counts the values for each key of this stream.
+```java
+Stream<String> words = ...                                              // a windowed stream of words
+Stream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w,1)     // convert to a stream of (word, 1) pairs
+                                       .countByKey();                   // compute counts per word
+```
+
+Internally `countByKey` uses `aggregateByKey` to compute the count.
+
+## <a name="repartition"></a> Repartition 
+
+A `repartition` operation re-partitions the current stream and returns a new stream with the specified number of partitions. Further operations on resultant stream would execute at that level of parallelism. Re-partiton can be used to increase or reduce the parallelism of the operations in the stream.
+
+The initial number of partitions can be also specified while creating the stream (via the StreamBuilder.newStream)
+
+```java
+// Stream 's1' will have 2 partitions and operations on s1 will execute at this level of parallelism
+Stream<String> s1 = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2);
+
+// Stream 's2' and further operations will have three partitions
+Stream<String, Integer> s2 = s1.map(function1).repartition(3);
+
+// perform a map operation on s2 and print the result
+s2.map(function2).print();
+```          
+                  
+Note: a `repartition` operation implies network transfer. In the above example the first map operation (function1) would be executed at a parallelism of 2 (on two partitions of s1), whereas the second map operation (function2) would be executed at a parallelism of 3 (on three partitions of s2). This also means that the first and second map operations has to be executed on two separate bolts and involves network transfer.
+
+## <a name="outputoperations"></a> Output operations
+
+Output operations push out the transformed values in the stream to the console, external sinks like databases, files or even Storm bolts.
+ 
+### <a name="print"></a> print
+
+`print` prints the values in the stream to console. For example,
+
+```java
+// transforms words to uppercase and prints to the console
+words.map(String::toUpperCase).print();
+```
+
+### <a name="peek"></a> peek
+
+`peek` returns a stream consisting of the elements of the stream, additionally performing the provided action on each element as they are consumed from the resulting stream. This can be used to \u2018inspect\u2019 the values flowing at any stage in a stream.
+
+```java
+builder.newStream(...).flatMap(s -> Arrays.asList(s.split(" ")))
+       // print the results of the flatMap operation as the values flow across the stream.
+      .peek(s -> System.out.println(s))
+      .mapToPair(w -> new Pair<>(w, 1))
+```
+
+### <a name="foreach"></a> forEach
+
+This is the most generic output operation and can be used to execute an arbitrary code for each value in the stream, like storing the results into an external database, file and so on.
+
+```java
+stream.forEach(value -> {
+    // log it
+    LOG.debug(value)
+    // store the value into a db and so on...
+    statement.executeUpdate(..);
+  }
+);
+```
+
+### <a name="to"></a>  to 
+
+This allows one to plug in existing bolts as sinks. 
+
+```java
+// The redisBolt is a standard storm bolt
+IRichBolt redisBolt = new RedisStoreBolt(poolConfig, storeMapper);
+...
+// generate the word counts and store it in redis using redis bolt
+builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
+       .mapToPair(w -> Pair.of(w, 1))
+       .countByKey()
+       // the (word, count) pairs are forwarded to the redisBolt which stores it in redis
+       .to(redisBolt);
+```
+
+Note that this will provide guarantees only based on what the bolt provides.
+
+## <a name="branching"></a> Branch
+
+A `branch` operation can be used to express If-then-else logic on streams.
+
+```java
+Stream<T>[] streams  = stream.branch(Predicate<T>... predicates)
+```
+
+The predicates are applied in the given order to the values of the stream and the result is forwarded to the corresponding (index based) result stream based on the first predicate that matches. If none of the predicates match a value, that value is dropped.
+
+For example,
+```java
+Stream<Integer>[] streams = builder.newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0))
+                                   .branch(x -> (x % 2) == 0, 
+                                          x -> (x % 2) == 1);
+Stream<Integer> evenNumbers = streams[0];
+Stream<Integer> oddNumbers = streams[1];
+```
+
+## <a name="joins"></a> Joins
+
+A `join` operation joins the values of one stream with the values having the same key from another stream. 
+
+```java
+PairStream<Long, Long> squares = \u2026 // (1, 1), (2, 4), (3, 9) ...
+PairStream<Long, Long> cubes = \u2026 // (1, 1), (2, 8), (3, 27) ...
+
+// join the sqaures and cubes stream to produce (1, [1, 1]), (2, [4, 8]), (3, [9, 27]) ...
+PairStream<Long, Pair<Long, Long>> joined = squares.window(TumblingWindows.of(Duration.seconds(5))).join(cubes);
+
+```
+
+Joins are typically invoked on a windowed stream, joining the key-values that arrived on each stream in the current window. The parallelism of the stream on which the join is invoked is carried forward to the joined stream. An optional `ValueJoiner` can be passed as an argument to join to specify how to join the two values for each matching key (the default behavior is to return a `Pair` of the value from both streams).
+
+Left, right and full outer joins are supported. 
+
+
+## <a name="state"></a> State
+
+Storm provides APIs for applications to save and update the state of its computation and also to query the state.
+
+### <a name="updatestatebykey"></a> updateStateByKey
+
+`updateStateByKey` updates the state by applying a given state update function to the previous state and the new value for the key. `updateStateByKey` can be invoked with either an initial value for the state and a state update function or by directly providing a `StateUpdater` implementation.
+
+```java
+PairStream<String, Long> wordCounts = ...
+// Update the word counts in the state; here the first argument 0L is the initial value for the state and 
+// the second argument is a function that adds the count to the current value in the state.
+StreamState<String, Long> streamState = wordCounts.updateStateByKey(0L, (state, count) -> state + count)
+streamState.toPairStream().print();
+```
+
+The state value can be of any type. In the above example its of type `Long` and stores the word count. 
+
+Internally storm uses stateful bolts for storing the state. The Storm config `topology.state.provider` can be used to choose the state provider implementation. For example set this to `org.apache.storm.redis.state.RedisKeyValueStateProvider` for redis based state store.
+
+### <a name="statequery"></a> stateQuery
+
+`stateQuery` can be used to query the state (updated by `updateStateByKey`). The `StreamState` returned by the updateStateByKey operation has to be used for querying stream state. The values in the stream are used as the keys to query the state.
+
+```java
+
+// The stream of words emitted by the QuerySpout is used as the keys to query the state.
+builder.newStream(new QuerySpout(), new ValueMapper<String>(0))
+// Queries the state and emits the matching (key, value) as results. 
+// The stream state returned by updateStateByKey is passed as the argument to stateQuery.
+.stateQuery(streamState).print();
+```
+
+# <a name="guarantees"></a> Guarantees
+
+Right now the topologies built using Stream API provides **at-least once** guarantee. 
+
+Note that only the `updateStateByKey` operation currently executes on an underlying StatefulBolt. The other stateful operations (join, windowing, aggregation etc) executes on an IRichBolt and stores its state in memory. It relies on storms acking and replay mechanisms to rebuild the state.
+ 
+In future the underlying framework of the Stream API would be enhanced to provide **exactly once** guarantees.
+
+# <a name="example"></a> Example
+
+Here's a word count topology expressed using the Stream API,
+
+```java
+StreamBuilder builder = new StreamBuilder();
+
+builder
+   // A stream of random sentences with two partitions
+   .newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
+   // a two seconds tumbling window
+   .window(TumblingWindows.of(Duration.seconds(2)))
+   // split the sentences to words
+   .flatMap(s -> Arrays.asList(s.split(" ")))
+   // create a stream of (word, 1) pairs
+   .mapToPair(w -> Pair.of(w, 1))
+   // compute the word counts in the last two second window
+   .countByKey()
+   // print the results to stdout
+   .print();
+```
+
+The `RandomSentenceSpout` is a regular Storm spout that continuously emits random sentences. The stream of sentences are split into two second windows and the word count within each window is computed and printed.
+
+The stream can then be submitted just like a regular topology as shown below.
+
+```java
+  Config config = new Config();
+  config.setNumWorkers(1);
+  StormSubmitter.submitTopologyWithProgressBar("topology-name", config, builder.build());
+```
+
+More examples are available under [storm-starter](../examples/storm-starter/src/jvm/org/apache/storm/starter/streams) which will help you get started.
\ No newline at end of file


[2/3] storm git commit: Merge branch 'STORM-2367' of https://github.com/arunmahadevan/storm into streams-doc

Posted by sa...@apache.org.
Merge branch 'STORM-2367' of https://github.com/arunmahadevan/storm into streams-doc


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a76bbd70
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a76bbd70
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a76bbd70

Branch: refs/heads/master
Commit: a76bbd703c2e29a76ec41f904ea8b02cedc293a7
Parents: b5053a6 7903f9d
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Feb 22 05:31:47 2017 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Feb 22 05:31:47 2017 +0530

----------------------------------------------------------------------
 docs/Stream-API.md | 475 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 475 insertions(+)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-2367 to CHANGELOG.md

Posted by sa...@apache.org.
Added STORM-2367 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4986373c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4986373c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4986373c

Branch: refs/heads/master
Commit: 4986373c5ca8d8e3d6903dd479c4442451c49655
Parents: a76bbd7
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Wed Feb 22 05:43:55 2017 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Wed Feb 22 05:43:55 2017 +0530

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4986373c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a6ca131..b6840a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 \ufeff## 2.0.0
+ * STORM-2367: Documentation for streams API
  * STORM-2365: Support for specifying output stream in event hubs spout
  * STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability
  * STORM-2346: Files with unapproved licenses: download-rc-directory.sh verify-release-file.sh