Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/20 23:15:25 UTC

[23/46] storm git commit: remove 'docs' directory

http://git-wip-us.apache.org/repos/asf/storm/blob/93e0d028/docs/documentation/Trident-API-Overview.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Trident-API-Overview.md b/docs/documentation/Trident-API-Overview.md
deleted file mode 100644
index bc5dcb1..0000000
--- a/docs/documentation/Trident-API-Overview.md
+++ /dev/null
@@ -1,312 +0,0 @@
----
-title: Trident API Overview
-layout: documentation
-documentation: true
----
-
-The core data model in Trident is the "Stream", processed as a series of batches. A stream is partitioned among the nodes in the cluster, and operations applied to a stream are applied in parallel across each partition.
-
-There are five kinds of operations in Trident:
-
-1. Operations that apply locally to each partition and cause no network transfer
-2. Repartitioning operations that repartition a stream but otherwise don't change the contents (involves network transfer)
-3. Aggregation operations that do network transfer as part of the operation
-4. Operations on grouped streams
-5. Merges and joins
-
-## Partition-local operations
-
-Partition-local operations involve no network transfer and are applied to each batch partition independently.
-
-### Functions
-
-A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple are appended to the original input tuple in the stream. If a function emits no tuples, the original input tuple is filtered out. Otherwise, the input tuple is duplicated for each output tuple. Suppose you have this function:
-
-```java
-public class MyFunction extends BaseFunction {
-    public void execute(TridentTuple tuple, TridentCollector collector) {
-        for(int i=0; i < tuple.getInteger(0); i++) {
-            collector.emit(new Values(i));
-        }
-    }
-}
-```
-
-Now suppose you have a stream in the variable "mystream" with the fields ["a", "b", "c"] with the following tuples:
-
-```
-[1, 2, 3]
-[4, 1, 6]
-[3, 0, 8]
-```
-
-If you run this code:
-
-```java
-mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))
-```
-
-The resulting tuples would have fields ["a", "b", "c", "d"] and look like this:
-
-```
-[1, 2, 3, 0]
-[1, 2, 3, 1]
-[4, 1, 6, 0]
-```
-
-### Filters
-
-Filters take in a tuple as input and decide whether or not to keep that tuple. Suppose you had this filter:
-
-```java
-public class MyFilter extends BaseFilter {
-    public boolean isKeep(TridentTuple tuple) {
-        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
-    }
-}
-```
-
-Now suppose you had these tuples with fields ["a", "b", "c"]:
-
-```
-[1, 2, 3]
-[2, 1, 1]
-[2, 3, 4]
-```
-
-If you ran this code:
-
-```java
-mystream.each(new Fields("b", "a"), new MyFilter())
-```
-
-The resulting tuples would be:
-
-```
-[2, 1, 1]
-```
-
-### partitionAggregate
-
-partitionAggregate runs a function on each partition of a batch of tuples. Unlike functions, the tuples emitted by partitionAggregate replace the input tuples given to it. Consider this example:
-
-```java
-mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
-```
-
-Suppose the input stream contained fields ["a", "b"] and the following partitions of tuples:
-
-```
-Partition 0:
-["a", 1]
-["b", 2]
-
-Partition 1:
-["a", 3]
-["c", 8]
-
-Partition 2:
-["e", 1]
-["d", 9]
-["d", 10]
-```
-
-Then the output stream of that code would contain these tuples with one field called "sum":
-
-```
-Partition 0:
-[3]
-
-Partition 1:
-[11]
-
-Partition 2:
-[20]
-```
-
-There are three different interfaces for defining aggregators: CombinerAggregator, ReducerAggregator, and Aggregator.
-
-Here's the interface for CombinerAggregator:
-
-```java
-public interface CombinerAggregator<T> extends Serializable {
-    T init(TridentTuple tuple);
-    T combine(T val1, T val2);
-    T zero();
-}
-```
-
-A CombinerAggregator returns a single tuple with a single field as output. CombinerAggregators run the init function on each input tuple and use the combine function to combine values until there's only one value left. If there are no tuples in the partition, the CombinerAggregator emits the output of the zero function. For example, here's the implementation of Count:
-
-```java
-public class Count implements CombinerAggregator<Long> {
-    public Long init(TridentTuple tuple) {
-        return 1L;
-    }
-
-    public Long combine(Long val1, Long val2) {
-        return val1 + val2;
-    }
-
-    public Long zero() {
-        return 0L;
-    }
-}
-```
-
-The benefits of CombinerAggregators are seen when you use them with the aggregate method instead of partitionAggregate. In that case, Trident automatically optimizes the computation by doing partial aggregations before transferring tuples over the network.
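-
-For example, running the built-in Sum combiner through aggregate might look like this (a sketch; the partial-aggregation optimization happens automatically):
-
-```java
-// Partial sums are computed per partition before the network transfer, then combined.
-mystream.aggregate(new Fields("b"), new Sum(), new Fields("sum"))
-```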
-
-A ReducerAggregator has the following interface:
-
-```java
-public interface ReducerAggregator<T> extends Serializable {
-    T init();
-    T reduce(T curr, TridentTuple tuple);
-}
-```
-
-A ReducerAggregator produces an initial value with init, and then it iterates on that value for each input tuple to produce a single tuple with a single value as output. For example, here's how you would define Count as a ReducerAggregator:
-
-```java
-public class Count implements ReducerAggregator<Long> {
-    public Long init() {
-        return 0L;
-    }
-    
-    public Long reduce(Long curr, TridentTuple tuple) {
-        return curr + 1;
-    }
-}
-```
-
-ReducerAggregator can also be used with persistentAggregate, as you'll see later.
-
-The most general interface for performing aggregations is Aggregator, which looks like this:
-
-```java
-public interface Aggregator<T> extends Operation {
-    T init(Object batchId, TridentCollector collector);
-    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
-    void complete(T state, TridentCollector collector);
-}
-```
-
-Aggregators can emit any number of tuples with any number of fields. They can emit tuples at any point during execution. Aggregators execute in the following way:
-
-1. The init method is called before processing the batch. The return value of init is an Object that will represent the state of the aggregation and will be passed into the aggregate and complete methods.
-2. The aggregate method is called for each input tuple in the batch partition. This method can update the state and optionally emit tuples.
-3. The complete method is called when all tuples for the batch partition have been processed by aggregate. 
-
-Here's how you would implement Count as an Aggregator:
-
-```java
-public class CountAgg extends BaseAggregator<CountState> {
-    static class CountState {
-        long count = 0;
-    }
-
-    public CountState init(Object batchId, TridentCollector collector) {
-        return new CountState();
-    }
-
-    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
-        state.count+=1;
-    }
-
-    public void complete(CountState state, TridentCollector collector) {
-        collector.emit(new Values(state.count));
-    }
-}
-```
-
-Sometimes you want to execute multiple aggregators at the same time. This is called chaining and can be accomplished like this:
-
-```java
-mystream.chainedAgg()
-        .partitionAggregate(new Count(), new Fields("count"))
-        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
-        .chainEnd()
-```
-
-This code will run the Count and Sum aggregators on each partition. The output will contain a single tuple with the fields ["count", "sum"].
-
-### stateQuery and partitionPersist
-
-stateQuery and partitionPersist query and update sources of state, respectively. You can read about how to use them on [Trident state doc](Trident-state.html).
-
-### projection
-
-The projection method on Stream keeps only the fields specified in the operation. If you had a Stream with fields ["a", "b", "c", "d"] and you ran this code:
-
-```java
-mystream.project(new Fields("b", "d"))
-```
-
-The output stream would contain only the fields ["b", "d"].
-
-
-## Repartitioning operations
-
-Repartitioning operations run a function to change how the tuples are partitioned across tasks. The number of partitions can also change as a result of repartitioning (for example, if the parallelism hint is greater after repartitioning). Repartitioning requires network transfer. Here are the repartitioning functions (a sketch of how each is invoked follows the list):
-
-1. shuffle: Uses a random round-robin algorithm to evenly redistribute tuples across all target partitions
-2. broadcast: Every tuple is replicated to all target partitions. This can be useful during DRPC – for example, if you need to do a stateQuery on every partition of data.
-3. partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.
-4. global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
-5. batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions. 
-6. partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping
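-
-As a rough sketch, each of these is invoked directly on a stream (the field name and the custom grouping class here are illustrative, not from the original doc):
-
-```java
-// Each call returns a new, repartitioned stream:
-mystream.shuffle()
-mystream.broadcast()
-mystream.partitionBy(new Fields("userid"))
-mystream.global()
-mystream.batchGlobal()
-mystream.partition(new MyCustomStreamGrouping()) // hypothetical CustomStreamGrouping implementation
-```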
-
-## Aggregation operations
-
-Trident has aggregate and persistentAggregate methods for doing aggregations on Streams. aggregate is run on each batch of the stream in isolation, while persistentAggregate aggregates over all tuples across all batches in the stream and stores the result in a source of state.
-
-Running aggregate on a Stream does a global aggregation. When you use a ReducerAggregator or an Aggregator, the stream is first repartitioned into a single partition, and then the aggregation function is run on that partition. When you use a CombinerAggregator, on the other hand, first Trident will compute partial aggregations of each partition, then repartition to a single partition, and then finish the aggregation after the network transfer. CombinerAggregators are far more efficient and should be used when possible.
-
-Here's an example of using aggregate to get a global count for a batch:
-
-```java
-mystream.aggregate(new Count(), new Fields("count"))
-```
-
-Like partitionAggregate, aggregators for aggregate can be chained. However, if you chain a CombinerAggregator with a non-CombinerAggregator, Trident is unable to do the partial aggregation optimization.
-
-You can read more about how to use persistentAggregate in the [Trident state doc](https://github.com/apache/storm/wiki/Trident-state).
-
-## Operations on grouped streams
-
-The groupBy operation repartitions the stream by doing a partitionBy on the specified fields, and then within each partition groups tuples together whose group fields are equal. For example, here's an illustration of a groupBy operation:
-
-![Grouping](images/grouping.png)
-
-If you run aggregators on a grouped stream, the aggregation will be run within each group instead of against the whole batch. persistentAggregate can also be run on a GroupedStream, in which case the results will be stored in a [MapState](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/map/MapState.java) with the key being the grouping fields. You can read more about persistentAggregate in the [Trident state doc](Trident-state.html).
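-
-For example, a grouped, persistently stored count might look like this (a sketch; MemoryMapState is the in-memory MapState implementation used in the Trident tutorial):
-
-```java
-mystream.groupBy(new Fields("word"))
-        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
-```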
-
-Like regular streams, aggregators on grouped streams can be chained.
-
-## Merges and joins
-
-The last part of the API is combining different streams together. The simplest way to combine streams is to merge them into one stream. You can do that with the TridentTopology#merge method, like so:
-
-```java
-topology.merge(stream1, stream2, stream3);
-```
-
-Trident will name the output fields of the new, merged stream as the output fields of the first stream.
-
-Another way to combine streams is with a join. Now, a standard join, like the kind from SQL, requires finite input, so joins don't make sense with infinite streams. Joins in Trident only apply within each small batch that comes off of the spout.
-
-Here's an example join between a stream containing fields ["key", "val1", "val2"] and another stream containing ["x", "val1"]:
-
-```java
-topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
-```
-
-This joins stream1 and stream2 together using "key" and "x" as the join fields for each respective stream. Then, Trident requires that all the output fields of the new stream be named, since the input streams could have overlapping field names. The tuples emitted from the join will contain the following (an example is given after the list):
-
-1. First, the list of join fields. In this case, "key" corresponds to "key" from stream1 and "x" from stream2.
-2. Next, a list of all non-join fields from all streams, in order of how the streams were passed to the join method. In this case, "a" and "b" correspond to "val1" and "val2" from stream1, and "c" corresponds to "val1" from stream2.
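-
-For example, if stream1 contains the tuple [1, 2, 3] and stream2 contains the tuple [1, 4], the join above emits a tuple with fields ["key", "a", "b", "c"]:
-
-```
-[1, 2, 3, 4]
-```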
-
-When a join happens between streams originating from different spouts, those spouts will be synchronized with how they emit batches. That is, a batch of processing will include tuples from each spout.
-
-You might be wondering – how do you do something like a "windowed join", where tuples from one side of the join are joined against the last hour of tuples from the other side of the join?
-
-To do this, you would make use of partitionPersist and stateQuery. The last hour of tuples from one side of the join would be stored and rotated in a source of state, keyed by the join field. Then the stateQuery would do lookups by the join field to perform the "join".
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/93e0d028/docs/documentation/Trident-spouts.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Trident-spouts.md b/docs/documentation/Trident-spouts.md
deleted file mode 100644
index f460908..0000000
--- a/docs/documentation/Trident-spouts.md
+++ /dev/null
@@ -1,44 +0,0 @@
----
-title: Trident Spouts
-layout: documentation
-documentation: true
----
-# Trident spouts
-
-Like in the vanilla Storm API, spouts are the source of streams in a Trident topology. On top of the vanilla Storm spouts, Trident exposes additional APIs for more sophisticated spouts.
-
-There is an inextricable link between how you source your data streams and how you update state (e.g. databases) based on those data streams. See [Trident state doc](Trident-state.html) for an explanation of this – understanding this link is imperative for understanding the spout options available.
-
-Regular Storm spouts will be non-transactional spouts in a Trident topology. To use a regular Storm IRichSpout, create the stream like this in a TridentTopology:
-
-```java
-TridentTopology topology = new TridentTopology();
-topology.newStream("myspoutid", new MyRichSpout());
-```
-
-All spouts in a Trident topology are required to be given a unique identifier for the stream – this identifier must be unique across all topologies run on the cluster. Trident will use this identifier to store metadata about what the spout has consumed in Zookeeper, including the txid and any metadata associated with the spout.
-
-You can configure the Zookeeper storage of spout metadata via the following configuration options (a sample snippet follows the list):
-
-1. `transactional.zookeeper.servers`: A list of Zookeeper hostnames 
-2. `transactional.zookeeper.port`: The port of the Zookeeper cluster
-3. `transactional.zookeeper.root`: The root dir in Zookeeper where metadata is stored. Metadata will be stored at the path <root path>/<spout id>
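-
-For example, a configuration snippet might look like this (hostnames and values are illustrative):
-
-```
-transactional.zookeeper.servers: ["zk1.example.com", "zk2.example.com"]
-transactional.zookeeper.port: 2181
-transactional.zookeeper.root: "/transactional"
-```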
-
-## Pipelining
-
-By default, Trident processes a single batch at a time, waiting for the batch to succeed or fail before trying another batch. You can get significantly higher throughput – and lower latency of processing of each batch – by pipelining the batches. You configure the maximum number of batches to be processed simultaneously with the "topology.max.spout.pending" property.
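-
-For example, a sketch of allowing up to 20 batches in flight (the value is illustrative and should be tuned for your topology):
-
-```java
-Config conf = new Config();
-conf.setMaxSpoutPending(20); // sets "topology.max.spout.pending"
-```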
-
-Even while processing multiple batches simultaneously, Trident will order any state updates taking place in the topology among batches. For example, suppose you're doing a global count aggregation into a database. The idea is that while you're updating the count in the database for batch 1, you can still be computing the partial counts for batches 2 through 10. Trident won't move on to the state updates for batch 2 until the state updates for batch 1 have succeeded. This is essential for achieving exactly-once processing semantics, as outlined in [Trident state doc](Trident-state.html).
-
-## Trident spout types
-
-The following spout APIs are available:
-
-1. [ITridentSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java): The most general API that can support transactional or opaque transactional semantics. Generally you'll use one of the partitioned flavors of this API rather than this one directly.
-2. [IBatchSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java): A non-transactional spout that emits batches of tuples at a time
-3. [IPartitionedTridentSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java): A transactional spout that reads from a partitioned data source (like a cluster of Kafka servers)
-4. [IOpaquePartitionedTridentSpout](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java): An opaque transactional spout that reads from a partitioned data source
-
-And, as mentioned at the beginning of this tutorial, you can use regular IRichSpouts as well.
- 
-

http://git-wip-us.apache.org/repos/asf/storm/blob/93e0d028/docs/documentation/Trident-state.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Trident-state.md b/docs/documentation/Trident-state.md
deleted file mode 100644
index d7fc75e..0000000
--- a/docs/documentation/Trident-state.md
+++ /dev/null
@@ -1,331 +0,0 @@
----
-title: Trident State
-layout: documentation
----
-
-
-Trident has first-class abstractions for reading from and writing to stateful sources. The state can either be internal to the topology – e.g., kept in-memory and backed by HDFS – or externally stored in a database like Memcached or Cassandra. There's no difference in the Trident API for either case.
-
-Trident manages state in a fault-tolerant way so that state updates are idempotent in the face of retries and failures. This lets you reason about Trident topologies as if each message were processed exactly-once.
-
-There are various levels of fault-tolerance possible when doing state updates. Before getting to those, let's look at an example that illustrates the tricks necessary to achieve exactly-once semantics. Suppose that you're doing a count aggregation of your stream and want to store the running count in a database. Now suppose you store in the database a single value representing the count, and every time you process a new tuple you increment the count.
-
-When failures occur, tuples will be replayed. This brings up a problem when doing state updates (or anything with side effects) – you have no idea if you've ever successfully updated the state based on this tuple before. Perhaps you never processed the tuple before, in which case you should increment the count. Perhaps you've processed the tuple and successfully incremented the count, but the tuple failed processing in another step. In this case, you should not increment the count. Or perhaps you saw the tuple before but got an error when updating the database. In this case, you *should* update the database.
-
-By just storing the count in the database, you have no idea whether or not this tuple has been processed before. So you need more information in order to make the right decision. Trident provides the following semantics which are sufficient for achieving exactly-once processing semantics:
-
-1. Tuples are processed as small batches (see [the tutorial](Trident-tutorial.html))
-2. Each batch of tuples is given a unique id called the "transaction id" (txid). If the batch is replayed, it is given the exact same txid.
-3. State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.
-
-With these primitives, your State implementation can detect whether or not the batch of tuples has been processed before and take the appropriate action to update the state in a consistent way. The action you take depends on the exact semantics provided by your input spouts as to what's in each batch. There are three kinds of spouts possible with respect to fault-tolerance: "non-transactional", "transactional", and "opaque transactional". Likewise, there are three kinds of state possible with respect to fault-tolerance: "non-transactional", "transactional", and "opaque transactional". Let's take a look at each spout type and see what kind of fault-tolerance you can achieve with each.
-
-## Transactional spouts
-
-Remember, Trident processes tuples as small batches with each batch being given a unique transaction id. The properties of spouts vary according to the guarantees they can provide as to what's in each batch. A transactional spout has the following properties:
-
-1. Batches for a given txid are always the same. Replays of batches for a txid will emit the exact same set of tuples as the first time that batch was emitted for that txid.
-2. There's no overlap between batches of tuples (tuples are in one batch or another, never multiple).
-3. Every tuple is in a batch (no tuples are skipped)
-
-This is a pretty easy type of spout to understand: the stream is divided into fixed batches that never change. storm-contrib has [an implementation of a transactional spout](https://github.com/apache/storm/tree/master/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java) for Kafka.
-
-You might be wondering – why wouldn't you just always use a transactional spout? They're simple and easy to understand. One reason you might not use one is that they're not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is that the batch for a txid contains tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You're now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are unavailable), and processing will halt.
-
-This is why "opaque transactional" spouts exist – they are fault-tolerant to losing source nodes while still allowing you to achieve exactly-once processing semantics. We'll cover those spouts in the next section though.
-
-(One side note – once Kafka supports replication, it will be possible to have transactional spouts that are fault-tolerant to node failure, but that feature does not exist yet.)
-
-Before we get to "opaque transactional" spouts, let's look at how you would design a State implementation that has exactly-once semantics for transactional spouts. This State type is called a "transactional state" and takes advantage of the fact that any given txid is always associated with the exact same set of tuples.
-
-Suppose your topology computes word count and you want to store the word counts in a key/value database. The key will be the word, and the value will contain the count. You've already seen that storing just the count as the value isn't sufficient to know whether you've processed a batch of tuples before. Instead, what you can do is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they're the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they're different, you increment the count. This logic works because the batch for a txid never changes, and Trident ensures that state updates are ordered among batches.
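-
-As a rough sketch (the database client and stored-value class here are hypothetical, not part of Trident), the update logic described above looks something like this:
-
-```java
-// Hypothetical sketch of a transactional-state count update.
-void updateWordCount(String word, long batchCount, long currentTxid) {
-    StoredValue stored = database.get(word);      // stored atomically as [count, txid]
-    if (stored == null) {
-        database.put(word, new StoredValue(batchCount, currentTxid));
-    } else if (stored.txid != currentTxid) {
-        // The stored count does not yet include this batch: apply it and record the txid.
-        database.put(word, new StoredValue(stored.count + batchCount, currentTxid));
-    }
-    // If the txids match, this exact batch is already reflected in the count, so skip the update.
-}
-```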
-
-Consider this example of why it works. Suppose you are processing txid 3 which consists of the following batch of tuples:
-
-```
-["man"]
-["man"]
-["dog"]
-```
-
-Suppose the database currently holds the following key/value pairs:
-
-```
-man => [count=3, txid=1]
-dog => [count=4, txid=3]
-apple => [count=10, txid=2]
-```
-
-The txid associated with "man" is txid 1. Since the current txid is 3, you know for sure that this batch of tuples is not represented in that count. So you can go ahead and increment the count by 2 and update the txid. On the other hand, the txid for "dog" is the same as the current txid. So you know for sure that the increment from the current batch is already represented in the database for the "dog" key. So you can skip the update. After completing updates, the database looks like this:
-
-```
-man => [count=5, txid=3]
-dog => [count=4, txid=3]
-apple => [count=10, txid=2]
-```
-
-Let's now look at opaque transactional spouts and how to design states for that type of spout.
-
-## Opaque transactional spouts
-
-As described before, an opaque transactional spout cannot guarantee that the batch of tuples for a txid remains constant. An opaque transactional spout has the following property:
-
-1. Every tuple is *successfully* processed in exactly one batch. However, it's possible for a tuple to fail to process in one batch and then succeed to process in a later batch.
-
-[OpaqueTridentKafkaSpout](https://github.com/apache/storm/tree/master/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.
-
-With opaque transactional spouts, it's no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.
-
-What you can do is store more state in the database. Rather than store a value and transaction id in the database, you instead store a value, transaction id, and the previous value in the database. Let's again use the example of storing a count in the database. Suppose the partial count for your batch is "2" and it's time to apply a state update. Suppose the value in the database looks like this:
-
-```
-{ value = 4,
-  prevValue = 1,
-  txid = 2
-}
-```
-
-Suppose your current txid is 3, different than what's in the database. In this case, you set "prevValue" equal to "value", increment "value" by your partial count, and update the txid. The new database value will look like this:
-
-```
-{ value = 6,
-  prevValue = 4,
-  txid = 3
-}
-```
-
-Now suppose your current txid is 2, equal to what's in the database. Now you know that the "value" in the database contains an update from a previous batch for your current txid, but that batch may have been different so you have to ignore it. What you do in this case is increment "prevValue" by your partial count to compute the new "value". You then set the value in the database to this:
-
-```
-{ value = 3,
-  prevValue = 1,
-  txid = 2
-}
-```
-
-This works because of the strong ordering of batches provided by Trident. Once Trident moves onto a new batch for state updates, it will never go back to a previous batch. And since opaque transactional spouts guarantee no overlap between batches – that each tuple is successfully processed by one batch – you can safely update based on the previous value.
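-
-As a rough sketch (again with a hypothetical database client and stored-value class), the opaque update logic described above looks something like this:
-
-```java
-// Hypothetical sketch of an opaque-state count update.
-void applyOpaqueUpdate(String key, long batchCount, long currentTxid) {
-    OpaqueStored stored = database.get(key);  // holds [value, prevValue, txid]
-    if (stored.txid == currentTxid) {
-        // The stored value may include a different version of this batch, so rebuild it
-        // from prevValue, which is known to exclude the current batch.
-        database.put(key, new OpaqueStored(stored.prevValue + batchCount, stored.prevValue, currentTxid));
-    } else {
-        // The stored value fully reflects previous batches: roll it forward.
-        database.put(key, new OpaqueStored(stored.value + batchCount, stored.value, currentTxid));
-    }
-}
-```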
-
-## Non-transactional spouts
-
-Non-transactional spouts don't provide any guarantees about what's in each batch. So it might have at-most-once processing, in which case tuples are not retried after failed batches. Or it might have at-least-once processing, where tuples can be processed successfully by multiple batches. There's no way to achieve exactly-once semantics for this kind of spout.
-
-## Summary of spout and state types
-
-This diagram shows which combinations of spouts / states enable exactly-once messaging semantics:
-
-![Spouts vs States](images/spout-vs-state.png)
-
-Opaque transactional states have the strongest fault-tolerance, but this comes at the cost of needing to store the txid and two values in the database. Transactional states require less state in the database, but only work with transactional spouts. Finally, non-transactional states require the least state in the database but cannot achieve exactly-once semantics.
-
-The state and spout types you choose are a tradeoff between fault-tolerance and storage costs, and ultimately your application requirements will determine which combination is right for you.
-
-## State APIs
-
-You've seen the intricacies of what it takes to achieve exactly-once semantics. The nice thing about Trident is that it internalizes all the fault-tolerance logic within the State – as a user you don't have to deal with comparing txids, storing multiple values in the database, or anything like that. You can write code like this:
-
-```java
-TridentTopology topology = new TridentTopology();        
-TridentState wordCounts =
-      topology.newStream("spout1", spout)
-        .each(new Fields("sentence"), new Split(), new Fields("word"))
-        .groupBy(new Fields("word"))
-        .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))                
-        .parallelismHint(6);
-```
-
-All the logic necessary to manage opaque transactional state logic is internalized in the MemcachedState.opaque call. Additionally, updates are automatically batched to minimize roundtrips to the database.
-
-The base State interface just has two methods:
-
-```java
-public interface State {
-    void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream
-    void commit(Long txid);
-}
-```
-
-You're told when a state update is beginning, when a state update is ending, and you're given the txid in each case. Trident assumes nothing about how your state works, what kind of methods there are to update it, and what kind of methods there are to read from it.
-
-Suppose you have a home-grown database that contains user location information and you want to be able to access it from Trident. Your State implementation would have methods for getting and setting user information:
-
-```java
-public class LocationDB implements State {
-    public void beginCommit(Long txid) {    
-    }
-    
-    public void commit(Long txid) {    
-    }
-    
-    public void setLocation(long userId, String location) {
-      // code to access database and set location
-    }
-    
-    public String getLocation(long userId) {
-      // code to get location from database
-      return null; // placeholder so the example compiles
-    }
-}
-```
-
-You then provide Trident a StateFactory that can create instances of your State object within Trident tasks. The StateFactory for your LocationDB might look something like this:
-
-```java
-public class LocationDBFactory implements StateFactory {
-   public State makeState(Map conf, int partitionIndex, int numPartitions) {
-      return new LocationDB();
-   } 
-}
-```
-
-Trident provides the QueryFunction interface for writing Trident operations that query a source of state, and the StateUpdater interface for writing Trident operations that update a source of state. For example, let's write an operation "QueryLocation" that queries the LocationDB for the locations of users. Let's start off with how you would use it in a topology. Let's say this topology consumes an input stream of userids:
-
-```java
-TridentTopology topology = new TridentTopology();
-TridentState locations = topology.newStaticState(new LocationDBFactory());
-topology.newStream("myspout", spout)
-        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
-```
-
-Now let's take a look at what the implementation of QueryLocation would look like:
-
-```java
-public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
-    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
-        List<String> ret = new ArrayList<String>();
-        for(TridentTuple input: inputs) {
-            ret.add(state.getLocation(input.getLong(0)));
-        }
-        return ret;
-    }
-
-    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
-        collector.emit(new Values(location));
-    }    
-}
-```
-
-QueryFunctions execute in two steps. First, Trident collects a batch of reads together and passes them to batchRetrieve. In this case, batchRetrieve will receive multiple user ids. batchRetrieve is expected to return a list of results that's the same size as the list of input tuples. The first element of the result list corresponds to the result for the first input tuple, the second is the result for the second input tuple, and so on.
-
-You can see that this code doesn't take advantage of the batching that Trident does, since it just queries the LocationDB one at a time. So a better way to write the LocationDB would be like this:
-
-```java
-public class LocationDB implements State {
-    public void beginCommit(Long txid) {    
-    }
-    
-    public void commit(Long txid) {    
-    }
-    
-    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
-      // set locations in bulk
-    }
-    
-    public List<String> bulkGetLocations(List<Long> userIds) {
-      // get locations in bulk
-      return null; // placeholder so the example compiles
-    }
-}
-```
-
-Then, you can write the QueryLocation function like this:
-
-```java
-public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
-    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
-        List<Long> userIds = new ArrayList<Long>();
-        for(TridentTuple input: inputs) {
-            userIds.add(input.getLong(0));
-        }
-        return state.bulkGetLocations(userIds);
-    }
-
-    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
-        collector.emit(new Values(location));
-    }    
-}
-```
-
-This code will be much more efficient by reducing roundtrips to the database. 
-
-To update state, you make use of the StateUpdater interface. Here's a StateUpdater that updates a LocationDB with new location information:
-
-```java
-public class LocationUpdater extends BaseStateUpdater<LocationDB> {
-    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
-        List<Long> ids = new ArrayList<Long>();
-        List<String> locations = new ArrayList<String>();
-        for(TridentTuple t: tuples) {
-            ids.add(t.getLong(0));
-            locations.add(t.getString(1));
-        }
-        state.setLocationsBulk(ids, locations);
-    }
-}
-```
-
-Here's how you would use this operation in a Trident topology:
-
-```java
-TridentTopology topology = new TridentTopology();
-TridentState locations = 
-    topology.newStream("locations", locationsSpout)
-        .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
-```
-
-The partitionPersist operation updates a source of state. The StateUpdater receives the State and a batch of tuples with updates to that State. This code just grabs the userids and locations from the input tuples and does a bulk set into the State. 
-
-partitionPersist returns a TridentState object representing the location db being updated by the Trident topology. You could then use this state in stateQuery operations elsewhere in the topology. 
-
-You can also see that StateUpdaters are given a TridentCollector. Tuples emitted to this collector go to the "new values stream". In this case, there's nothing interesting to emit to that stream, but if you were doing something like updating counts in a database, you could emit the updated counts to that stream. You can then get access to the new values stream for further processing via the TridentState#newValuesStream method.
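-
-For example, if the StateUpdater emitted updated counts to its collector, a downstream consumer might look like this (a sketch; stateFactory, CountUpdater, and LogFilter are hypothetical):
-
-```java
-TridentState counts =
-    stream.partitionPersist(stateFactory, new Fields("word"), new CountUpdater(), new Fields("word", "count"));
-counts.newValuesStream()
-      .each(new Fields("word", "count"), new LogFilter());
-```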
-
-## persistentAggregate
-
-Trident has another method for updating States called persistentAggregate. You've seen this used in the streaming word count example, shown again below:
-
-```java
-TridentTopology topology = new TridentTopology();        
-TridentState wordCounts =
-      topology.newStream("spout1", spout)
-        .each(new Fields("sentence"), new Split(), new Fields("word"))
-        .groupBy(new Fields("word"))
-        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
-```
-
-persistentAggregate is an additional abstraction built on top of partitionPersist that knows how to take a Trident aggregator and use it to apply updates to the source of state. In this case, since this is a grouped stream, Trident expects the state you provide to implement the "MapState" interface. The grouping fields will be the keys in the state, and the aggregation result will be the values in the state. The "MapState" interface looks like this:
-
-```java
-public interface MapState<T> extends State {
-    List<T> multiGet(List<List<Object>> keys);
-    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
-    void multiPut(List<List<Object>> keys, List<T> vals);
-}
-```
-
-When you do aggregations on non-grouped streams (a global aggregation), Trident expects your State object to implement the "Snapshottable" interface:
-
-```java
-public interface Snapshottable<T> extends State {
-    T get();
-    T update(ValueUpdater updater);
-    void set(T o);
-}
-```
-
-[MemoryMapState](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java) and [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) each implement both of these interfaces.
-
-## Implementing Map States
-
-Trident makes it easy to implement MapStates, doing almost all the work for you. The OpaqueMap, TransactionalMap, and NonTransactionalMap classes implement all the logic for doing the respective fault-tolerance logic. You simply provide these classes with an IBackingMap implementation that knows how to do multiGets and multiPuts of the respective key/values. IBackingMap looks like this:
-
-```java
-public interface IBackingMap<T> {
-    List<T> multiGet(List<List<Object>> keys); 
-    void multiPut(List<List<Object>> keys, List<T> vals); 
-}
-```
-
-OpaqueMaps will call multiPut with [OpaqueValue](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/OpaqueValue.java)s for the vals, TransactionalMaps will give [TransactionalValue](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/TransactionalValue.java)s for the vals, and NonTransactionalMaps will just pass the objects from the topology through.
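-
-For example, assuming OpaqueMap exposes a static build factory (an assumption to verify against your Storm version), wiring up a backing map might look like this:
-
-```java
-// DatabaseBackingMap is a hypothetical IBackingMap<OpaqueValue> implementation.
-IBackingMap<OpaqueValue> backing = new DatabaseBackingMap();
-MapState<Object> state = OpaqueMap.build(backing);
-```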
-
-Trident also provides the [CachedMap](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/map/CachedMap.java) class to do automatic LRU caching of map key/vals.
-
-Finally, Trident provides the [SnapshottableMap](https://github.com/apache/storm/blob/master/storm-core/src/jvm/storm/trident/state/map/SnapshottableMap.java) class that turns a MapState into a Snapshottable object, by storing global aggregations into a fixed key.
-
-Take a look at the implementation of [MemcachedState](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) to see how all these utilities can be put together to make a high performance MapState implementation. MemcachedState allows you to choose between opaque transactional, transactional, and non-transactional semantics.

http://git-wip-us.apache.org/repos/asf/storm/blob/93e0d028/docs/documentation/Trident-tutorial.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Trident-tutorial.md b/docs/documentation/Trident-tutorial.md
deleted file mode 100644
index 48fdd8d..0000000
--- a/docs/documentation/Trident-tutorial.md
+++ /dev/null
@@ -1,254 +0,0 @@
----
-title: Trident Tutorial
-layout: documentation
-documentation: true
----
-
-Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.
-
-## Illustrative example
-
-Let's look at an illustrative example of Trident. This example will do two things:
-
-1. Compute streaming word count from an input stream of sentences
-2. Implement queries to get the sum of the counts for a list of words
-
-For the purposes of illustration, this example will read an infinite stream of sentences from the following source:
-
-```java
-FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
-               new Values("the cow jumped over the moon"),
-               new Values("the man went to the store and bought some candy"),
-               new Values("four score and seven years ago"),
-               new Values("how many apples can you eat"));
-spout.setCycle(true);
-```
-
-This spout cycles through that set of sentences over and over to produce the sentence stream. Here's the code to do the streaming word count part of the computation:
-
-```java
-TridentTopology topology = new TridentTopology();        
-TridentState wordCounts =
-     topology.newStream("spout1", spout)
-       .each(new Fields("sentence"), new Split(), new Fields("word"))
-       .groupBy(new Fields("word"))
-       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
-       .parallelismHint(6);
-```
-
-Let's go through the code line by line. First a TridentTopology object is created, which exposes the interface for constructing Trident computations. TridentTopology has a method called newStream that creates a new stream of data in the topology reading from an input source. In this case, the input source is just the FixedBatchSpout defined from before. Input sources can also be queue brokers like Kestrel or Kafka. Trident keeps track of a small amount of state for each input source (metadata about what it has consumed) in Zookeeper, and the "spout1" string here specifies the node in Zookeeper where Trident should keep that metadata.
-
-Trident processes the stream as small batches of tuples. For example, the incoming stream of sentences might be divided into batches like so:
-
-![Batched stream](images/batched-stream.png)
-
-Generally the size of those small batches will be on the order of thousands or millions of tuples, depending on your incoming throughput.
-
-Trident provides a fully fledged batch processing API to process those small batches. The API is very similar to what you see in high level abstractions for Hadoop like Pig or Cascading: you can do group by's, joins, aggregations, run functions, run filters, and so on. Of course, processing each small batch in isolation isn't that interesting, so Trident provides functions for doing aggregations across batches and persistently storing those aggregations – whether in memory, in Memcached, in Cassandra, or some other store. Finally, Trident has first-class functions for querying sources of realtime state. That state could be updated by Trident (like in this example), or it could be an independent source of state.
-
-Back to the example, the spout emits a stream containing one field called "sentence". The next line of the topology definition applies the Split function to each tuple in the stream, taking the "sentence" field and splitting it into words. Each sentence tuple creates potentially many word tuples – for instance, the sentence "the cow jumped over the moon" creates six "word" tuples. Here's the definition of Split:
-
-```java
-public class Split extends BaseFunction {
-   public void execute(TridentTuple tuple, TridentCollector collector) {
-       String sentence = tuple.getString(0);
-       for(String word: sentence.split(" ")) {
-           collector.emit(new Values(word));                
-       }
-   }
-}
-```
-
-As you can see, it's really simple. It simply grabs the sentence, splits it on whitespace, and emits a tuple for each word.
-
-The rest of the topology computes word count and keeps the results persistently stored. First the stream is grouped by the "word" field. Then, each group is persistently aggregated using the Count aggregator. The persistentAggregate function knows how to store and update the results of the aggregation in a source of state. In this example, the word counts are kept in memory, but this can be trivially swapped to use Memcached, Cassandra, or any other persistent store. Swapping this topology to store counts in Memcached is as simple as replacing the persistentAggregate line with this (using [trident-memcached](https://github.com/nathanmarz/trident-memcached)), where the "serverLocations" is a list of host/ports for the Memcached cluster:
-
-```java
-.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
-```
-
-The values stored by persistentAggregate represent the aggregation of all batches ever emitted by the stream.
-
-One of the cool things about Trident is that it has fully fault-tolerant, exactly-once processing semantics. This makes it easy to reason about your realtime processing. Trident persists state in a way so that if failures occur and retries are necessary, it won't perform multiple updates to the database for the same source data.
-
-The persistentAggregate method transforms a Stream into a TridentState object. In this case the TridentState object represents all the word counts. We will use this TridentState object to implement the distributed query portion of the computation.
-
-The next part of the topology implements a low latency distributed query on the word counts. The query takes as input a whitespace-separated list of words and returns the sum of the counts for those words. These queries are executed just like normal RPC calls, except they are parallelized in the background. Here's an example of how you might invoke one of these queries:
-
-```java
-DRPCClient client = new DRPCClient("drpc.server.location", 3772);
-System.out.println(client.execute("words", "cat dog the man"));
-// prints the JSON-encoded result, e.g.: "[[5078]]"
-```
-
-As you can see, it looks just like a regular remote procedure call (RPC), except it's executing in parallel across a Storm cluster. The latency for small queries like this is typically around 10ms. More intense DRPC queries can take longer of course, although the latency largely depends on how many resources you have allocated for the computation.
-
-The implementation of the distributed query portion of the topology looks like this:
-
-```java
-topology.newDRPCStream("words")
-       .each(new Fields("args"), new Split(), new Fields("word"))
-       .groupBy(new Fields("word"))
-       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
-       .each(new Fields("count"), new FilterNull())
-       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
-```
-
-The same TridentTopology object is used to create the DRPC stream, and the function is named "words". The function name corresponds to the function name given in the first argument of execute when using a DRPCClient.
-
-Each DRPC request is treated as its own little batch processing job that takes as input a single tuple representing the request. The tuple contains one field called "args" that contains the argument provided by the client. In this case, the argument is a whitespace separated list of words.
-
-First, the Split function is used to split the arguments for the request into its constituent words. The stream is grouped by "word", and the stateQuery operator is used to query the TridentState object that the first part of the topology generated. stateQuery takes in a source of state – in this case, the word counts computed by the other portion of the topology – and a function for querying that state. In this case, the MapGet function is invoked, which gets the count for each word. Since the DRPC stream is grouped the exact same way as the TridentState was (by the "word" field), each word query is routed to the exact partition of the TridentState object that manages updates for that word.
-
-Next, words that didn't have a count are filtered out via the FilterNull filter and the counts are summed using the Sum aggregator to get the result. Then, Trident automatically sends the result back to the waiting client.
-
-Trident is intelligent about how it executes a topology to maximize performance. There are two interesting things happening automatically in this topology:
-
-1. Operations that read from or write to state (like persistentAggregate and stateQuery) automatically batch operations to that state. So if there are 20 updates that need to be made to the database for the current batch of processing, rather than doing 20 read requests and 20 write requests to the database, Trident will automatically batch up the reads and writes, doing only 1 read request and 1 write request (and in many cases, you can use caching in your State implementation to eliminate the read request). So you get the best of both worlds: convenience – being able to express your computation in terms of what should be done with each tuple – and performance.
-2. Trident aggregators are heavily optimized. Rather than transfer all tuples for a group to the same machine and then run the aggregator, Trident will do partial aggregations when possible before sending tuples over the network. For example, the Count aggregator computes the count on each partition, sends the partial count over the network, and then sums together all the partial counts to get the total count. This technique is similar to the use of combiners in MapReduce.
-
-Let's look at another example of Trident.
-
-## Reach
-
-The next example is a pure DRPC topology that computes the reach of a URL on demand. Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you need to fetch all the people who ever tweeted a URL, fetch all the followers of all those people, unique that set of followers, and then count that uniqued set. Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples. With Storm and Trident, you can parallelize the computation of each step across a cluster.
-
-This topology will read from two sources of state. One database maps URLs to a list of people who tweeted that URL. The other database maps a person to a list of followers for that person. The topology definition looks like this:
-
-```java
-TridentState urlToTweeters =
-       topology.newStaticState(getUrlToTweetersState());
-TridentState tweetersToFollowers =
-       topology.newStaticState(getTweeterToFollowersState());
-
-topology.newDRPCStream("reach")
-       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
-       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
-       .shuffle()
-       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
-       .parallelismHint(200)
-       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
-       .groupBy(new Fields("follower"))
-       .aggregate(new One(), new Fields("one"))
-       .parallelismHint(20)
-       .aggregate(new Count(), new Fields("reach"));
-```
-
-The topology creates TridentState objects representing each external database using the newStaticState method. These can then be queried in the topology. Like all sources of state, queries to these databases will be automatically batched for maximum efficiency.
-
-The topology definition is straightforward – it's just a simple batch processing job. First, the urlToTweeters database is queried to get the list of people who tweeted the URL for this request. That returns a list, so the ExpandList function is invoked to create a tuple for each tweeter.
-
-Next, the followers for each tweeter must be fetched. It's important that this step be parallelized, so shuffle is invoked to evenly distribute the tweeters among all workers for the topology. Then, the followers database is queried to get the list of followers for each tweeter. You can see that this portion of the topology is given a large parallelism since this is the most intense portion of the computation.
-
-Next, the set of followers is uniqued and counted. This is done in two steps. First a "group by" is done on the batch by "follower", running the "One" aggregator on each group. The "One" aggregator simply emits a single tuple containing the number one for each group. Then, the ones are summed together to get the unique count of the followers set. Here's the definition of the "One" aggregator:
-
-```java
-public class One implements CombinerAggregator<Integer> {
-   public Integer init(TridentTuple tuple) {
-       return 1;
-   }
-
-   public Integer combine(Integer val1, Integer val2) {
-       return 1;
-   }
-
-   public Integer zero() {
-       return 1;
-   }        
-}
-```
-
-This is a "combiner aggregator", which knows how to do partial aggregations before transferring tuples over the network to maximize efficiency. Sum is also defined as a combiner aggregator, so the global sum done at the end of the topology will be very efficient.
-
-Let's now look at Trident in more detail.
-
-## Fields and tuples
-
-The Trident data model is the TridentTuple which is a named list of values. During a topology, tuples are incrementally built up through a sequence of operations. Operations generally take in a set of input fields and emit a set of "function fields". The input fields are used to select a subset of the tuple as input to the operation, while the "function fields" name the fields the operation emits.
-
-Consider this example. Suppose you have a stream called "stream" that contains the fields "x", "y", and "z". To run a filter MyFilter that takes in "y" as input, you would say:
-
-```java
-stream.each(new Fields("y"), new MyFilter())
-```
-
-Suppose the implementation of MyFilter is this:
-
-```java
-public class MyFilter extends BaseFilter {
-   public boolean isKeep(TridentTuple tuple) {
-       return tuple.getInteger(0) < 10;
-   }
-}
-```
-
-This will keep all tuples whose "y" field is less than 10. The TridentTuple given as input to MyFilter will only contain the "y" field. Note that Trident is able to project a subset of a tuple extremely efficiently when selecting the input fields: the projection is essentially free.
-
-Let's now look at how "function fields" work. Suppose you had this function:
-
-```java
-public class AddAndMultiply extends BaseFunction {
-   public void execute(TridentTuple tuple, TridentCollector collector) {
-       int i1 = tuple.getInteger(0);
-       int i2 = tuple.getInteger(1);
-       collector.emit(new Values(i1 + i2, i1 * i2));
-   }
-}
-```
-
-This function takes two numbers as input and emits two new values: the addition of the numbers and the multiplication of the numbers. Suppose you had a stream with the fields "x", "y", and "z". You would use this function like this:
-
-```java
-stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
-```
-
-The output of functions is additive: the fields are added to the input tuple. So the output of this each call would contain tuples with the five fields "x", "y", "z", "added", and "multiplied". "added" corresponds to the first value emitted by AddAndMultiply, while "multiplied" corresponds to the second value.
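-
-For instance, if the input tuple were [1, 2, 3] for the fields "x", "y", and "z", the output tuple of this each call would be:
-
-```
-[1, 2, 3, 3, 2]
-```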
-
-With aggregators, on the other hand, the function fields replace the input tuples. So if you had a stream containing the fields "val1" and "val2", and you did this:
-
-```java
-stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
-```
-
-The output stream would only contain a single tuple with a single field called "sum", representing the sum of all "val2" fields in that batch.
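-
-As an illustration, if a batch contained the tuples [1, 3] and [2, 4] for the fields "val1" and "val2", the output of this aggregate call would simply be:
-
-```
-[7]
-```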
-
-With grouped streams, the output will contain the grouping fields followed by the fields emitted by the aggregator. For example:
-
-```java
-stream.groupBy(new Fields("val1"))
-     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
-```
-
-In this example, the output will contain the fields "val1" and "sum".
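-
-So, given a batch containing the tuples ["a", 1], ["a", 2], and ["b", 5] for the fields "val1" and "val2", the output would be:
-
-```
-["a", 3]
-["b", 5]
-```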
-
-## State
-
-A key problem to solve with realtime computation is how to manage state so that updates are idempotent in the face of failures and retries. It's impossible to eliminate failures, so when a node dies or something else goes wrong, batches need to be retried. The question is: how do you do state updates (whether to external databases or to state internal to the topology) so that it's as if each message were processed exactly once?
-
-This is a tricky problem, and can be illustrated with the following example. Suppose that you're doing a count aggregation of your stream and want to store the running count in a database. If you store only the count in the database and it's time to apply a state update for a batch, there's no way to know if you applied that state update before. The batch could have been attempted before, succeeded in updating the database, and then failed at a later step. Or the batch could have been attempted before and failed to update the database. You just don't know.
-
-Trident solves this problem by doing two things:
-
-1. Each batch is given a unique id called the "transaction id". If a batch is retried it will have the exact same transaction id.
-2. State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.
-
-With these two primitives, you can achieve exactly-once semantics with your state updates. Rather than store just the count in the database, what you can do instead is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they're the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they're different, you increment the count.
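-
-Here is a rough sketch of that compare-and-skip logic against a hypothetical in-memory store that holds the count and transaction id together as one atomic value. This is not the Trident State API; as described below, Trident does this bookkeeping for you:
-
-```java
-public class TransactionalCount {
-    // Hypothetical store holding the pair (txid, count) atomically.
-    private long storedTxId = -1;
-    private long storedCount = 0;
-
-    public void applyBatch(long batchTxId, long batchCount) {
-        if (batchTxId == storedTxId) {
-            // Batches are applied in order, so an equal txid means this
-            // batch's update is already reflected in the stored count: skip.
-            return;
-        }
-        // Otherwise apply the update and record the txid along with it.
-        storedCount += batchCount;
-        storedTxId = batchTxId;
-    }
-}
-```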
-
-Of course, you don't have to do this logic manually in your topologies. This logic is wrapped by the State abstraction and done automatically. Nor is your State object required to implement the transaction id trick: if you don't want to pay the cost of storing the transaction id in the database, you don't have to. In that case the State will have at-least-once-processing semantics in the case of failures (which may be fine for your application). You can read more about how to implement a State and the various fault-tolerance tradeoffs possible [in this doc](/documentation/Trident-state).
-
-A State is allowed to use whatever strategy it wants to store state. So it could store state in an external database or it could keep the state in-memory but backed by HDFS (similar to how HBase works). States are not required to hold onto state forever. For example, you could have an in-memory State implementation that only keeps the last X hours of data available and drops anything older. Take a look at the implementation of the [Memcached integration](https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java) for an example State implementation.
-
-## Execution of Trident topologies
-
-Trident topologies compile down into as efficient a Storm topology as possible. Tuples are only sent over the network when a repartitioning of the data is required, such as if you do a groupBy or a shuffle. So if you had this Trident topology:
-
-![Compiling Trident to Storm 1](images/trident-to-storm1.png)
-
-It would compile into Storm spouts/bolts like this:
-
-![Compiling Trident to Storm 2](images/trident-to-storm2.png)
-
-## Conclusion
-
-Trident makes realtime computation elegant. You've seen how high throughput stream processing, state manipulation, and low-latency querying can be seamlessly intermixed via Trident's API. Trident lets you express your realtime computations in a natural way while still getting maximal performance.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/93e0d028/docs/documentation/Troubleshooting.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Troubleshooting.md b/docs/documentation/Troubleshooting.md
deleted file mode 100644
index 17aceb9..0000000
--- a/docs/documentation/Troubleshooting.md
+++ /dev/null
@@ -1,182 +0,0 @@
----
-title: Troubleshooting
-layout: documentation
-documentation: true
----
-
-This page lists issues people have run into when using Storm along with their solutions.
-
-### Worker processes are crashing on startup with no stack trace
-
-Possible symptoms:
- 
- * Topologies work with one node, but workers crash with multiple nodes
-
-Solutions:
-
- * You may have a misconfigured subnet, where nodes can't locate other nodes based on their hostname. ZeroMQ sometimes crashes the process when it can't resolve a host. There are two solutions:
-  * Make a mapping from hostname to IP address in /etc/hosts
-  * Set up an internal DNS so that nodes can locate each other based on hostname.
-  
-### Nodes are unable to communicate with each other
-
-Possible symptoms:
-
- * Every spout tuple is failing
- * Processing is not working
-
-Solutions:
-
- * Storm doesn't work with IPv6. You can force IPv4 by adding `-Djava.net.preferIPv4Stack=true` to the supervisor child options and restarting the supervisor.
- * You may have a misconfigured subnet. See the solutions for `Worker processes are crashing on startup with no stack trace`
-
-### Topology stops processing tuples after a while
-
-Symptoms:
-
- * Processing works fine for a while, and then suddenly stops and spout tuples start failing en masse. 
- 
-Solutions:
-
- * This is a known issue with ZeroMQ 2.1.10. Downgrade to ZeroMQ 2.1.7.
- 
-### Not all supervisors appear in Storm UI
-
-Symptoms:
- 
- * Some supervisor processes are missing from the Storm UI
- * List of supervisors in Storm UI changes on refreshes
-
-Solutions:
-
- * Make sure the supervisor local dirs are independent (e.g., not sharing a local dir over NFS)
- * Try deleting the local dirs for the supervisors and restarting the daemons. Supervisors create a unique id for themselves and store it locally. When that id is copied to other nodes, Storm gets confused. 
-
-### "Multiple defaults.yaml found" error
-
-Symptoms:
-
- * When deploying a topology with "storm jar", you get this error
-
-Solution:
-
- * You're most likely including the Storm jars inside your topology jar. When packaging your topology jar, don't include the Storm jars as Storm will put those on the classpath for you.
-
-### "NoSuchMethodError" when running storm jar
-
-Symptoms:
-
- * When running storm jar, you get a cryptic "NoSuchMethodError"
-
-Solution:
-
- * You're deploying your topology with a different version of Storm than you built your topology against. Make sure the version of the storm client you use to submit the topology matches the version you compiled your topology against.
-
-
-### Kryo ConcurrentModificationException
-
-Symptoms:
-
- * At runtime, you get a stack trace like the following:
-
-```
-java.lang.RuntimeException: java.util.ConcurrentModificationException
-	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
-	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
-	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
-	at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
-	at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
-	at clojure.lang.AFn.run(AFn.java:24)
-	at java.lang.Thread.run(Thread.java:679)
-Caused by: java.util.ConcurrentModificationException
-	at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:390)
-	at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:409)
-	at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:408)
-	at java.util.HashMap.writeObject(HashMap.java:1016)
-	at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
-	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-	at java.lang.reflect.Method.invoke(Method.java:616)
-	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:959)
-	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
-	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
-	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
-	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
-	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
-	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
-	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
-	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
-	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
-	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
-```
-
-Solution: 
-
- * This means that you're emitting a mutable object as an output tuple. Everything you emit into the output collector must be immutable. What's happening is that your bolt is modifying the object while it is being serialized to be sent over the network.
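-
-For illustration, here is a sketch of the pitfall and a fix, using a made-up `WordBufferBolt` that accumulates the words it has seen:
-
-```java
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class WordBufferBolt extends BaseBasicBolt {
-    private final List<String> buffer = new ArrayList<String>();
-
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        buffer.add(input.getString(0));
-
-        // Broken: emits a live reference to `buffer`, which later execute()
-        // calls will mutate while the tuple may still be getting serialized.
-        // collector.emit(new Values(buffer));
-
-        // Safe: emit a snapshot that nothing else will modify.
-        collector.emit(new Values(new ArrayList<String>(buffer)));
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("words"));
-    }
-}
-```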
-
-
-### NullPointerException from deep inside Storm
-
-Symptoms:
-
- * You get a NullPointerException that looks something like:
-
-```
-java.lang.RuntimeException: java.lang.NullPointerException
-    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
-    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
-    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
-    at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
-    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
-    at clojure.lang.AFn.run(AFn.java:24)
-    at java.lang.Thread.run(Thread.java:662)
-Caused by: java.lang.NullPointerException
-    at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24)
-    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126$fn__4130.invoke(worker.clj:99)
-    at backtype.storm.util$fast_list_map.invoke(util.clj:771)
-    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4126.invoke(worker.clj:99)
-    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3904.invoke(executor.clj:205)
-    at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
-    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
-    ... 6 more
-```
-
-or 
-
-```
-java.lang.RuntimeException: java.lang.NullPointerException
-        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.disruptor$consume_loop_STAR_$fn__759.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
-        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
-        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
-Caused by: java.lang.NullPointerException: null
-        at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
-        at backtype.storm.daemon.worker$mk_transfer_fn$fn__3548.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3282.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.disruptor$clojure_handler$reify__746.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
-        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
-        ... 6 common frames omitted
-```
-
-Solution:
-
- * This is caused by having multiple threads issue methods on the `OutputCollector`. All emits, acks, and fails must happen on the same thread. One subtle way this can happen is if you make an `IBasicBolt` that emits on a separate thread. `IBasicBolt`s automatically ack after `execute` is called, so this would cause multiple threads to use the `OutputCollector`, leading to this exception. When using a basic bolt, all emits must happen in the same thread that runs `execute`.
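-
-For illustration, here is a sketch of the pattern to avoid, using a made-up `AsyncEmitBolt` that hands the emit off to a background thread:
-
-```java
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class AsyncEmitBolt extends BaseBasicBolt {
-    public void execute(final Tuple input, final BasicOutputCollector collector) {
-        // Anti-pattern: this emit runs on a second thread while the executor
-        // thread acks the input tuple after execute() returns, so two threads
-        // end up using the underlying OutputCollector concurrently.
-        new Thread(new Runnable() {
-            public void run() {
-                collector.emit(new Values(input.getString(0)));
-            }
-        }).start();
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word"));
-    }
-}
-```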
\ No newline at end of file