Posted to commits@storm.apache.org by sr...@apache.org on 2018/05/12 15:58:51 UTC

[48/51] [abbrv] [partial] storm-site git commit: Publish up to date 2.0.0-SNAPSHOT documentation

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/State-checkpointing.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/State-checkpointing.md b/releases/2.0.0-SNAPSHOT/State-checkpointing.md
index 3d6212c..42af3a7 100644
--- a/releases/2.0.0-SNAPSHOT/State-checkpointing.md
+++ b/releases/2.0.0-SNAPSHOT/State-checkpointing.md
@@ -22,55 +22,98 @@ For example a word count bolt could use the key value state abstraction for the
 last committed by the framework during the previous run.
 3. In the execute method, update the word count.
 
- ```java
- public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
- private KeyValueState<String, Long> wordCounts;
- private OutputCollector collector;
- ...
-     @Override
-     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-       this.collector = collector;
-     }
-     @Override
-     public void initState(KeyValueState<String, Long> state) {
-       wordCounts = state;
-     }
-     @Override
-     public void execute(Tuple tuple) {
-       String word = tuple.getString(0);
-       Integer count = wordCounts.get(word, 0);
-       count++;
-       wordCounts.put(word, count);
-       collector.emit(tuple, new Values(word, count));
-       collector.ack(tuple);
-     }
- ...
- }
- ```
+```java
+public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
+    private KeyValueState<String, Long> wordCounts;
+    private OutputCollector collector;
+    ...
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void initState(KeyValueState<String, Long> state) {
+        wordCounts = state;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        String word = tuple.getString(0);
+        Long count = wordCounts.get(word, 0L);
+        count++;
+        wordCounts.put(word, count);
+        collector.emit(tuple, new Values(word, count));
+        collector.ack(tuple);
+    }
+    ...
+}
+```
+
 4. The framework periodically checkpoints the state of the bolt (default every second). The frequency
 can be changed by setting the storm config `topology.state.checkpoint.interval.ms`
 5. For state persistence, use a state provider that supports persistence by setting the `topology.state.provider` in the
 storm config. E.g. for using Redis based key-value state implementation set `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider`
-in storm.yaml. The provider implementation jar should be in the class path, which in this case means putting the `storm-redis-*.jar`
-in the extlib directory.
+in storm.yaml. The provider implementation jar should be in the class path, which in this case means adding `storm-redis`
+as a dependency of your topology, or adding `--artifacts "org.apache.storm:storm-redis:<storm-version>"` when submitting your topology with `storm jar`.
 6. The state provider properties can be overridden by setting `topology.state.provider.config`. For Redis state this is a
 json config with the following properties.
 
- ```
+```
+{
+  "keyClass": "Optional fully qualified class name of the Key type.",
+  "valueClass": "Optional fully qualified class name of the Value type.",
+  "keySerializerClass": "Optional Key serializer implementation class.",
+  "valueSerializerClass": "Optional Value Serializer implementation class.",
+  "jedisPoolConfig": {
+    "host": "localhost",
+    "port": 6379,
+    "timeout": 2000,
+    "database": 0,
+    "password": "xyz"
+    }
+}
+```
+ 
+For Redis Cluster state this is a json config with the following properties.
+ 
+```
  {
    "keyClass": "Optional fully qualified class name of the Key type.",
    "valueClass": "Optional fully qualified class name of the Value type.",
    "keySerializerClass": "Optional Key serializer implementation class.",
    "valueSerializerClass": "Optional Value Serializer implementation class.",
-   "jedisPoolConfig": {
-     "host": "localhost",
-     "port": 6379,
+   "jedisClusterConfig": {
+     "nodes": ["localhost:7379", "localhost:7380", "localhost:7381"],
      "timeout": 2000,
-     "database": 0,
-     "password": "xyz"
-     }
+     "maxRedirections": 5
+   }
  }
- ```
+```
+
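+The settings from steps 4-6 can also be set programmatically on the topology `Config` before submitting. Below is a minimal sketch, assuming a `TopologyBuilder` named `builder` that already contains the stateful bolt; the provider config is the Redis example above, passed as a JSON string:
+
+```java
+Config conf = new Config();
+// checkpoint every 5 seconds instead of the default 1 second
+conf.put("topology.state.checkpoint.interval.ms", 5000);
+// use the Redis key-value state provider (storm-redis must be on the class path)
+conf.put("topology.state.provider", "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+// provider properties are passed as a JSON string
+conf.put("topology.state.provider.config",
+        "{\"jedisPoolConfig\": {\"host\": \"localhost\", \"port\": 6379}}");
+StormSubmitter.submitTopology("stateful-word-count", conf, builder.createTopology());
+```
+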
+NOTE: If you used Redis state with Storm version 1.1.0 or earlier, you will also need to migrate your state, since the state representation has changed
+from a Base64-encoded string to binary to reduce overhead. Storm provides a migration tool to help, located in the `storm-redis-examples` module.
+
+Please download the source from the download page or clone the project, and run the following commands:
+
+```
+mvn clean install -DskipTests
+cd examples/storm-redis-examples
+<storm-installation-dir>/bin/storm jar target/storm-redis-examples-*.jar org.apache.storm.redis.tools.Base64ToBinaryStateMigrationUtil [options]
+```
+
+Supported options are listed here:
+
+```
+ -d,--dbnum <arg>       Redis DB number (default: 0)
+ -h,--host <arg>        Redis hostname (default: localhost)
+ -n,--namespace <arg>   REQUIRED the list of namespace to migrate.
+ -p,--port <arg>        Redis port (default: 6379)
+    --password <arg>    Redis password (default: no password)
+```
+
+You can provide multiple `namespace` options to migrate multiple namespaces at once
+(e.g. `--namespace total-7 --namespace partialsum-3`).
+The other options are not mandatory.
+Please note that you also need to migrate the keys starting with "$checkpointspout-", since that is an internal namespace of the state.
 
 ## Checkpoint mechanism
 Checkpoint is triggered by an internal checkpoint spout at the specified `topology.state.checkpoint.interval.ms`. If there is
@@ -119,10 +162,14 @@ duplicate state updates during recovery.
 
 The state abstraction does not eliminate duplicate evaluations and currently provides only an at-least-once guarantee.
 
-In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing like in the `WordCountBolt` example in the State management section above.
+In order to provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor the tuples 
+while emitting and ack the input tuples once they are processed. For non-stateful bolts, the anchoring/acking can be automatically 
+managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple 
+after processing, as in the `WordCountBolt` example in the State management section above.
 
 ### IStateful bolt hooks
 The `IStateful` bolt interface provides hook methods wherein stateful bolts can implement custom actions.
+
 ```java
     /**
      * This is a hook for the component to perform some actions just before the
@@ -142,6 +189,7 @@ IStateful bolt interface provides hook methods where in the stateful bolts could
      */
     void preRollback();
 ```
+
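+For instance, building on the `WordCountBolt` from the State management section above, a bolt could override these hooks to log checkpoint activity. A minimal sketch, assuming the `preCommit(long txid)` and `preRollback()` hook signatures declared on `IStateful` (`LOG` is an slf4j logger):
+
+```java
+public class CheckpointAwareWordCountBolt extends WordCountBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointAwareWordCountBolt.class);
+
+    @Override
+    public void preCommit(long txid) {
+        // invoked just before the framework commits the checkpointed state
+        LOG.debug("Committing state for txid {}", txid);
+    }
+
+    @Override
+    public void preRollback() {
+        // invoked just before the state is rolled back after a failure
+        LOG.debug("Rolling back to the last committed state");
+    }
+}
+```
+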
 This is optional and stateful bolts are not expected to provide any implementation. This is provided so that other
 system level components can be built on top of the stateful abstractions where we might want to take some actions before the
 stateful bolt's state is prepared, committed or rolled back.
@@ -161,3 +209,81 @@ The framework instantiates the state via the corresponding `StateProvider` imple
 a `StateProvider` implementation which can load and return the state based on the namespace. Each state belongs to a unique namespace.
 The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding
 State implementation should be available in the class path of Storm (by placing them in the extlib directory).
+
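+As a rough sketch, a custom provider could return an in-memory state per namespace. The exact `StateProvider` method signature should be checked against the interface; the factory-method shape below is an assumption for illustration, and `InMemoryKeyValueState` is used only as a stand-in:
+
+```java
+public class MyStateProvider implements StateProvider {
+    @Override
+    public State newState(String namespace, Map<String, Object> topoConf, TopologyContext context) {
+        // return a State instance scoped to the given namespace (typically one per task)
+        return new InMemoryKeyValueState<>();
+    }
+}
+```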
+
+### Supported State Backends
+
+#### Redis
+
+* State provider class name (`topology.state.provider`)
+
+`org.apache.storm.redis.state.RedisKeyValueStateProvider`
+
+* Provider config (`topology.state.provider.config`)
+
+```
+ {
+   "keyClass": "Optional fully qualified class name of the Key type.",
+   "valueClass": "Optional fully qualified class name of the Value type.",
+   "keySerializerClass": "Optional Key serializer implementation class.",
+   "valueSerializerClass": "Optional Value Serializer implementation class.",
+   "jedisPoolConfig": {
+     "host": "localhost",
+     "port": 6379,
+     "timeout": 2000,
+     "database": 0,
+     "password": "xyz"
+   }
+ }
+ ```
+ 
+* Artifacts to add (`--artifacts`)
+
+`org.apache.storm:storm-redis:<storm-version>`
+
+#### HBase
+
+In order to make state scalable, HBaseKeyValueState stores the state KVs to a row. This introduces a `non-atomic` commit phase and guarantees only 
+eventual consistency on the HBase side. From the state's point of view this does not matter, because HBaseKeyValueState can still provide the not-yet-committed values.
+Even if a worker crashes during the commit phase, after a restart it will read the pending-commit states (which are stored atomically) from HBase, and the states will eventually be stored.
+
+NOTE: The HBase state provider uses a pre-created table and column family, so you need to create them and provide them in the provider config.
+
+You can simply create the table via `create 'state', 'cf'` in the `hbase shell`, but in production you may want to configure additional properties.
+
+* State provider class name (`topology.state.provider`)
+
+`org.apache.storm.hbase.state.HBaseKeyValueStateProvider`
+
+* Provider config (`topology.state.provider.config`)
+        
+```
+ {
+   "keyClass": "Optional fully qualified class name of the Key type.",
+   "valueClass": "Optional fully qualified class name of the Value type.",
+   "keySerializerClass": "Optional Key serializer implementation class.",
+   "valueSerializerClass": "Optional Value Serializer implementation class.",
+   "hbaseConfigKey": "config key to load hbase configuration from storm root configuration. (similar to storm-hbase)",
+   "tableName": "Pre-created table name for state.",
+   "columnFamily": "Pre-created column family for state."
+ }
+ ```
+
+If you want to initialize the HBase state provider from code, see the example below:
+
+```
+Config conf = new Config();
+Map<String, Object> hbConf = new HashMap<String, Object>();
+hbConf.put("hbase.rootdir", "file:///tmp/hbase");
+conf.put("hbase.conf", hbConf);
+conf.put("topology.state.provider", "org.apache.storm.hbase.state.HBaseKeyValueStateProvider");
+conf.put("topology.state.provider.config", "{" +
+        "   \"hbaseConfigKey\": \"hbase.conf\"," +
+        "   \"tableName\": \"state\"," +
+        "   \"columnFamily\": \"cf\"" +
+        " }");
+```
+
+* Artifacts to add (`--artifacts`)
+
+`org.apache.storm:storm-hbase:<storm-version>`
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Storm-Scheduler.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Storm-Scheduler.md b/releases/2.0.0-SNAPSHOT/Storm-Scheduler.md
index b0bd45e..4b9807e 100644
--- a/releases/2.0.0-SNAPSHOT/Storm-Scheduler.md
+++ b/releases/2.0.0-SNAPSHOT/Storm-Scheduler.md
@@ -4,10 +4,10 @@ layout: documentation
 documentation: true
 ---
 
-Storm now has 4 kinds of built-in schedulers: [DefaultScheduler]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/scheduler/DefaultScheduler.java), [IsolationScheduler]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java), [MultitenantScheduler]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java), [ResourceAwareScheduler](Resource_Aware_Scheduler_overview.html). 
+Storm now has 4 kinds of built-in schedulers: [DefaultScheduler]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java), [IsolationScheduler]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java), [MultitenantScheduler]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java), [ResourceAwareScheduler](Resource_Aware_Scheduler_overview.html). 
 
 ## Pluggable scheduler
-You can implement your own scheduler to replace the default scheduler to assign executors to workers. You configure the class to use the "storm.scheduler" config in your storm.yaml, and your scheduler must implement the [IScheduler]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/scheduler/IScheduler.java) interface.
+You can implement your own scheduler to replace the default scheduler to assign executors to workers. You configure the class to use the "storm.scheduler" config in your storm.yaml, and your scheduler must implement the [IScheduler]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/scheduler/IScheduler.java) interface.
 
 ## Isolation Scheduler
 The isolation scheduler makes it easy and safe to share a cluster among many topologies. The isolation scheduler lets you specify which topologies should be "isolated", meaning that they run on a dedicated set of machines within the cluster where no other topologies will be running. These isolated topologies are given priority on the cluster, so resources will be allocated to isolated topologies if there's competition with non-isolated topologies, and resources will be taken away from non-isolated topologies if necessary to get resources for an isolated topology. Once all isolated topologies are allocated, the remaining machines on the cluster are shared among all non-isolated topologies.

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Stream-API.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Stream-API.md b/releases/2.0.0-SNAPSHOT/Stream-API.md
new file mode 100644
index 0000000..fb381a4
--- /dev/null
+++ b/releases/2.0.0-SNAPSHOT/Stream-API.md
@@ -0,0 +1,491 @@
+---
+title: Stream API Overview
+layout: documentation
+documentation: true
+---
+
+* [Concepts](#concepts)
+    * [Stream Builder](#streambuilder)
+    * [Value mapper](#valuemapper)
+* [Stream APIs](#streamapis)
+    * [Basic transformations](#basictransformations)
+        * [filter](#filter)
+        * [map](#map)
+        * [flatmap](#flatmap)
+    * [Windowing](#windowing)
+    * [Transformation to key-value pairs](#keyvaluepairs)
+        * [mapToPair](#mapflatmaptopair)
+        * [flatMapToPair](#mapflatmaptopair)
+    * [Aggregations](#aggregations)
+        * [aggregate](#aggregatereduce)
+        * [reduce](#aggregatereduce)
+        * [aggregateByKey](#aggregatereducebykey)
+        * [reduceByKey](#aggregatereducebykey)
+        * [groupByKey](#groupbykey)
+        * [countByKey](#countbykey)
+    * [Repartition](#repartition)
+    * [Output operations](#outputoperations)
+        * [print](#print)
+        * [peek](#peek)
+        * [forEach](#foreach)
+        * [to](#to)
+    * [Branch](#branching)
+    * [Joins](#joins)
+    * [CoGroupByKey](#cogroupbykey)
+    * [State](#state)
+        * [updateStateByKey](#updatestatebykey)
+        * [stateQuery](#statequery)
+* [Guarantees](#guarantees)        
+* [Example](#example)
+
+Historically, Storm provided the Spout and Bolt APIs for expressing streaming computations. Though these APIs are fairly simple to use, 
+there are no reusable constructs for expressing common streaming operations like filtering, transformations, windowing, joins, 
+aggregations and so on.
+
+The Stream API builds on top of Storm's spouts and bolts to provide a typed API for expressing streaming computations, and it supports functional-style operations such as map-reduce. 
+
+# <a name="concepts"></a> Concepts 
+
+Conceptually a `Stream` can be thought of as a stream of messages flowing through a pipeline. A `Stream` may be generated by reading messages out of a source like a spout, or by transforming other streams. For example,
+
+```java
+// imports
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+...
+
+StreamBuilder builder = new StreamBuilder();
+
+// a stream of sentences obtained from a source spout
+Stream<String> sentences = builder.newStream(new RandomSentenceSpout()).map(tuple -> tuple.getString(0));
+
+// a stream of words obtained by transforming (splitting) the stream of sentences
+Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
+
+// output operation that prints the words to console
+words.forEach(w -> System.out.println(w));
+```
+
+
+Most stream operations accept parameters that describe user-specified behavior typically via lambda expressions like `s -> Arrays.asList(s.split(" "))` as in the above example.
+
+A `Stream` supports two kinds of operations:
+
+1. **Transformations** that produce another stream from the current stream (like the `flatMap` operation in the example above).
+1. **Output operations** that produce a result (like the `forEach` operation in the example above).
+
+## <a name="streambuilder"></a> Stream Builder
+
+`StreamBuilder` provides the builder APIs to create a new stream. Typically a spout forms the source of a stream.
+
+```java
+StreamBuilder builder = new StreamBuilder();
+Stream<Tuple> sentences = builder.newStream(new TestSentenceSpout());
+```
+
+The `StreamBuilder` tracks the overall pipeline of operations expressed via the Stream. One can then create the Storm topology
+via `build()` and submit it like a normal storm topology via `StormSubmitter`.
+
+```java
+StormSubmitter.submitTopologyWithProgressBar("test", new Config(), streamBuilder.build());
+```
+
+## <a name="valuemapper"></a> Value mapper
+
+Value mappers can be used to extract specific fields from the tuples emitted from a spout to produce a typed stream of values. Value mappers are passed as arguments to the `StreamBuilder.newStream`.
+
+```java
+StreamBuilder builder = new StreamBuilder();
+
+// extract the first field from the tuple to get a Stream<String> of sentences
+Stream<String> sentences = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0));
+```
+
+Storm provides strongly typed tuples via the `Pair` and `Tuple` classes (`Tuple3` up to `Tuple10`). One can use a `TupleValueMapper` to produce a stream of typed tuples as shown below.
+
+```java
+// extract first three fields of the tuple emitted by the spout to produce a stream of typed tuples.
+Stream<Tuple3<String, Integer, Long>> stream = builder.newStream(new TestSpout(), TupleValueMappers.of(0, 1, 2));
+```
+
+# <a name="streamapis"></a> Stream APIs 
+
+Storm's streaming APIs (defined in [Stream](../storm-client/src/jvm/org/apache/storm/streams/Stream.java) and [PairStream](../storm-client/src/jvm/org/apache/storm/streams/PairStream.java)) currently support a wide range of operations such as transformations, filters, windowing, aggregations, branching, joins, stateful, output and debugging operations.
+
+## <a name="basictransformations"></a> Basic transformations 
+
+### <a name="filter"></a> filter
+
+`filter` returns a stream consisting of the elements of the stream that match the given `Predicate` (for which the predicate returns true). 
+
+```java
+Stream<String> logs = ...
+Stream<String> errors = logs.filter(line -> line.contains("ERROR"));
+```
+
+In the above example, log lines with 'ERROR' are filtered into an error stream which can then be further processed.
+
+### <a name="map"></a> map 
+
+`map` returns a stream consisting of the result of applying the given mapping function to the values of the stream.
+
+```java
+Stream<String> words = ...
+Stream<Integer> wordLengths = words.map(String::length);
+```
+
+The example generates a stream of word lengths from a stream of words by applying the String.length function on each value. Note that the type of the resultant stream of a map operation can be different from that of the original stream.  
+
+### <a name="flatmap"></a> flatMap
+
+`flatMap` returns a stream consisting of the results of replacing each value of the stream with the contents produced by applying the provided mapping function to each value. This is similar to map but each value can be mapped to 0 or more values.
+
+```java
+Stream<String> sentences = ...
+Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
+```
+
+
+In the above example, the lambda function splits each value in the stream to a list of words and the flatMap function generates a flattened stream of words out of it.
+
+## <a name="windowing"></a> Windowing
+
+A `window` operation produces a windowed stream consisting of the elements that fall within the window as specified by the window parameter. All the windowing options supported in the underlying windowed bolts are supported via the Stream apis.
+
+`Stream<T> windowedStream = stream.window(Window<?, ?> windowConfig);`
+
+The windowConfig parameter specifies the windowing config like sliding or tumbling windows based on time duration or event count.
+ 
+```java 
+// time based sliding window
+stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));
+
+// count based sliding window
+stream.window(SlidingWindows.of(Count.of(10), Count.of(2)));
+
+// tumbling window
+stream.window(TumblingWindows.of(Duration.seconds(10)));
+
+// specifying timestamp field for event time based processing and a late tuple stream.
+stream.window(TumblingWindows.of(Duration.seconds(10))
+                     .withTimestampField("ts")
+                     .withLateTupleStream("late_events"));
+```
+                     
+A windowing operation splits the continuous stream of values into subsets and is necessary for performing operations like Joins and Aggregations.
+                     
+## <a name="keyvaluepairs"></a> Transformation to key-value pairs
+                 
+### <a name="mapflatmaptopair"></a> mapToPair and flatMapToPair
+                 
+These operations transform a Stream of values into a stream of key-value pairs.
+
+```java                  
+Stream<Integer> integers = … // 1, 2, 3, 4, ... 
+PairStream<Integer, Integer> squares = integers.mapToPair(x -> Pair.of(x, x*x)); // (1, 1), (2, 4), (3, 9), (4, 16), ...
+```
+
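+`flatMapToPair` is the flatMap analogue: each value can be mapped to zero or more key-value pairs. A small sketch, assuming the mapping function returns an `Iterable` of `Pair`s:
+
+```java
+Stream<String> sentences = ...
+// split each sentence and emit a (word, 1) pair per word
+PairStream<String, Integer> wordPairs = sentences.flatMapToPair(
+        s -> Arrays.stream(s.split(" "))
+                   .map(w -> Pair.of(w, 1))
+                   .collect(Collectors.toList()));
+```
+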
+A key-value pair stream is required for operations like groupByKey, aggregateByKey, joins etc.
+
+## <a name="aggregations"></a> Aggregations
+
+Aggregate operations aggregate the values (or key-values) in a stream. Typically the aggregation operations are performed on a windowed stream where the aggregate results are emitted on each window activation.
+
+### <a name="aggregatereduce"></a> aggregate and reduce
+                  
+`aggregate` and `reduce` compute a global aggregation, i.e. the values across all partitions are forwarded to a single task for computing the aggregate. 
+
+```java
+Stream<Long> numbers = …
+// aggregate the numbers and produce a stream of last 10 sec sums.
+Stream<Long> sums = numbers.window(TumblingWindows.of(Duration.seconds(10))).aggregate(new Sum());
+
+// the last 10 sec sums computed using reduce
+Stream<Long> sums = numbers.window(...).reduce((x, y) -> x + y);
+```
+
+`aggregate` and `reduce` differ in the way the aggregate results are computed. 
+
+A `reduce` operation repeatedly applies the given reducer and reduces two values to a single value until there is only one value left. This may not be feasible or easy for all kinds of aggregations (e.g. avg).
+ 
+An `aggregate` operation does a mutable reduction. A mutable reduction accumulates results into an accumulator as it processes the values.
+    
+The aggregation operations (aggregate and reduce) automatically do a local aggregation whenever possible before doing the network shuffle to minimize the number of messages transmitted over the network. For example, to compute a sum, a per-partition partial sum is computed and only the partial sums are transferred over the network to the target bolt, where the partial sums are merged to produce the final sum. A `CombinerAggregator` interface is used as the argument of `aggregate` to enable this.
+
+For example the `Sum` (passed as the argument of aggregate in the example above) can be implemented as a `CombinerAggregator` as follows.
+
+```java
+public class Sum implements CombinerAggregator<Long, Long, Long> {
+
+    // The initial value of the sum
+    @Override
+    public Long init() {
+        return 0L;
+    }
+
+    // Updates the sum by adding the value (this could be a partial sum)
+    @Override
+    public Long apply(Long aggregate, Long value) {
+        return aggregate + value;
+    }
+
+    // merges the partial sums
+    @Override
+    public Long merge(Long accum1, Long accum2) {
+        return accum1 + accum2;
+    }
+
+    // extract result from the accumulator (here the accumulator and result is the same)
+    @Override
+    public Long result(Long accum) {
+        return accum;
+    }
+}
+```
+
+### <a name="aggregatereducebykey"></a> aggregateByKey and reduceByKey
+
+These are similar to the aggregate and reduce operations but do the aggregation per key.
+
+`aggregateByKey` aggregates the values for each key of the stream using the given Aggregator.
+
+```java
+Stream<String> words = ...                                                  // a windowed stream of words
+PairStream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w, 1))   // convert to a stream of (word, 1) pairs
+                                           .aggregateByKey(new Count<>());  // compute counts per word
+```
+                                       
+`reduceByKey` performs a reduction on the values for each key of this stream by repeatedly applying the reducer.
+
+```java
+Stream<String> words = ...                                                  // a windowed stream of words
+PairStream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w, 1L))  // convert to a stream of (word, 1) pairs
+                                           .reduceByKey((x, y) -> x + y);   // compute counts per word
+```
+
+
+Like the global aggregate/reduce, a per-partition local aggregate (per key) is computed and the partial results are sent to the target bolts, where the partial results are merged to produce the final aggregate.
+
+### <a name="groupbykey"></a> groupByKey
+
+`groupByKey` on a stream of key-value pairs returns a new stream where the values are grouped by the keys.
+ 
+```java
+// a stream of (user, score) pairs e.g. ("alice", 10), ("bob", 15), ("bob", 20), ("alice", 11), ("alice", 13)
+PairStream<String, Integer> scores = ... 
+
+// list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
+PairStream<String, Iterable<Integer>> userScores =  scores.window(...).groupByKey(); 
+```
+
+###  <a name="countbykey"></a> countByKey
+`countByKey` counts the values for each key of this stream.
+
+```java
+Stream<String> words = ...                                                  // a windowed stream of words
+PairStream<String, Long> wordCounts = words.mapToPair(w -> Pair.of(w, 1))   // convert to a stream of (word, 1) pairs
+                                           .countByKey();                   // compute counts per word
+```
+
+Internally `countByKey` uses `aggregateByKey` to compute the count.
+
+## <a name="repartition"></a> Repartition 
+
+A `repartition` operation re-partitions the current stream and returns a new stream with the specified number of partitions. Further operations on the resultant stream execute at that level of parallelism. Repartitioning can be used to increase or reduce the parallelism of the operations in the stream.
+
+The initial number of partitions can also be specified when creating the stream (via `StreamBuilder.newStream`):
+
+```java
+// Stream 's1' will have 2 partitions and operations on s1 will execute at this level of parallelism
+Stream<String> s1 = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2);
+
+// Stream 's2' and further operations will have three partitions
+Stream<Integer> s2 = s1.map(function1).repartition(3);
+
+// perform a map operation on s2 and print the result
+s2.map(function2).print();
+```          
+                  
+Note: a `repartition` operation implies a network transfer. In the above example the first map operation (function1) would be executed at a parallelism of 2 (on the two partitions of s1), whereas the second map operation (function2) would be executed at a parallelism of 3 (on the three partitions of s2). This also means that the first and second map operations have to be executed on two separate bolts and involve a network transfer.
+
+## <a name="outputoperations"></a> Output operations
+
+Output operations push out the transformed values in the stream to the console, external sinks like databases, files or even Storm bolts.
+ 
+### <a name="print"></a> print
+
+`print` prints the values in the stream to console. For example,
+
+```java
+// transforms words to uppercase and prints to the console
+words.map(String::toUpperCase).print();
+```
+
+### <a name="peek"></a> peek
+
+`peek` returns a stream consisting of the elements of the stream, additionally performing the provided action on each element as they are consumed from the resulting stream. This can be used to ‘inspect’ the values flowing at any stage in a stream.
+
+```java
+builder.newStream(...).flatMap(s -> Arrays.asList(s.split(" ")))
+       // print the results of the flatMap operation as the values flow across the stream.
+       .peek(s -> System.out.println(s))
+       .mapToPair(w -> Pair.of(w, 1));
+```
+
+### <a name="foreach"></a> forEach
+
+This is the most generic output operation and can be used to execute arbitrary code for each value in the stream, like storing the results into an external database, a file and so on.
+
+```java
+stream.forEach(value -> {
+    // log it
+    LOG.debug(value);
+    // store the value into a db and so on...
+    statement.executeUpdate(..);
+  }
+);
+```
+
+### <a name="to"></a>  to 
+
+This allows one to plug in existing bolts as sinks. 
+
+```java
+// The redisBolt is a standard storm bolt
+IRichBolt redisBolt = new RedisStoreBolt(poolConfig, storeMapper);
+...
+// generate the word counts and store it in redis using redis bolt
+builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
+       .mapToPair(w -> Pair.of(w, 1))
+       .countByKey()
+       // the (word, count) pairs are forwarded to the redisBolt which stores it in redis
+       .to(redisBolt);
+```
+
+Note that this will provide guarantees only based on what the bolt provides.
+
+## <a name="branching"></a> Branch
+
+A `branch` operation can be used to express If-then-else logic on streams.
+
+```java
+Stream<T>[] streams  = stream.branch(Predicate<T>... predicates)
+```
+
+The predicates are applied in the given order to the values of the stream and the result is forwarded to the corresponding (index based) result stream based on the first predicate that matches. If none of the predicates match a value, that value is dropped.
+
+For example,
+
+```java
+Stream<Integer>[] streams = builder.newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0))
+                                   .branch(x -> (x % 2) == 0, 
+                                          x -> (x % 2) == 1);
+Stream<Integer> evenNumbers = streams[0];
+Stream<Integer> oddNumbers = streams[1];
+```
+
+## <a name="joins"></a> Joins
+
+A `join` operation joins the values of one stream with the values having the same key from another stream. 
+
+```java
+PairStream<Long, Long> squares = … // (1, 1), (2, 4), (3, 9) ...
+PairStream<Long, Long> cubes = … // (1, 1), (2, 8), (3, 27) ...
+
+// join the squares and cubes stream to produce (1, [1, 1]), (2, [4, 8]), (3, [9, 27]) ...
+PairStream<Long, Pair<Long, Long>> joined = squares.window(TumblingWindows.of(Duration.seconds(5))).join(cubes);
+```
+
+Joins are typically invoked on a windowed stream, joining the key-values that arrived on each stream in the current window. The parallelism of the stream on which the join is invoked is carried forward to the joined stream. An optional `ValueJoiner` can be passed as an argument to join to specify how to join the two values for each matching key (the default behavior is to return a `Pair` of the value from both streams).
+
+Left, right and full outer joins are supported. 
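+
+Below is a sketch of these variants. The lambda passed as the second argument to `join` stands in for a `ValueJoiner`, and `leftOuterJoin` is assumed to follow the same pattern; check the `PairStream` API for the exact method names and signatures:
+
+```java
+// join with a custom ValueJoiner that combines the two matched values per key
+PairStream<Long, Long> sumOfPowers =
+        squares.window(TumblingWindows.of(Duration.seconds(5)))
+               .join(cubes, (square, cube) -> square + cube);
+
+// left outer join: keys present only in 'squares' are joined with a null right-side value
+PairStream<Long, Pair<Long, Long>> leftJoined =
+        squares.window(TumblingWindows.of(Duration.seconds(5)))
+               .leftOuterJoin(cubes);
+```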
+
+## <a name="cogroupbykey"></a> CoGroupByKey
+
+`coGroupByKey` groups the values of this stream with the values having the same key from the other stream.
+
+```java
+// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
+PairStream<String, String> stream1 = ...
+
+// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
+PairStream<String, String> stream2 = ...
+
+// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2]), (k2, ([v2, v3], [])), (k3, ([], [x3]))
+PairStream<String, Pair<Iterable<String>, Iterable<String>>> coGroupedStream = stream1.window(...).coGroupByKey(stream2);
+```
+
+## <a name="state"></a> State
+
+Storm provides APIs for applications to save and update the state of its computation and also to query the state.
+
+### <a name="updatestatebykey"></a> updateStateByKey
+
+`updateStateByKey` updates the state by applying a given state update function to the previous state and the new value for the key. `updateStateByKey` can be invoked either with an initial value for the state and a state update function, or with a `StateUpdater` implementation.
+
+```java
+PairStream<String, Long> wordCounts = ...
+// Update the word counts in the state; here the first argument 0L is the initial value for the state and 
+// the second argument is a function that adds the count to the current value in the state.
+StreamState<String, Long> streamState = wordCounts.updateStateByKey(0L, (state, count) -> state + count);
+streamState.toPairStream().print();
+```
+
+The state value can be of any type. In the above example it is of type `Long` and stores the word count. 
+
+Internally, Storm uses stateful bolts for storing the state. The Storm config `topology.state.provider` can be used to choose the state provider implementation. For example, set this to `org.apache.storm.redis.state.RedisKeyValueStateProvider` for a Redis-based state store.
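+
+A minimal sketch of selecting the Redis provider on the topology config before submitting (the provider jar must also be available to the workers):
+
+```java
+Config config = new Config();
+config.put("topology.state.provider", "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+StormSubmitter.submitTopologyWithProgressBar("stateful-stream-topology", config, builder.build());
+```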
+
+### <a name="statequery"></a> stateQuery
+
+`stateQuery` can be used to query the state (updated by `updateStateByKey`). The `StreamState` returned by the updateStateByKey operation has to be used for querying stream state. The values in the stream are used as the keys to query the state.
+
+```java
+
+// The stream of words emitted by the QuerySpout is used as the keys to query the state.
+builder.newStream(new QuerySpout(), new ValueMapper<String>(0))
+// Queries the state and emits the matching (key, value) as results. 
+// The stream state returned by updateStateByKey is passed as the argument to stateQuery.
+.stateQuery(streamState).print();
+```
+
+# <a name="guarantees"></a> Guarantees
+
+Right now, topologies built using the Stream API provide an **at-least-once** guarantee. 
+
+Note that only the `updateStateByKey` operation currently executes on an underlying StatefulBolt. The other stateful operations (join, windowing, aggregation etc.) execute on an IRichBolt and store their state in memory, relying on Storm's acking and replay mechanisms to rebuild the state.
+ 
+In the future, the underlying framework of the Stream API will be enhanced to provide **exactly-once** guarantees.
+
+# <a name="example"></a> Example
+
+Here's a word count topology expressed using the Stream API:
+
+```java
+StreamBuilder builder = new StreamBuilder();
+
+builder
+   // A stream of random sentences with two partitions
+   .newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
+   // a two seconds tumbling window
+   .window(TumblingWindows.of(Duration.seconds(2)))
+   // split the sentences to words
+   .flatMap(s -> Arrays.asList(s.split(" ")))
+   // create a stream of (word, 1) pairs
+   .mapToPair(w -> Pair.of(w, 1))
+   // compute the word counts in the last two second window
+   .countByKey()
+   // print the results to stdout
+   .print();
+```
+
+The `RandomSentenceSpout` is a regular Storm spout that continuously emits random sentences. The stream of sentences is split into two-second windows and the word count within each window is computed and printed.
+
+The stream can then be submitted just like a regular topology as shown below.
+
+```java
+  Config config = new Config();
+  config.setNumWorkers(1);
+  StormSubmitter.submitTopologyWithProgressBar("topology-name", config, builder.build());
+```
+
+More examples are available under [storm-starter](../examples/storm-starter/src/jvm/org/apache/storm/starter/streams) which will help you get started.

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Structure-of-the-codebase.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Structure-of-the-codebase.md b/releases/2.0.0-SNAPSHOT/Structure-of-the-codebase.md
index e4ca9ac..8c53002 100644
--- a/releases/2.0.0-SNAPSHOT/Structure-of-the-codebase.md
+++ b/releases/2.0.0-SNAPSHOT/Structure-of-the-codebase.md
@@ -15,18 +15,16 @@ The following sections explain each of these layers in more detail.
 
 ### storm.thrift
 
-The first place to look to understand the structure of Storm's codebase is the [storm.thrift]({{page.git-blob-base}}/storm-core/src/storm.thrift) file.
+The first place to look to understand the structure of Storm's codebase is the [storm.thrift]({{page.git-blob-base}}/storm-client/src/storm.thrift) file.
 
-Storm uses [this fork](https://github.com/nathanmarz/thrift/tree/storm) of Thrift (branch 'storm') to produce the generated code. This "fork" is actually Thrift 7 with all the Java packages renamed to be `org.apache.thrift7`. Otherwise, it's identical to Thrift 7. This fork was done because of the lack of backwards compatibility in Thrift and the need for many people to use other versions of Thrift in their Storm topologies.
+Every spout or bolt in a topology is given a user-specified identifier called the "component id". The component id is used to specify subscriptions from a bolt to the output streams of other spouts or bolts. A [StormTopology]({{page.git-blob-base}}/storm-client/src/storm.thrift) structure contains a map from component id to component for each type of component (spouts and bolts).
 
-Every spout or bolt in a topology is given a user-specified identifier called the "component id". The component id is used to specify subscriptions from a bolt to the output streams of other spouts or bolts. A [StormTopology]({{page.git-blob-base}}/storm-core/src/storm.thrift#L91) structure contains a map from component id to component for each type of component (spouts and bolts).
-
-Spouts and bolts have the same Thrift definition, so let's just take a look at the [Thrift definition for bolts]({{page.git-blob-base}}/storm-core/src/storm.thrift#L79). It contains a `ComponentObject` struct and a `ComponentCommon` struct.
+Spouts and bolts have the same Thrift definition, so let's just take a look at the [Thrift definition for bolts]({{page.git-blob-base}}/storm-client/src/storm.thrift). It contains a `ComponentObject` struct and a `ComponentCommon` struct.
 
 The `ComponentObject` defines the implementation for the bolt. It can be one of three types:
 
-1. A serialized java object (that implements [IBolt]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/task/IBolt.java))
-2. A `ShellComponent` object that indicates the implementation is in another language. Specifying a bolt this way will cause Storm to instantiate a [ShellBolt]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java) object to handle the communication between the JVM-based worker process and the non-JVM-based implementation of the component.
+1. A serialized java object (that implements [IBolt]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/task/IBolt.java))
+2. A `ShellComponent` object that indicates the implementation is in another language. Specifying a bolt this way will cause Storm to instantiate a [ShellBolt]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java) object to handle the communication between the JVM-based worker process and the non-JVM-based implementation of the component.
 3. A `JavaObject` structure which tells Storm the classname and constructor arguments to use to instantiate that bolt. This is useful if you want to define a topology in a non-JVM language. This way, you can make use of JVM-based spouts and bolts without having to create and serialize a Java object yourself.
 
 `ComponentCommon` defines everything else for this component. This includes:
@@ -36,7 +34,7 @@ The `ComponentObject` defines the implementation for the bolt. It can be one of
 3. The parallelism for this component
 4. The component-specific [configuration](Configuration.html) for this component
 
-Note that the structure spouts also have a `ComponentCommon` field, and so spouts can also have declarations to consume other input streams. Yet the Storm Java API does not provide a way for spouts to consume other streams, and if you put any input declarations there for a spout you would get an error when you tried to submit the topology. The reason that spouts have an input declarations field is not for users to use, but for Storm itself to use. Storm adds implicit streams and bolts to the topology to set up the [acking framework](https://github.com/apache/storm/wiki/Guaranteeing-message-processing), and two of these implicit streams are from the acker bolt to each spout in the topology. The acker sends "ack" or "fail" messages along these streams whenever a tuple tree is detected to be completed or failed. The code that transforms the user's topology into the runtime topology is located [here]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/common.clj#L279).
+Note that spouts also have a `ComponentCommon` field in the structure, and so spouts can also have declarations to consume other input streams. Yet the Storm Java API does not provide a way for spouts to consume other streams, and if you put any input declarations there for a spout you would get an error when you tried to submit the topology. The reason that spouts have an input declarations field is not for users to use, but for Storm itself to use. Storm adds implicit streams and bolts to the topology to set up the [acking framework](Acking-framework-implementation.html), and two of these implicit streams are from the acker bolt to each spout in the topology. The acker sends "ack" or "fail" messages along these streams whenever a tuple tree is detected to be completed or failed. The code that transforms the user's topology into the runtime topology is located [here]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java).
 
 ### Java interfaces
 
@@ -55,80 +53,78 @@ You can see this strategy at work with the [BaseRichSpout](javadocs/org/apache/s
 
 Spouts and bolts are serialized into the Thrift definition of the topology as described above. 
 
-One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java#L205) of the `TopologyBuilder` code.
+One subtle aspect of the interfaces is the difference between `IBolt` and `ISpout` vs. `IRichBolt` and `IRichSpout`. The main difference between them is the addition of the `declareOutputFields` method in the "Rich" versions of the interfaces. The reason for the split is that the output fields declaration for each output stream needs to be part of the Thrift struct (so it can be specified from any language), but as a user you want to be able to declare the streams as part of your class. What `TopologyBuilder` does when constructing the Thrift representation is call `declareOutputFields` to get the declaration and convert it into the Thrift structure. The conversion happens [at this portion]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java) of the `TopologyBuilder` code.
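+
+For example, a rich bolt might declare a single output stream with two fields; `TopologyBuilder` then calls this method and folds the declaration into the Thrift structure:
+
+```java
+@Override
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // declare the default output stream with a "word" and a "count" field
+    declarer.declare(new Fields("word", "count"));
+}
+```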
 
 
 ### Implementation
 
 Specifying all the functionality via Java interfaces ensures that every feature of Storm is available via Java. Moreover, the focus on Java interfaces ensures that the user experience from Java-land is pleasant as well.
 
-The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. This was done to serve as an illustration for how to implement a higher level abstraction on Storm. The DRPC and transactional topologies implementations are in the [org.apache.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/coordination), [org.apache.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/drpc), and [org.apache.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/transactional) packages.
+The implementation of Storm, on the other hand, is primarily in Clojure. While the codebase is about 50% Java and 50% Clojure in terms of LOC, most of the implementation logic is in Clojure. There are two notable exceptions to this, and that is the [DRPC](https://github.com/apache/storm/wiki/Distributed-RPC) and [transactional topologies](https://github.com/apache/storm/wiki/Transactional-topologies) implementations. These are implemented purely in Java. This was done to serve as an illustration for how to implement a higher level abstraction on Storm. The DRPC and transactional topologies implementations are in the [org.apache.storm.coordination]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/coordination), [org.apache.storm.drpc]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/drpc), and [org.apache.storm.transactional]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/transactional) packages.
 
 Here's a summary of the purpose of the main Java packages:
 
 #### Java packages
 
-[org.apache.storm.coordination]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here.
+[org.apache.storm.coordination]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/coordination): Implements the pieces required to coordinate batch-processing on top of Storm, which both DRPC and transactional topologies use. `CoordinatedBolt` is the most important class here.
 
-[org.apache.storm.drpc]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/drpc): Implementation of the DRPC higher level abstraction
+[org.apache.storm.drpc]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/drpc): Implementation of the DRPC higher level abstraction
 
-[org.apache.storm.generated]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions)
+[org.apache.storm.generated]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/generated): The generated Thrift code for Storm (generated using [this fork](https://github.com/nathanmarz/thrift) of Thrift, which simply renames the packages to org.apache.thrift7 to avoid conflicts with other Thrift versions)
 
-[org.apache.storm.grouping]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/grouping): Contains interface for making custom stream groupings
+[org.apache.storm.grouping]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/grouping): Contains interface for making custom stream groupings
 
-[org.apache.storm.hooks]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/storm/wiki/Hooks).
+[org.apache.storm.hooks]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/hooks): Interfaces for hooking into various events in Storm, such as when tasks emit tuples, when tuples are acked, etc. User guide for hooks is [here](https://github.com/apache/storm/wiki/Hooks).
 
-[org.apache.storm.serialization]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/).
+[org.apache.storm.serialization]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/serialization): Implementation of how Storm serializes/deserializes tuples. Built on top of [Kryo](http://code.google.com/p/kryo/).
 
-[org.apache.storm.spout]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages.
+[org.apache.storm.spout]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/spout): Definition of spout and associated interfaces (like the `SpoutOutputCollector`). Also contains `ShellSpout` which implements the protocol for defining spouts in non-JVM languages.
 
-[org.apache.storm.task]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/task): Definition of bolt and associated interfaces (like `OutputCollector`). Also contains `ShellBolt` which implements the protocol for defining bolts in non-JVM languages. Finally, `TopologyContext` is defined here as well, which is provided to spouts and bolts so they can get data about the topology and its execution at runtime.
+[org.apache.storm.task]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/task): Definition of bolt and associated interfaces (like `OutputCollector`). Also contains `ShellBolt` which implements the protocol for defining bolts in non-JVM languages. Finally, `TopologyContext` is defined here as well, which is provided to spouts and bolts so they can get data about the topology and its execution at runtime.
 
-[org.apache.storm.testing]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/testing): Contains a variety of test bolts and utilities used in Storm's unit tests.
+[org.apache.storm.testing]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/testing): Contains a variety of test bolts and utilities used in Storm's unit tests.
 
-[org.apache.storm.topology]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts.
+[org.apache.storm.topology]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/topology): Java layer over the underlying Thrift structure to provide a clean, pure-Java API to Storm (users don't have to know about Thrift). `TopologyBuilder` is here as well as the helpful base classes for the different spouts and bolts. The slightly-higher level `IBasicBolt` interface is here, which is a simpler way to write certain kinds of bolts.
 
-[org.apache.storm.transactional]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/transactional): Implementation of transactional topologies.
+[org.apache.storm.transactional]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/transactional): Implementation of transactional topologies.
 
-[org.apache.storm.tuple]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/tuple): Implementation of Storm's tuple data model.
+[org.apache.storm.tuple]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/tuple): Implementation of Storm's tuple data model.
 
-[org.apache.storm.utils]({{page.git-tree-base}}/storm-core/src/jvm/org/apache/storm/tuple): Data structures and miscellaneous utilities used throughout the codebase.
+[org.apache.storm.utils]({{page.git-tree-base}}/storm-client/src/jvm/org/apache/storm/utils): Data structures and miscellaneous utilities used throughout the codebase.
 
 [org.apache.storm.command.*]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/command): These implement various commands for the `storm` command line client. These implementations are very short.
-[org.apache.storm.cluster]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/cluster): This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is stored, typically in Zookeeper.
+[org.apache.storm.cluster]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/cluster): This code manages how cluster state (like what tasks are running where, what spout/bolt each task runs as) is stored, typically in Zookeeper.
 
-[org.apache.storm.daemon.Acker]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/daemon/Acker.java): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing.
+[org.apache.storm.daemon.Acker]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/daemon/Acker.java): Implementation of the "acker" bolt, which is a key part of how Storm guarantees data processing.
 
-[org.apache.storm.daemon.DrpcServer]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java): Implementation of the DRPC server for use with DRPC topologies.
+[org.apache.storm.daemon.DrpcServer]({{page.git-blob-base}}/storm-webapp/src/jvm/org/apache/storm/daemon/DrpcServer.java): Implementation of the DRPC server for use with DRPC topologies.
 
-[org.apache.storm.event]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/event): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
+[org.apache.storm.event]({{page.git-blob-base}}/storm-server/src/jvm/org/apache/storm/event): Implements a simple asynchronous function executor. Used in various places in Nimbus and Supervisor to make functions execute in serial to avoid any race conditions.
 
-[org.apache.storm.messaging.*]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses Netty, but it is pluggable.
+[org.apache.storm.messaging.*]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/messaging): Defines a higher level interface to implementing point to point messaging. In local mode Storm uses in-memory Java queues to do this; on a cluster, it uses Netty, but it is pluggable.
 
-[org.apache.storm.stats]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/stats): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
+[org.apache.storm.stats]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/stats): Implementation of stats rollup routines used when sending stats to ZK for use by the UI. Does things like windowed and rolling aggregations at multiple granularities.
 
-[org.apache.storm.Thrift]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/Thrift.java): Wrappers around the generated Thrift API to make working with Thrift structures more pleasant.
+[org.apache.storm.Thrift]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/Thrift.java): Wrappers around the generated Thrift API to make working with Thrift structures more pleasant.
 
-[org.apache.storm.StormTimer]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/StormTimer.java): Implementation of a background timer to execute functions in the future or on a recurring interval. Storm couldn't use the [Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) class because it needed integration with time simulation in order to be able to unit test Nimbus and the Supervisor.
+[org.apache.storm.StormTimer]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/StormTimer.java): Implementation of a background timer to execute functions in the future or on a recurring interval. Storm couldn't use the [Timer](http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Timer.html) class because it needed integration with time simulation in order to be able to unit test Nimbus and the Supervisor.
 
-#### Clojure namespaces
+[org.apache.storm.daemon.nimbus]({{page.git-blob-base}}/storm-server/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java): Implementation of Nimbus.
 
-[org.apache.storm.clojure]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/clojure.clj): Implementation of the Clojure DSL for Storm.
-
-[org.apache.storm.config]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/config.clj): Created clojure symbols for config names in [Config.java](javadocs/org/apache/storm/Config.html)
- 
-[org.apache.storm.daemon.common]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/common.clj): Implementation of common functions used in Storm daemons, like getting the id for a topology based on the name, mapping a user's topology into the one that actually executes (with implicit acking streams and acker bolt added - see `system-topology!` function), and definitions for the various heartbeat and other structures persisted by Storm.
+[org.apache.storm.daemon.supervisor]({{page.git-blob-base}}/storm-server/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java): Implementation of Supervisor.
 
-[org.apache.storm.daemon.nimbus]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj): Implementation of Nimbus.
+[org.apache.storm.daemon.task]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/daemon/Task.java): Implementation of an individual task for a spout or bolt. Handles message routing, serialization, stats collection for the UI, as well as the spout-specific and bolt-specific execution implementations.
 
-[org.apache.storm.daemon.supervisor]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj): Implementation of Supervisor.
+[org.apache.storm.daemon.worker]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
 
-[org.apache.storm.daemon.task]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/task.clj): Implementation of an individual task for a spout or bolt. Handles message routing, serialization, stats collection for the UI, as well as the spout-specific and bolt-specific execution implementations.
+#### Clojure namespaces
 
-[org.apache.storm.daemon.worker]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/daemon/worker.clj): Implementation of a worker process (which will contain many tasks within). Implements message transferring and task launching.
+[org.apache.storm.clojure]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/clojure.clj): Implementation of the Clojure DSL for Storm.
 
+[org.apache.storm.config]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/config.clj): Creates Clojure symbols for the config names in [Config.java](javadocs/org/apache/storm/Config.html).
+ 
 [org.apache.storm.log]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/log.clj): Defines the functions used to log messages to log4j.
 
-[org.apache.storm.testing]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/testing.clj): Implementation of facilities used to test Storm topologies. Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
+[org.apache.storm.testing]({{page.git-blob-base}}/storm-clojure/src/clj/org/apache/storm/testing.clj): Implementation of facilities used to test Storm topologies. Includes time simulation, `complete-topology` for running a fixed set of tuples through a topology and capturing the output, tracker topologies for having fine grained control over detecting when a cluster is "idle", and other utilities.
 
 [org.apache.storm.ui.*]({{page.git-blob-base}}/storm-core/src/clj/org/apache/storm/ui): Implementation of Storm UI. Completely independent from rest of code base and uses the Nimbus Thrift API to get data.

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Trident-API-Overview.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Trident-API-Overview.md b/releases/2.0.0-SNAPSHOT/Trident-API-Overview.md
index 309127d..b9f074c 100644
--- a/releases/2.0.0-SNAPSHOT/Trident-API-Overview.md
+++ b/releases/2.0.0-SNAPSHOT/Trident-API-Overview.md
@@ -161,6 +161,7 @@ mystream.flatMap(new Split(), new Fields("word"))
  This could be useful for debugging to see the tuples as they flow past a certain point in a pipeline.
 
 For example, the below code would print the result of converting the words to uppercase before they are passed to `groupBy`
+
 ```java
  mystream.flatMap(new Split()).map(new UpperCase())
          .peek(new Consumer() {
@@ -206,14 +207,14 @@ Partition 2:
 [74,  37]
 [51,  49]
 [37,  98]
-
 ```
 
 `minBy` operation can be applied on the above stream of tuples like below which results in emitting tuples with minimum values of `count` field in each partition.
 
-``` java
+```java
   mystream.minBy(new Fields("count"))
 ```
+
 Result of the above code on mentioned partitions is:
  
 ```
@@ -227,7 +228,6 @@ Partition 1:
 
 Partition 2:
 [82,  23]
-
 ```
 
 You can look at other `min` and `minBy` operations on Stream
@@ -484,7 +484,7 @@ public class Count implements CombinerAggregator<Long> {
 }
 ```
 
-The benefits of CombinerAggregators are seen when you use them with the aggregate method instead of partitionAggregate. In that case, Trident automatically optimizes the computation by doing partial aggregations before transferring tuples over the network.
+CombinerAggregators offer high efficiency when used with the aggregate method instead of partitionAggregate ([see below](#aggregation-operations)).
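+
+For example, a minimal sketch (reusing the `Count` aggregator defined above on the `mystream` from earlier examples) that lets Trident do partial counts on each partition before transferring the partial results over the network:
+
+```java
+mystream.aggregate(new Count(), new Fields("count"))
+```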
 
 A ReducerAggregator has the following interface:
 
@@ -608,7 +608,7 @@ The groupBy operation repartitions the stream by doing a partitionBy on the spec
 
 ![Grouping](images/grouping.png)
 
-If you run aggregators on a grouped stream, the aggregation will be run within each group instead of against the whole batch. persistentAggregate can also be run on a GroupedStream, in which case the results will be stored in a [MapState]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/state/map/MapState.java) with the key being the grouping fields. You can read more about persistentAggregate in the [Trident state doc](Trident-state.html).
+If you run aggregators on a grouped stream, the aggregation will be run within each group instead of against the whole batch. persistentAggregate can also be run on a GroupedStream, in which case the results will be stored in a [MapState]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/state/map/MapState.java) with the key being the grouping fields. You can read more about persistentAggregate in the [Trident state doc](Trident-state.html).
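+
+For example, a minimal sketch (assuming the `mystream` and `Count` from the earlier examples, and using the in-memory `MemoryMapState` for illustration) that keeps a per-word count in a MapState keyed by the grouping field:
+
+```java
+TridentState wordCounts =
+    mystream.groupBy(new Fields("word"))
+            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
+```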
 
 Like regular streams, aggregators on grouped streams can be chained.
 

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Trident-RAS-API.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Trident-RAS-API.md b/releases/2.0.0-SNAPSHOT/Trident-RAS-API.md
index 969eab8..ce18e02 100644
--- a/releases/2.0.0-SNAPSHOT/Trident-RAS-API.md
+++ b/releases/2.0.0-SNAPSHOT/Trident-RAS-API.md
@@ -16,6 +16,9 @@ First, an example:
 
 ```java
     TridentTopology topo = new TridentTopology();
+    topo.setResourceDefaults(new DefaultResourceDeclarer()
+                                     .setMemoryLoad(128)
+                                     .setCPULoad(20));
     TridentState wordCounts =
         topology
             .newStream("words", feeder)
@@ -29,6 +32,7 @@ First, an example:
             .parallelismHint(10)
             .setCPULoad(50)
             .setMemoryLoad(1024)
+            .each(new Fields("word!"), new QMarkAdder(), new Fields("word!?"))
             .groupBy(new Fields("word!"))
             .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
             .setCPULoad(100)
@@ -44,9 +48,9 @@ In the above case, we end up with
 
 
 - a spout and spout coordinator with a CPU load of 20% each, and a memory load of 512MiB on-heap and 256MiB off-heap.
-- a bolt with 60% cpu load (10% + 50%) and a memory load of 1536MiB (1024 + 512) on-heap from the combined `Split` and `BangAdder`
+- a bolt with 80% cpu load (10% + 50% + 20%) and a memory load of 1664MiB (1024 + 512 + 128) on-heap from the combined `Split`, `BangAdder`, and `QMarkAdder` (the last of which uses the default resources set in the `DefaultResourceDeclarer`)
 - a bolt with 100% cpu load and a memory load of 2048MiB on-heap, with default value for off-heap.
 
-The API can be called as many times as is desired.
-It may be called after every operation, after some of the operations, or used in the same manner as `parallelismHint()` to set resources for a whole section.
+Resource declarations may be made after any operation. Operations without explicit resources get the defaults. If you set resources for only some operations, defaults must be declared, or topology submission will fail.
 Resource declarations have the same *boundaries* as parallelism hints. They don't cross any groupings, shufflings, or any other kind of repartitioning.
+Resources are declared per operation but are combined within those boundaries.

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Trident-spouts.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Trident-spouts.md b/releases/2.0.0-SNAPSHOT/Trident-spouts.md
index e433c4e..978881d 100644
--- a/releases/2.0.0-SNAPSHOT/Trident-spouts.md
+++ b/releases/2.0.0-SNAPSHOT/Trident-spouts.md
@@ -34,10 +34,10 @@ Even while processing multiple batches simultaneously, Trident will order any st
 
 Here are the following spout APIs available:
 
-1. [ITridentSpout]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/spout/ITridentSpout.java): The most general API that can support transactional or opaque transactional semantics. Generally you'll use one of the partitioned flavors of this API rather than this one directly.
-2. [IBatchSpout]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/spout/IBatchSpout.java): A non-transactional spout that emits batches of tuples at a time
-3. [IPartitionedTridentSpout]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java): A transactional spout that reads from a partitioned data source (like a cluster of Kafka servers)
-4. [IOpaquePartitionedTridentSpout]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java): An opaque transactional spout that reads from a partitioned data source
+1. [ITridentSpout]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/spout/ITridentSpout.java): The most general API that can support transactional or opaque transactional semantics. Generally you'll use one of the partitioned flavors of this API rather than this one directly.
+2. [IBatchSpout]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/spout/IBatchSpout.java): A non-transactional spout that emits batches of tuples at a time
+3. [IPartitionedTridentSpout]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java): A transactional spout that reads from a partitioned data source (like a cluster of Kafka servers)
+4. [IOpaquePartitionedTridentSpout]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java): An opaque transactional spout that reads from a partitioned data source
 
 And, like mentioned in the beginning of this tutorial, you can use regular IRichSpout's as well.
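+
+As a quick illustration of the `IBatchSpout` flavor listed above, here is a minimal sketch using the `FixedBatchSpout` test spout shipped with Trident, which emits a fixed set of sentences in batches of up to three tuples:
+
+```java
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
+    new Values("the cow jumped over the moon"),
+    new Values("how many apples can you eat"));
+spout.setCycle(true); // keep replaying the same sentences
+
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("spout1", spout);
+```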
  

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Trident-state.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Trident-state.md b/releases/2.0.0-SNAPSHOT/Trident-state.md
index bb5b1ee..a89dc3c 100644
--- a/releases/2.0.0-SNAPSHOT/Trident-state.md
+++ b/releases/2.0.0-SNAPSHOT/Trident-state.md
@@ -309,7 +309,7 @@ public interface Snapshottable<T> extends State {
 }
 ```
 
-[MemoryMapState]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java) and [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/{{page.version}}/src/jvm/trident/memcached/MemcachedState.java) each implement both of these interfaces.
+[MemoryMapState]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryMapState.java) and [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/{{page.version}}/src/jvm/trident/memcached/MemcachedState.java) each implement both of these interfaces.
 
 ## Implementing Map States
 
@@ -322,10 +322,10 @@ public interface IBackingMap<T> {
 }
 ```
 
-OpaqueMap's will call multiPut with [OpaqueValue]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/state/OpaqueValue.java)'s for the vals, TransactionalMap's will give [TransactionalValue]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/state/TransactionalValue.java)'s for the vals, and NonTransactionalMaps will just pass the objects from the topology through.
+OpaqueMap's will call multiPut with [OpaqueValue]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/state/OpaqueValue.java)'s for the vals, TransactionalMap's will give [TransactionalValue]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/state/TransactionalValue.java)'s for the vals, and NonTransactionalMaps will just pass the objects from the topology through.
 
-Trident also provides the [CachedMap]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/state/map/CachedMap.java) class to do automatic LRU caching of map key/vals.
+Trident also provides the [CachedMap]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/state/map/CachedMap.java) class to do automatic LRU caching of map key/vals.
 
-Finally, Trident provides the [SnapshottableMap]({{page.git-blob-base}}/storm-core/src/jvm/org/apache/storm/trident/state/map/SnapshottableMap.java) class that turns a MapState into a Snapshottable object, by storing global aggregations into a fixed key.
+Finally, Trident provides the [SnapshottableMap]({{page.git-blob-base}}/storm-client/src/jvm/org/apache/storm/trident/state/map/SnapshottableMap.java) class that turns a MapState into a Snapshottable object, by storing global aggregations into a fixed key.
 
 Take a look at the implementation of [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) to see how all these utilities can be put together to make a high performance MapState implementation. MemcachedState allows you to choose between opaque transactional, transactional, and non-transactional semantics.

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Tutorial.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Tutorial.md b/releases/2.0.0-SNAPSHOT/Tutorial.md
index 5dad834..f71c209 100644
--- a/releases/2.0.0-SNAPSHOT/Tutorial.md
+++ b/releases/2.0.0-SNAPSHOT/Tutorial.md
@@ -206,36 +206,9 @@ public static class ExclamationBolt extends BaseRichBolt {
 
 Let's see how to run the `ExclamationTopology` in local mode and see that it's working.
 
-Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. When you run the topologies in storm-starter, they'll run in local mode and you'll be able to see what messages each component is emitting. You can read more about running topologies in local mode on [Local mode](Local-mode.html).
+Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. You can read more about running topologies in local mode on [Local mode](Local-mode.html).
 
-In distributed mode, Storm operates as a cluster of machines. When you submit a topology to the master, you also submit all the code necessary to run the topology. The master will take care of distributing your code and allocating workers to run your topology. If workers go down, the master will reassign them somewhere else. You can read more about running topologies on a cluster on [Running topologies on a production cluster](Running-topologies-on-a-production-cluster.html)]. 
-
-Here's the code that runs `ExclamationTopology` in local mode:
-
-```java
-Config conf = new Config();
-conf.setDebug(true);
-conf.setNumWorkers(2);
-
-LocalCluster cluster = new LocalCluster();
-cluster.submitTopology("test", conf, builder.createTopology());
-Utils.sleep(10000);
-cluster.killTopology("test");
-cluster.shutdown();
-```
-
-First, the code defines an in-process cluster by creating a `LocalCluster` object. Submitting topologies to this virtual cluster is identical to submitting topologies to distributed clusters. It submits a topology to the `LocalCluster` by calling `submitTopology`, which takes as arguments a name for the running topology, a configuration for the topology, and then the topology itself.
-
-The name is used to identify the topology so that you can kill it later on. A topology will run indefinitely until you kill it.
-
-The configuration is used to tune various aspects of the running topology. The two configurations specified here are very common:
-
-1. **TOPOLOGY_WORKERS** (set with `setNumWorkers`) specifies how many _processes_ you want allocated around the cluster to execute the topology. Each component in the topology will execute as many _threads_. The number of threads allocated to a given component is configured through the `setBolt` and `setSpout` methods. Those _threads_ exist within worker _processes_. Each worker _process_ contains within it some number of _threads_ for some number of components. For instance, you may have 300 threads specified across all your components and 50 worker processes specified in your config. Each worker process will execute 6 threads, each of which of could belong to a different component. You tune the performance of Storm topologies by tweaking the parallelism for each component and the number of worker processes those threads should run within.
-2. **TOPOLOGY_DEBUG** (set with `setDebug`), when set to true, tells Storm to log every message every emitted by a component. This is useful in local mode when testing topologies, but you probably want to keep this turned off when running topologies on the cluster.
-
-There's many other configurations you can set for the topology. The various configurations are detailed on [the Javadoc for Config](javadocs/org/apache/storm/Config.html).
-
-To learn about how to set up your development environment so that you can run topologies in local mode (such as in Eclipse), see [Creating a new Storm project](Creating-a-new-Storm-project.html).
+To run a topology in local mode, run the command `storm local` instead of `storm jar`.
 
 ## Stream groupings
 

http://git-wip-us.apache.org/repos/asf/storm-site/blob/ef81b5ca/releases/2.0.0-SNAPSHOT/Windowing.md
----------------------------------------------------------------------
diff --git a/releases/2.0.0-SNAPSHOT/Windowing.md b/releases/2.0.0-SNAPSHOT/Windowing.md
index 82212ea..01d8857 100644
--- a/releases/2.0.0-SNAPSHOT/Windowing.md
+++ b/releases/2.0.0-SNAPSHOT/Windowing.md
@@ -146,8 +146,20 @@ public BaseWindowedBolt withTimestampField(String fieldName)
 ```
 
 The value for the above `fieldName` will be looked up from the incoming tuple and considered for windowing calculations. 
-If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter 
-can also be specified which indicates the max time limit for tuples with out of order timestamps. 
+If the field is not present in the tuple, an exception will be thrown. Alternatively, a [TimestampExtractor](../storm-client/src/jvm/org/apache/storm/windowing/TimestampExtractor.java) can be used to
+derive a timestamp value from a tuple (e.g. extracting the timestamp from a nested field within the tuple).
+
+```java
+/**
+* Specify the timestamp extractor implementation.
+*
+* @param timestampExtractor the {@link TimestampExtractor} implementation
+*/
+public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
+```
+
+
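+As an illustration, a minimal sketch of a custom extractor (the `event` field name and its layout are hypothetical) that reads the timestamp from a nested map carried inside the tuple:
+
+```java
+public class NestedTimestampExtractor implements TimestampExtractor {
+    @Override
+    public long extractTimestamp(Tuple tuple) {
+        // the tuple is assumed to carry a nested map under an "event" field
+        Map<String, Object> event = (Map<String, Object>) tuple.getValueByField("event");
+        return (Long) event.get("ts");
+    }
+}
+```
+
+It can then be supplied to the bolt via `withTimestampExtractor(new NestedTimestampExtractor())`.
+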
+Along with the timestamp field name/extractor, a time lag parameter can also be specified, which indicates the maximum time limit for tuples with out-of-order timestamps.
 
 ```java
 /**
@@ -238,7 +250,7 @@ e10 is not evaluated since the tuple ts `8:00:39` is beyond the watermark time `
 
 The window calculation considers the time gaps and computes the windows based on the tuple timestamp.
 
-## Guarentees
+## Guarantees
 The windowing functionality in storm core currently provides at-least once guarentee. The values emitted from the bolts
 `execute(TupleWindow inputWindow)` method are automatically anchored to all the tuples in the inputWindow. The downstream
 bolts are expected to ack the received tuple (i.e the tuple emitted from the windowed bolt) to complete the tuple tree. 
@@ -254,3 +266,111 @@ tuples can be received within the timeout period.
 An example toplogy `SlidingWindowTopology` shows how to use the apis to compute a sliding window sum and a tumbling window 
 average.
 
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory until they are processed and expired from the
+window. This limits the use cases to windows that fit entirely in memory. Also, the source tuples cannot be acked until
+the window expires, which requires large message timeouts (`topology.message.timeout.secs` should be larger than the
+window length plus the sliding interval). This also adds extra load due to the complex acking and anchoring requirements.
+ 
+To address the above limitations and to support larger window sizes, storm provides stateful windowing support via `IStatefulWindowedBolt`.
+User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations, with the framework automatically
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of the message, the framework can use it
+to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of
+failures or restarts. During recovery, tuples with message ids lower than the last expired id are discarded, and tuples with
+message ids between the last expired and last evaluated ids are fed into the system without activating any previously
+activated windows.
+Tuples beyond the last evaluated message id are processed as usual. This can be enabled by setting
+the `messageIdField` as shown below:
+```java
+topologyBuilder.setBolt("mybolt",
+                   new MyStatefulWindowedBolt()
+                   .withWindow(...) // windowing configurations
+                   .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple
+                   parallelism)
+               .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same identifier
+is maintained when messages are re-emitted after failures. With this option the tuples are still buffered in memory until processed
+and expired from the window.
+
+For more details, take a look at the sample topology in storm-starter [StatefulWindowingTopology](../examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java), which will help you get started.
+
+### Window checkpointing
+
+With window checkpointing, the monotonically increasing id is no longer required since the framework transparently saves the state of the window periodically into the configured state backend.
+The saved state includes the tuples in the window, any system state required to recover the state of processing,
+and the user state.
+
+```java
+topologyBuilder.setBolt("mybolt",
+                   new MyStatefulPersistentWindowedBolt()
+                   .withWindow(...) // windowing configurations
+                   .withPersistence() // persist the window state
+                   .withMaxEventsInMemory(25000), // max number of events to be cached in memory
+                    parallelism)
+               .shuffleGrouping("spout");
+
+```
+
+The `withPersistence` option instructs the framework to transparently save the tuples in the window, along with
+any associated system and user state, to the state backend. The `withMaxEventsInMemory` option is an optional
+configuration that specifies the maximum number of tuples that may be kept in memory. Tuples are transparently loaded from
+the state backend as required, and the ones most likely to be used again are retained in memory.
+
+The state backend can be configured by setting the topology state provider config:
+
+```java
+// use redis for state persistence
+conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+```
+
+Currently storm supports Redis and HBase as state backends and uses the underlying state-checkpointing
+framework for saving the window state. For more details on state checkpointing, see [State-checkpointing.md](State-checkpointing.md).
+
+Here is an example of a persistent windowed bolt that uses window checkpointing to save its state. The `initState`
+method is invoked with the last saved (user) state at initialization time. The `execute` method is invoked based on the configured
+windowing parameters, and the tuples in the active window can be accessed via an iterator as shown below.
+
+```java
+public class MyStatefulPersistentWindowedBolt<K, V> extends BaseStatefulWindowedBolt<KeyValueState<K, V>> {
+  private KeyValueState<K, V> state;
+  private OutputCollector collector; // saved in prepare(), used to emit results downstream
+  
+  @Override
+  public void initState(KeyValueState<K, V> state) {
+    this.state = state;
+   // ...
+   // restore the state from the last saved state.
+   // ...
+  }
+  
+  @Override
+  public void execute(TupleWindow window) {      
+    // iterate over tuples in the current window
+    Iterator<Tuple> it = window.getIter();
+    while (it.hasNext()) {
+        // compute some result based on the tuples in window
+    }
+    
+    // possibly update any state to be maintained across windows
+    state.put(STATE_KEY, updatedValue);
+    
+    // emit the results downstream
+    collector.emit(new Values(result));
+  }
+}
+```
+
+**Note:** In case of persistent windowed bolts, use `TupleWindow.getIter` to retrieve an iterator over the
+events in the window. If the number of tuples in the window is huge, invoking `TupleWindow.get` would
+try to load all the tuples into memory and may throw an OOM exception.
+
+**Note:** In case of persistent windowed bolts, `TupleWindow.getNew` and `TupleWindow.getExpired` are currently not supported
+and will throw an `UnsupportedOperationException`.
+
+For more details, take a look at the sample topology in storm-starter [PersistentWindowingTopology](../examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java),
+which will help you get started.