Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/22 16:30:57 UTC

[10/32] storm git commit: STORM-1617: Release Specific Documentation 0.9.x

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/FAQ.md
----------------------------------------------------------------------
diff --git a/docs/FAQ.md b/docs/FAQ.md
new file mode 100644
index 0000000..8ff7a6f
--- /dev/null
+++ b/docs/FAQ.md
@@ -0,0 +1,121 @@
+---
+layout: documentation
+---
+
+## Best Practices
+
+### What rules of thumb can you give me for configuring Storm+Trident?
+
+* Make the number of workers a multiple of the number of machines; parallelism a multiple of the number of workers; and the number of Kafka partitions a multiple of the spout parallelism
+* Use one worker per topology per machine
+* Start with fewer, larger aggregators, one per machine with workers on it
+* Use the isolation scheduler
+* Use one acker per worker -- 0.9 makes that the default, but earlier versions do not.
+* Enable GC logging; you should see very few major GCs if things are in reasonable shape.
+* Set the trident batch millis to about 50% of your typical end-to-end latency.
+* Start with a max spout pending that is almost certainly too small -- one for Trident, or the number of executors for plain Storm -- and increase it until you stop seeing changes in the flow. You'll probably end up with something near `2*(throughput in recs/sec)*(end-to-end latency)` (2x the Little's law capacity). See the sketch after this list.
+
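+Several of these rules translate directly into topology configuration. A sketch (the specific values are illustrative, not recommendations):
+
+        Config conf = new Config();
+        conf.setNumWorkers(4);          // a multiple of the number of machines
+        conf.setNumAckers(4);           // one acker per worker
+        conf.setMaxSpoutPending(1);     // start deliberately small, then increase
+        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 300);
+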
+### What are some of the best ways to get a worker to mysteriously and bafflingly die?
+
+* Do you have write access to the log directory?
+* Are you blowing out your heap?
+* Are all the right libraries installed on all of the workers?
+* Is the zookeeper hostname still set to localhost?
+* Did you supply a correct, unique hostname -- one that resolves back to the machine -- to each worker, and put it in the storm conf file?
+* Have you opened firewall/security group permissions _bidirectionally_ among a) all the workers, b) the storm master, c) zookeeper? Also, from the workers to any Kafka/Kestrel/database/etc. that your topology accesses? Use netcat to poke the appropriate ports and be sure.
+
+### Halp! I cannot see:
+
+* **my logs** Logs by default go to $STORM_HOME/logs. Check that you have write permissions to that directory. Logging is configured in logback/cluster.xml (0.9) or in log4j/*.properties (earlier versions).
+* **final JVM settings** Add the `-XX:+PrintFlagsFinal` command-line option in the childopts (see the conf file).
+* **final Java system properties** Add `Properties props = System.getProperties(); props.list(System.out);` near where you build your topology.
+
+### How many Workers should I use?
+
+The total number of workers is set by the supervisors -- there's some number of JVM slots each supervisor will superintend. The thing you set on the topology is how many worker slots it will try to claim.
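+
+For example (a sketch, assuming a `TopologyBuilder` named `builder` is in scope):
+
+        Config conf = new Config();
+        conf.setNumWorkers(3);   // claim three worker slots
+        StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());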
+
+There's no great reason to use more than one worker per topology per machine.
+
+With one topology running on three 8-core nodes, and parallelism hint 24, each bolt gets 8 executors per machine, i.e. one for each core. There are three big benefits to running three workers (with 8 assigned executors each) compared to running, say, 24 workers (one assigned executor each).
+
+First, data that is repartitioned (shuffles or group-bys) to executors in the same worker will not have to hit the transfer buffer. Instead, tuples are deposited directly from send to receive buffer. That's a big win. By contrast, if the destination executor were on the same machine in a different worker, it would have to go send -> worker transfer -> local socket -> worker recv -> exec recv buffer. It doesn't hit the network card, but it's not as big a win as when executors are in the same worker.
+
+Second, you're typically better off with three aggregators having very large backing caches than with twenty-four aggregators having small backing caches. This reduces the effect of skew and improves LRU efficiency.
+
+Lastly, using fewer workers reduces control flow chatter.
+
+## Topology
+
+### Can a Trident topology have Multiple Streams?
+
+> Can a Trident Topology work like a workflow with conditional paths (if-else)? e.g. A Spout (S1) connects to a bolt (B0) which based on certain values in the incoming tuple routes them to either bolt (B1) or bolt (B2) but not both.
+
+A Trident "each" operator returns a Stream object, which you can store in a variable. You can then run multiple eaches on the same Stream to split it, e.g.: 
+
+        Stream s = topology.each(...).groupBy(...).aggregate(...);
+        Stream branch1 = s.each(..., new FilterA());
+        Stream branch2 = s.each(..., new FilterB());
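+
+where `FilterA` and `FilterB` are ordinary Trident filters. A minimal sketch (the field and the routing condition are hypothetical):
+
+        public class FilterA extends BaseFilter {
+            public boolean isKeep(TridentTuple tuple) {
+                return tuple.getInteger(0) > 0;   // keep positive values on this branch
+            }
+        }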
+
+You can join streams with `join`, `merge` or `multiReduce`.
+
+At the time of writing, you can't emit to multiple output streams from Trident -- see [STORM-68](https://issues.apache.org/jira/browse/STORM-68).
+
+## Spouts
+
+### What is a coordinator, and why are there several?
+
+A trident-spout is actually run within a storm _bolt_. The storm-spout of a trident topology is the MasterBatchCoordinator -- it coordinates trident batches and is the same no matter what spouts you use. A batch is born when the MBC dispenses a seed tuple to each of the spout-coordinators. The spout-coordinator bolts know how your particular spouts should cooperate -- so in the Kafka case, it's what helps figure out what partition and offset range each spout should pull from.
+
+### What can I store into the spout's metadata record?
+
+You should only store static data, and as little of it as possible, into the metadata record (note: maybe you _can_ store more interesting things; you shouldn't, though).
+
+### How often is the 'emitPartitionBatchNew' function called?
+
+Since the MBC is the actual spout, all the tuples in a batch are just members of its tuple tree. That means Storm's "max spout pending" config effectively defines the number of concurrent batches Trident runs. The MBC emits a new batch if it has fewer than max-spout-pending tuples pending and if at least one [trident batch interval](https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml#L115)'s worth of seconds has passed since the last batch.
+
+### If nothing was emitted does Trident slow down the calls?
+
+Yes, there's a pluggable "spout wait strategy"; the default is to sleep for a [configurable amount of time](https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml#L110).
+
+### OK, then what is the trident batch interval for?
+
+You know how computers of the 486 era had a [turbo button](http://en.wikipedia.org/wiki/Turbo_button) on them? It's like that. 
+
+Actually, it has two practical uses. One is to throttle spouts that poll a remote source without throttling processing. For example, we have a spout that looks in a given S3 bucket for a newly uploaded batch file to read, split on linebreaks, and emit. We don't want it hitting S3 more than every few seconds: files don't show up more than once every few minutes, and a batch takes a few seconds to process.
+
+The other is to limit overpressure on the internal queues during startup or under a heavy burst load -- if the spouts spring to life and suddenly jam ten batches' worth of records into the system, you could have a mass of less-urgent tuples from batch 7 clog up the transfer buffer and prevent the $commit tuple from batch 3 from getting through (or even just the regular old tuples from batch 3). What we do is set the trident batch interval to about half the typical end-to-end processing latency -- if it takes 600ms to process a batch, it's OK to only kick off a batch every 300ms.
+
+Note that this is a cap, not an additional delay -- with a period of 300ms, if your batch takes 258ms Trident will only delay an additional 42ms.
+
+### How do you set the batch size?
+
+Trident doesn't place its own limit on the number of records in a batch. In the case of the Kafka spout, the max fetch size in bytes divided by the average record size gives an effective maximum number of records per partition per batch.
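+
+For example (purely illustrative numbers): with a 1MB max fetch size and records averaging 1KB, each batch would carry roughly 1,000 records per partition.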
+
+### How do I resize a batch?
+
+The trident batch is a somewhat overloaded facility. Together with the number of partitions, the batch size is constrained by, or serves to define:
+
+1. the unit of transactional safety (tuples at risk vs. time)
+2. per partition, an effective windowing mechanism for windowed stream analytics
+3. per partition, the number of simultaneous queries that will be made by a partitionQuery, partitionPersist, etc.
+4. per partition, the number of records convenient for the spout to dispatch at the same time
+
+You can't change the overall batch size once generated, but you can change the number of partitions -- do a shuffle and then change the parallelism hint, as sketched below.
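+
+A sketch (assuming `stream` is the Trident stream you already have):
+
+        Stream repartitioned = stream.shuffle().parallelismHint(16);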
+
+## Time Series
+
+### How do I aggregate events by time?
+
+If you have records with an immutable timestamp, and you would like to count, average or otherwise aggregate them into discrete time buckets, Trident is an excellent and scalable solution.
+
+Write an `Each` function that turns the timestamp into a time bucket: if the bucket size was "by hour", then the timestamp `2013-08-08 12:34:56` would be mapped to the `2013-08-08 12:00:00` time bucket, and so would everything else in the twelve o'clock hour. Then group on that time bucket and use a grouped persistentAggregate. The persistentAggregate uses a local cacheMap backed by a data store. Groups with many records require very few reads from the data store, and use efficient bulk reads and writes; as long as your data feed is relatively prompt, Trident will make very efficient use of memory and network. Even if a server drops offline for a day, then delivers that full day's worth of data in a rush, the old results will be calmly retrieved and updated -- without interfering with the calculation of current results.
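+
+A sketch of such a bucketing function for hourly buckets (`HourBucket` is a hypothetical name, and it assumes the timestamp arrives as epoch milliseconds in the first field):
+
+        public class HourBucket extends BaseFunction {
+            public void execute(TridentTuple tuple, TridentCollector collector) {
+                long ts = tuple.getLong(0);
+                long hourMillis = 60L * 60L * 1000L;
+                collector.emit(new Values(ts - (ts % hourMillis)));   // truncate to the hour
+            }
+        }
+
+        stream.each(new Fields("timestamp"), new HourBucket(), new Fields("bucket"))
+              .groupBy(new Fields("bucket"))
+              .persistentAggregate(...);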
+
+### How can I know that all records for a time bucket have been received?
+
+You cannot know that all events are collected -- this is an epistemological challenge, not a distributed systems challenge. You can:
+
+* Set a time limit using domain knowledge
+* Introduce a _punctuation_: a record known to come after all records in the given time bucket. Trident uses this scheme to know when a batch is complete. If you, for instance, receive records from a set of sensors, each in order for that sensor, then once every sensor has sent you a timestamp of 3:02:xx or later you know you can commit the 3:02 bucket.
+* When possible, make your process incremental: each value that comes in makes the answer more and more true. A Trident ReducerAggregator is an operator that takes a prior result and a set of new records and returns a new result. This lets the result be cached and serialized to a datastore; if a server drops offline for a day and then comes back with a full day's worth of data in a rush, the old results will be calmly retrieved and updated.
+* Lambda architecture: Record all events into an archival store (S3, HBase, HDFS) on receipt. In the fast layer, once the time window is clear, process the bucket to get an actionable answer, and ignore everything older than the time window. Periodically run a global aggregation to calculate a "correct" answer.

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Fault-tolerance.md
----------------------------------------------------------------------
diff --git a/docs/Fault-tolerance.md b/docs/Fault-tolerance.md
new file mode 100644
index 0000000..9a7a349
--- /dev/null
+++ b/docs/Fault-tolerance.md
@@ -0,0 +1,28 @@
+---
+layout: documentation
+---
+This page explains the design details of Storm that make it a fault-tolerant system.
+
+## What happens when a worker dies?
+
+When a worker dies, the supervisor will restart it. If it continuously fails on startup and is unable to heartbeat to Nimbus, Nimbus will reassign the worker to another machine.
+
+## What happens when a node dies?
+
+The tasks assigned to that machine will time-out and Nimbus will reassign those tasks to other machines.
+
+## What happens when Nimbus or Supervisor daemons die?
+
+The Nimbus and Supervisor daemons are designed to be fail-fast (process self-destructs whenever any unexpected situation is encountered) and stateless (all state is kept in Zookeeper or on disk). As described in [Setting up a Storm cluster](Setting-up-a-Storm-cluster.html), the Nimbus and Supervisor daemons must be run under supervision using a tool like daemontools or monit. So if the Nimbus or Supervisor daemons die, they restart like nothing happened.
+
+Most notably, no worker processes are affected by the death of Nimbus or the Supervisors. This is in contrast to Hadoop, where if the JobTracker dies, all the running jobs are lost. 
+
+## Is Nimbus a single point of failure?
+
+If you lose the Nimbus node, the workers will still continue to function. Additionally, supervisors will continue to restart workers if they die. However, without Nimbus, workers won't be reassigned to other machines when necessary (like if you lose a worker machine). 
+
+So the answer is that Nimbus is "sort of" a SPOF. In practice, it's not a big deal since nothing catastrophic happens when the Nimbus daemon dies. There are plans to make Nimbus highly available in the future.
+
+## How does Storm guarantee data processing?
+
+Storm provides mechanisms to guarantee data processing even if nodes die or messages are lost. See [Guaranteeing message processing](Guaranteeing-message-processing.html) for the details.

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Guaranteeing-message-processing.md
----------------------------------------------------------------------
diff --git a/docs/Guaranteeing-message-processing.md b/docs/Guaranteeing-message-processing.md
new file mode 100644
index 0000000..91d4384
--- /dev/null
+++ b/docs/Guaranteeing-message-processing.md
@@ -0,0 +1,179 @@
+---
+layout: documentation
+---
+Storm guarantees that each message coming off a spout will be fully processed. This page describes how Storm accomplishes this guarantee and what you have to do as a user to benefit from Storm's reliability capabilities.
+
+### What does it mean for a message to be "fully processed"?
+
+A tuple coming off a spout can trigger thousands of tuples to be created based on it. Consider, for example, the streaming word count topology:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
+                                               22133,
+                                               "sentence_queue",
+                                               new StringScheme()));
+builder.setBolt("split", new SplitSentence(), 10)
+        .shuffleGrouping("sentences");
+builder.setBolt("count", new WordCount(), 20)
+        .fieldsGrouping("split", new Fields("word"));
+```
+
+This topology reads sentences off of a Kestrel queue, splits the sentences into their constituent words, and then emits for each word the number of times it has seen that word before. A tuple coming off the spout triggers many tuples being created based on it: a tuple for each word in the sentence and a tuple for the updated count for each word. The tree of messages looks something like this:
+
+![Tuple tree](images/tuple_tree.png)
+
+Storm considers a tuple coming off a spout "fully processed" when the tuple tree has been exhausted and every message in the tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a specified timeout. This timeout can be configured on a topology-specific basis using the [Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS](javadocs/backtype/storm/Config.html#TOPOLOGY_MESSAGE_TIMEOUT_SECS) configuration and defaults to 30 seconds.
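+
+For example, to give a topology a 60-second timeout (a sketch using the raw config map):
+
+```java
+Config conf = new Config();
+conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
+```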
+
+### What happens if a message is fully processed or fails to be fully processed?
+
+To understand this question, let's take a look at the lifecycle of a tuple coming off of a spout. For reference, here is the interface that spouts implement (see the [Javadoc](javadocs/backtype/storm/spout/ISpout.html) for more information):
+
+```java
+public interface ISpout extends Serializable {
+    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
+    void close();
+    void nextTuple();
+    void ack(Object msgId);
+    void fail(Object msgId);
+}
+```
+
+First, Storm requests a tuple from the `Spout` by calling the `nextTuple` method on the `Spout`. The `Spout` uses the `SpoutOutputCollector` provided in the `open` method to emit a tuple to one of its output streams. When emitting a tuple, the `Spout` provides a "message id" that will be used to identify the tuple later. For example, the `KestrelSpout` reads a message off of the kestrel queue and emits as the "message id" the id provided by Kestrel for the message. Emitting a message to the `SpoutOutputCollector` looks like this:
+
+```java
+_collector.emit(new Values("field1", "field2", 3) , msgId);
+```
+
+Next, the tuple gets sent to consuming bolts and Storm takes care of tracking the tree of messages that is created. If Storm detects that a tuple is fully processed, Storm will call the `ack` method on the originating `Spout` task with the message id that the `Spout` provided to Storm. Likewise, if the tuple times-out Storm will call the `fail` method on the `Spout`. Note that a tuple will be acked or failed by the exact same `Spout` task that created it. So even if a `Spout` is executing many tasks across the cluster, a tuple won't be acked or failed by a different task than the one that created it.
+
+Let's use `KestrelSpout` again to see what a `Spout` needs to do to guarantee message processing. When `KestrelSpout` takes a message off the Kestrel queue, it "opens" the message. This means the message is not actually taken off the queue yet, but instead placed in a "pending" state waiting for acknowledgement that the message is completed. While in the pending state, a message will not be sent to other consumers of the queue. Additionally, if a client disconnects all pending messages for that client are put back on the queue. When a message is opened, Kestrel provides the client with the data for the message as well as a unique id for the message. The `KestrelSpout` uses that exact id as the "message id" for the tuple when emitting the tuple to the `SpoutOutputCollector`. Sometime later on, when `ack` or `fail` are called on the `KestrelSpout`, the `KestrelSpout` sends an ack or fail message to Kestrel with the message id to take the message off the queue or have it put back on.
+
+### What is Storm's reliability API?
+
+There are two things you have to do as a user to benefit from Storm's reliability capabilities. First, you need to tell Storm whenever you're creating a new link in the tree of tuples. Second, you need to tell Storm when you have finished processing an individual tuple. By doing both these things, Storm can detect when the tree of tuples is fully processed and can ack or fail the spout tuple appropriately. Storm's API provides a concise way of doing both of these tasks.
+
+Specifying a link in the tuple tree is called _anchoring_. Anchoring is done at the same time you emit a new tuple. Let's use the following bolt as an example. This bolt splits a tuple containing a sentence into a tuple for each word:
+
+```java
+public class SplitSentence extends BaseRichBolt {
+    OutputCollector _collector;
+
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+    }
+
+    public void execute(Tuple tuple) {
+        String sentence = tuple.getString(0);
+        for (String word : sentence.split(" ")) {
+            _collector.emit(tuple, new Values(word));
+        }
+        _collector.ack(tuple);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+}
+```
+
+Each word tuple is _anchored_ by specifying the input tuple as the first argument to `emit`. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple failed to be processed downstream. In contrast, let's look at what happens if the word tuple is emitted like this:
+
+```java
+_collector.emit(new Values(word));
+```
+
+Emitting the word tuple this way causes it to be _unanchored_. If the tuple fails to be processed downstream, the root tuple will not be replayed. Depending on the fault-tolerance guarantees you need in your topology, sometimes it's appropriate to emit an unanchored tuple.
+
+An output tuple can be anchored to more than one input tuple. This is useful when doing streaming joins or aggregations. A multi-anchored tuple failing to be processed will cause multiple tuples to be replayed from the spouts. Multi-anchoring is done by specifying a list of tuples rather than just a single tuple. For example:
+
+```java
+List<Tuple> anchors = new ArrayList<Tuple>();
+anchors.add(tuple1);
+anchors.add(tuple2);
+_collector.emit(anchors, new Values(1, 2, 3));
+```
+
+Multi-anchoring adds the output tuple into multiple tuple trees. Note that it's also possible for multi-anchoring to break the tree structure and create tuple DAGs, like so:
+
+![Tuple DAG](images/tuple-dag.png)
+
+Storm's implementation works for DAGs as well as trees (pre-release it only worked for trees, and the name "tuple tree" stuck).
+
+Anchoring is how you specify the tuple tree -- the next and final piece to Storm's reliability API is specifying when you've finished processing an individual tuple in the tuple tree. This is done by using the `ack` and `fail` methods on the `OutputCollector`. If you look back at the `SplitSentence` example, you can see that the input tuple is acked after all the word tuples are emitted.
+
+You can use the `fail` method on the `OutputCollector` to immediately fail the spout tuple at the root of the tuple tree. For example, your application may choose to catch an exception from a database client and explicitly fail the input tuple. By failing the tuple explicitly, the spout tuple can be replayed faster than if you waited for the tuple to time-out.
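+
+A sketch of that pattern, where `writeToDatabase` stands in for whatever fallible call your bolt makes:
+
+```java
+public void execute(Tuple tuple) {
+    try {
+        writeToDatabase(tuple);   // hypothetical call that may throw
+        _collector.ack(tuple);
+    } catch (Exception e) {
+        _collector.fail(tuple);   // replay promptly instead of waiting for the timeout
+    }
+}
+```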
+
+Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don't ack/fail every tuple, the task will eventually run out of memory. 
+
+A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the `execute` method. These bolts fall into the categories of filters and simple functions. Storm has an interface called `BasicBolt` that encapsulates this pattern for you. The `SplitSentence` example can be written as a `BasicBolt` as follows:
+
+```java
+public class SplitSentence extends BaseBasicBolt {
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        String sentence = tuple.getString(0);
+        for (String word : sentence.split(" ")) {
+            collector.emit(new Values(word));
+        }
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+}
+```
+
+This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to `BasicOutputCollector` are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.
+
+In contrast, bolts that do aggregations or joins may delay acking a tuple until after it has computed a result based on a bunch of tuples. Aggregations and joins will commonly multi-anchor their output tuples as well. These things fall outside the simpler pattern of `IBasicBolt`.
+
+### How do I make my applications work correctly given that tuples can be replayed?
+
+As always in software design, the answer is "it depends." Storm 0.7.0 introduced the "transactional topologies" feature, which enables you to get fully fault-tolerant exactly-once messaging semantics for most computations. Read more about transactional topologies [here](Transactional-topologies.html). 
+
+
+### How does Storm implement reliability in an efficient way?
+
+A Storm topology has a set of special "acker" tasks that track the DAG of tuples for every spout tuple. When an acker sees that a DAG is complete, it sends a message to the spout task that created the spout tuple to ack the message. You can set the number of acker tasks for a topology in the topology configuration using [Config.TOPOLOGY_ACKERS](javadocs/backtype/storm/Config.html#TOPOLOGY_ACKERS). Storm defaults TOPOLOGY_ACKERS to one task -- you will need to increase this number for topologies processing large amounts of messages. 
+
+The best way to understand Storm's reliability implementation is to look at the lifecycle of tuples and tuple DAGs. When a tuple is created in a topology, whether in a spout or a bolt, it is given a random 64 bit id. These ids are used by ackers to track the tuple DAG for every spout tuple.
+
+Every tuple knows the ids of all the spout tuples for which it exists in their tuple trees. When you emit a new tuple in a bolt, the spout tuple ids from the tuple's anchors are copied into the new tuple. When a tuple is acked, it sends a message to the appropriate acker tasks with information about how the tuple tree changed. In particular it tells the acker "I am now completed within the tree for this spout tuple, and here are the new tuples in the tree that were anchored to me". 
+
+For example, if tuples "D" and "E" were created based on tuple "C", here's how the tuple tree changes when "C" is acked: 
+
+![What happens on an ack](images/ack_tree.png)
+
+Since "C" is removed from the tree at the same time that "D" and "E" are added to it, the tree can never be prematurely completed.
+
+There are a few more details to how Storm tracks tuple trees. As mentioned already, you can have an arbitrary number of acker tasks in a topology. This leads to the following question: when a tuple is acked in the topology, how does it know to which acker task to send that information? 
+
+Storm uses mod hashing to map a spout tuple id to an acker task. Since every tuple carries with it the spout tuple ids of all the trees it exists within, it knows which acker tasks to communicate with.
+
+Another detail of Storm is how the acker tasks track which spout tasks are responsible for each spout tuple they're tracking. When a spout task emits a new tuple, it simply sends a message to the appropriate acker telling it that its task id is responsible for that spout tuple. Then when an acker sees a tree has been completed, it knows to which task id to send the completion message.
+
+Acker tasks do not track the tree of tuples explicitly. For large tuple trees with tens of thousands of nodes (or more), tracking all the tuple trees could overwhelm the memory used by the ackers. Instead, the ackers take a different strategy that only requires a fixed amount of space per spout tuple (about 20 bytes). This tracking algorithm is the key to how Storm works and is one of its major breakthroughs.
+
+An acker task stores a map from a spout tuple id to a pair of values. The first value is the task id that created the spout tuple which is used later on to send completion messages. The second value is a 64 bit number called the "ack val". The ack val is a representation of the state of the entire tuple tree, no matter how big or how small.  It is simply the xor of all tuple ids that have been created and/or acked in the tree.
+
+When an acker task sees that an "ack val" has become 0, then it knows that the tuple tree is completed. Since tuple ids are random 64 bit numbers, the chances of an "ack val" accidentally becoming 0 are extremely small. If you work the math, at 10K acks per second, it will take 50,000,000 years until a mistake is made. And even then, it will only cause data loss if that tuple happens to fail in the topology.
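+
+A toy illustration of the xor bookkeeping (not the acker's actual code): every tuple id enters the ack val exactly twice -- once when the tuple is anchored into the tree, once when it is acked -- so the value returns to zero exactly when the tree completes.
+
+```java
+Random rand = new Random();
+long root = rand.nextLong(), childA = rand.nextLong(), childB = rand.nextLong();
+
+long ackVal = 0;
+ackVal ^= root;                    // spout emits the root tuple
+ackVal ^= root ^ childA ^ childB;  // root acked; childA and childB anchored to it
+ackVal ^= childA;                  // childA acked with no new anchors
+ackVal ^= childB;                  // childB acked with no new anchors
+
+assert ackVal == 0;                // the tree is complete
+```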
+
+Now that you understand the reliability algorithm, let's go over all the failure cases and see how in each case Storm avoids data loss:
+
+- **A tuple isn't acked because the task died**: In this case the spout tuple ids at the root of the trees for the failed tuple will time out and be replayed.
+- **Acker task dies**: In this case all the spout tuples the acker was tracking will time out and be replayed.
+- **Spout task dies**: In this case the source that the spout talks to is responsible for replaying the messages. For example, queues like Kestrel and RabbitMQ will place all pending messages back on the queue when a client disconnects.
+
+As you have seen, Storm's reliability mechanisms are completely distributed, scalable, and fault-tolerant. 
+
+### Tuning reliability
+
+Acker tasks are lightweight, so you don't need very many of them in a topology. You can track their performance through the Storm UI (component id "__acker"). If the throughput doesn't look right, you'll need to add more acker tasks. 
+
+If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- then you can improve performance by not tracking the tuple tree for spout tuples. Not tracking a tuple tree halves the number of messages transferred since normally there's an ack message for every tuple in the tuple tree. Additionally, it requires fewer ids to be kept in each downstream tuple, reducing bandwidth usage.
+
+There are three ways to remove reliability. The first is to set Config.TOPOLOGY_ACKERS to 0. In this case, Storm will call the `ack` method on the spout immediately after the spout emits a tuple. The tuple tree won't be tracked.
+
+The second way is to remove reliability on a message by message basis. You can turn off tracking for an individual spout tuple by omitting a message id in the `SpoutOutputCollector.emit` method.
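+
+Side by side (reusing the emit from earlier):
+
+```java
+_collector.emit(new Values("field1", "field2", 3), msgId);  // tracked: builds a tuple tree
+_collector.emit(new Values("field1", "field2", 3));         // untracked: no ack/fail callbacks
+```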
+
+Finally, if you don't care if a particular subset of the tuples downstream in the topology fail to be processed, you can emit them as unanchored tuples. Since they're not anchored to any spout tuples, they won't cause any spout tuples to fail if they aren't acked.

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Hooks.md
----------------------------------------------------------------------
diff --git a/docs/Hooks.md b/docs/Hooks.md
new file mode 100644
index 0000000..bbe87a9
--- /dev/null
+++ b/docs/Hooks.md
@@ -0,0 +1,7 @@
+---
+layout: documentation
+---
+Storm provides hooks with which you can insert custom code to run on any number of events within Storm. You create a hook by extending the [BaseTaskHook](javadocs/backtype/storm/hooks/BaseTaskHook.html) class and overriding the appropriate method for the event you want to catch. There are two ways to register your hook:
+
+1. In the open method of your spout or prepare method of your bolt using the [TopologyContext#addTaskHook](javadocs/backtype/storm/task/TopologyContext.html) method.
+2. Through the Storm configuration using the ["topology.auto.task.hooks"](javadocs/backtype/storm/Config.html#TOPOLOGY_AUTO_TASK_HOOKS) config. These hooks are automatically registered in every spout or bolt, and are useful for doing things like integrating with a custom monitoring system.
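+
+For example, a hook that logs every ack performed by a bolt task might look like this (a minimal sketch; check the exact fields of `BoltAckInfo` against this release's javadocs):
+
+```java
+public class AckLoggingHook extends BaseTaskHook {
+    @Override
+    public void boltAck(BoltAckInfo info) {
+        System.out.println("acked after " + info.processLatencyMs + " ms");
+    }
+}
+```
+
+You would register it from a bolt's `prepare` method with `context.addTaskHook(new AckLoggingHook());`.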

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Implementation-docs.md
----------------------------------------------------------------------
diff --git a/docs/Implementation-docs.md b/docs/Implementation-docs.md
new file mode 100644
index 0000000..f01083a
--- /dev/null
+++ b/docs/Implementation-docs.md
@@ -0,0 +1,18 @@
+---
+layout: documentation
+---
+This section of the wiki is dedicated to explaining how Storm is implemented. You should have a good grasp of how to use Storm before reading these sections. 
+
+- [Structure of the codebase](Structure-of-the-codebase.html)
+- [Lifecycle of a topology](Lifecycle-of-a-topology.html)
+- [Message passing implementation](Message-passing-implementation.html)
+- [Acking framework implementation](Acking-framework-implementation.html)
+- [Metrics](Metrics.html)
+- How transactional topologies work
+   - subtopology for TransactionalSpout
+   - how state is stored in ZK
+   - subtleties around what to do when emitting batches out of order
+- Unit testing
+  - time simulation
+  - complete-topology
+  - tracker clusters

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Installing-native-dependencies.md
----------------------------------------------------------------------
diff --git a/docs/Installing-native-dependencies.md b/docs/Installing-native-dependencies.md
new file mode 100644
index 0000000..1937d4b
--- /dev/null
+++ b/docs/Installing-native-dependencies.md
@@ -0,0 +1,38 @@
+---
+layout: documentation
+---
+The native dependencies are only needed on actual Storm clusters. When running Storm in local mode, Storm uses a pure Java messaging system so that you don't need to install native dependencies on your development machine.
+
+Installing ZeroMQ and JZMQ is usually straightforward. Sometimes, however, people run into issues with autoconf and get strange errors. If you run into any issues, please email the [Storm mailing list](http://groups.google.com/group/storm-user) or come get help in the #storm-user room on freenode. 
+
+Storm has been tested with ZeroMQ 2.1.7, and this is the recommended ZeroMQ release that you install. You can download a ZeroMQ release [here](http://download.zeromq.org/). Installing ZeroMQ should look something like this:
+
+```
+wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
+tar -xzf zeromq-2.1.7.tar.gz
+cd zeromq-2.1.7
+./configure
+make
+sudo make install
+```
+
+JZMQ is the Java bindings for ZeroMQ. JZMQ doesn't have any releases (we're working with them on that), so there is risk of a regression if you always install from the master branch. To prevent a regression from happening, you should instead install from [this fork](http://github.com/nathanmarz/jzmq) which is tested to work with Storm. Installing JZMQ should look something like this:
+
+```
+#install jzmq
+git clone https://github.com/nathanmarz/jzmq.git
+cd jzmq
+./autogen.sh
+./configure
+make
+sudo make install
+```
+
+To get the JZMQ build to work, you may need to do one or all of the following:
+
+1. Set JAVA_HOME environment variable appropriately
+2. Install Java dev package (more info [here](http://codeslinger.posterous.com/getting-zeromq-and-jzmq-running-on-mac-os-x) for Mac OSX users)
+3. Upgrade autoconf on your machine
+4. Follow the instructions in [this blog post](http://blog.pmorelli.com/getting-zeromq-and-jzmq-running-on-mac-os-x)
+
+If you run into any errors when running `./configure`, [this thread](http://stackoverflow.com/questions/3522248/how-do-i-compile-jzmq-for-zeromq-on-osx) may provide a solution.

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Kestrel-and-Storm.md
----------------------------------------------------------------------
diff --git a/docs/Kestrel-and-Storm.md b/docs/Kestrel-and-Storm.md
new file mode 100644
index 0000000..e16b0d9
--- /dev/null
+++ b/docs/Kestrel-and-Storm.md
@@ -0,0 +1,198 @@
+---
+layout: documentation
+---
+This page explains how to use Storm to consume items from a Kestrel cluster.
+
+## Preliminaries
+### Storm
+This tutorial uses examples from the [storm-kestrel](https://github.com/nathanmarz/storm-kestrel) project and the [storm-starter](https://github.com/nathanmarz/storm-starter) project. It's recommended that you clone those projects and follow along with the examples. Read [Setting up development environment](https://github.com/apache/incubator-storm/wiki/Setting-up-development-environment) and [Creating a new Storm project](https://github.com/apache/incubator-storm/wiki/Creating-a-new-Storm-project) to get your machine set up.
+### Kestrel
+This tutorial assumes you are able to run a Kestrel server locally, as described [here](https://github.com/nathanmarz/storm-kestrel).
+
+## Kestrel Server and Queue
+A single Kestrel server has a set of queues. A Kestrel queue is a very simple message queue that runs on the JVM and uses the memcache protocol (with some extensions) to talk to clients. For details, look at the implementation of the [KestrelThriftClient](https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/backtype/storm/spout/KestrelThriftClient.java) class provided in the [storm-kestrel](https://github.com/nathanmarz/storm-kestrel) project.
+
+Each queue is strictly ordered following the FIFO (first in, first out) principle. For performance, items are cached in system memory, though only the first 128MB is kept in memory. When the server stops, the queue state is stored in a journal file.
+
+Further details can be found [here](https://github.com/nathanmarz/kestrel/blob/master/docs/guide.md).
+
+Kestrel is:
+* fast
+* small
+* durable
+* reliable
+
+For instance, Twitter uses Kestrel as the backbone of its messaging infrastructure, as described [here](http://bhavin.directi.com/notes-on-kestrel-the-open-source-twitter-queue/).
+
+## Add items to Kestrel
+First, we need a program that can add items to a Kestrel queue. The following method makes use of the KestrelClient implementation in [storm-kestrel](https://github.com/nathanmarz/storm-kestrel). It adds sentences to a Kestrel queue, chosen randomly from an array of five possible sentences.
+
+```
+    private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
+            throws ParseError, IOException {
+
+        String[] sentences = new String[] {
+                "the cow jumped over the moon",
+                "an apple a day keeps the doctor away",
+                "four score and seven years ago",
+                "snow white and the seven dwarfs",
+                "i am at two with nature"};
+
+        Random rand = new Random();
+
+        for (int i = 1; i <= 10; i++) {
+            String sentence = sentences[rand.nextInt(sentences.length)];
+            String val = "ID " + i + " " + sentence;
+
+            boolean queueSuccess = kestrelClient.queue(queueName, val);
+
+            System.out.println("queueSuccess=" + queueSuccess + " [" + val + "]");
+        }
+    }
+```
+
+## Remove items from Kestrel
+
+This method dequeues items from a queue without removing them.
+```
+    private static void dequeueItems(KestrelClient kestrelClient, String queueName)
+            throws IOException, ParseError {
+        for (int i = 1; i <= 12; i++) {
+            Item item = kestrelClient.dequeue(queueName);
+
+            if (item == null) {
+                System.out.println("The queue (" + queueName + ") contains no items.");
+            } else {
+                byte[] data = item._data;
+                String receivedVal = new String(data);
+
+                System.out.println("receivedItem=" + receivedVal);
+            }
+        }
+    }
+```
+
+This method dequeues items from a queue and then removes them.
+```
+    private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
+            throws IOException, ParseError {
+        for (int i = 1; i <= 12; i++) {
+            Item item = kestrelClient.dequeue(queueName);
+
+            if (item == null) {
+                System.out.println("The queue (" + queueName + ") contains no items.");
+            } else {
+                int itemID = item._id;
+                byte[] data = item._data;
+                String receivedVal = new String(data);
+
+                kestrelClient.ack(queueName, itemID);
+
+                System.out.println("receivedItem=" + receivedVal);
+            }
+        }
+    }
+```
+
+## Add Items continuously to Kestrel
+
+This is our final program to run in order to continuously add sentence items to a queue called **sentence_queue** on a locally running Kestrel server.
+
+To stop it, type a closing bracket character ']' in the console and hit 'Enter'.
+
+```
+    import java.io.IOException;
+    import java.io.InputStream;
+    import java.util.Random;
+
+    import backtype.storm.spout.KestrelClient;
+    import backtype.storm.spout.KestrelClient.Item;
+    import backtype.storm.spout.KestrelClient.ParseError;
+
+    public class AddSentenceItemsToKestrel {
+
+        public static void main(String[] args) {
+            InputStream is = System.in;
+            char closingBracket = ']';
+            int val = closingBracket;
+            boolean running = true;
+
+            try {
+                KestrelClient kestrelClient = null;
+                String queueName = "sentence_queue";
+
+                while (running) {
+                    kestrelClient = new KestrelClient("localhost", 22133);
+
+                    queueSentenceItems(kestrelClient, queueName);
+
+                    kestrelClient.close();
+
+                    Thread.sleep(1000);
+
+                    // Stop when a ']' has been typed on the console.
+                    if (is.available() > 0) {
+                        if (val == is.read()) {
+                            running = false;
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (ParseError e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            System.out.println("end");
+        }
+    }
+```
+## Using KestrelSpout
+
+This topology reads sentences off of a Kestrel queue using KestrelSpout, splits the sentences into their constituent words (Bolt: SplitSentence), and then emits for each word the number of times it has seen that word before (Bolt: WordCount). How data is processed is described in detail in [Guaranteeing message processing](Guaranteeing-message-processing.html).
+
+```
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout("sentences", new KestrelSpout("localhost", 22133, "sentence_queue", new StringScheme()));
+    builder.setBolt("split", new SplitSentence(), 10)
+            .shuffleGrouping("sentences");
+    builder.setBolt("count", new WordCount(), 20)
+            .fieldsGrouping("split", new Fields("word"));
+```
+
+## Execution
+
+First, start your local Kestrel server in production or development mode.
+
+Then wait about five seconds to avoid a ConnectException.
+
+Now execute the program to add items to the queue and launch the Storm topology. The order in which you launch the programs is of no importance.
+
+If you run the topology with TOPOLOGY_DEBUG you should see tuples being emitted in the topology.

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Lifecycle-of-a-topology.md
----------------------------------------------------------------------
diff --git a/docs/Lifecycle-of-a-topology.md b/docs/Lifecycle-of-a-topology.md
new file mode 100644
index 0000000..4919be8
--- /dev/null
+++ b/docs/Lifecycle-of-a-topology.md
@@ -0,0 +1,80 @@
+---
+layout: documentation
+---
+(**NOTE**: this page is based on the 0.7.1 code; many things have changed since then, including a split between tasks and executors, and a reorganization of the code under `storm-core/src` rather than `src/`.)
+
+This page explains in detail the lifecycle of a topology from running the "storm jar" command to uploading the topology to Nimbus to the supervisors starting/stopping workers to workers and tasks setting themselves up. It also explains how Nimbus monitors topologies and how topologies are shut down when they are killed.
+
+First a couple of important notes about topologies:
+
+1. The actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing). The implicit topology is created via the [system-topology!](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/common.clj#L188) function.
+2. `system-topology!` is used in two places:
+  - when Nimbus is creating tasks for the topology [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L316)
+  - in the worker so it knows where it needs to route messages to [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L90)
+
+## Starting a topology
+
+- "storm jar" command executes your class with the specified arguments. The only special thing that "storm jar" does is set the "storm.jar" environment variable for use by `StormSubmitter` later. [code](https://github.com/apache/incubator-storm/blob/0.7.1/bin/storm#L101)
+- When your code uses `StormSubmitter.submitTopology`, `StormSubmitter` takes the following actions:
+  - First, `StormSubmitter` uploads the jar if it hasn't been uploaded before. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/jvm/backtype/storm/StormSubmitter.java#L83)
+    - Jar uploading is done via Nimbus's Thrift interface [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/storm.thrift#L200)
+    - `beginFileUpload` returns a path in Nimbus's inbox
+    - 15 kilobytes are uploaded at a time through `uploadChunk`
+    - `finishFileUpload` is called when it's finished uploading
+    - Here is Nimbus's implementation of those Thrift methods: [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L694)
+  - Second, `StormSubmitter` calls `submitTopology` on the Nimbus thrift interface [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/jvm/backtype/storm/StormSubmitter.java#L60)
+    - The topology config is serialized using JSON (JSON is used so that writing DSL's in any language is as easy as possible)
+    - Notice that the Thrift `submitTopology` call takes in the Nimbus inbox path where the jar was uploaded
+
+- Nimbus receives the topology submission. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L639)
+- Nimbus normalizes the topology configuration. The main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L557) 
+- Nimbus sets up the static state for the topology [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L661)
+    - Jars and configs are kept on local filesystem because they're too big for Zookeeper. The jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}
+    - `setup-storm-static` writes task -> component mapping into ZK
+    - `setup-heartbeats` creates a ZK "directory" in which tasks can heartbeat
+- Nimbus calls `mk-assignment` to assign tasks to machines [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L458) 
+    - Assignment record definition is here: [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/common.clj#L25)
+    - Assignment contains:
+      - `master-code-dir`: used by supervisors to download the correct jars/configs for the topology from Nimbus
+      - `task->node+port`: Map from a task id to the worker that task should be running on. (A worker is identified by a node/port pair)
+      - `node->host`: A map from node id to hostname. This is used so workers know which machines to connect to to communicate with other workers. Node ids are used to identify supervisors so that multiple supervisors can be run on one machine. One place this is done is with Mesos integration.
+      - `task->start-time-secs`: Contains a map from task id to the timestamp at which Nimbus launched that task. This is used by Nimbus when monitoring topologies, as tasks are given a longer timeout to heartbeat when they're first launched (the launch timeout is configured by "nimbus.task.launch.secs" config)
+- Once topologies are assigned, they're initially in a deactivated mode. `start-storm` writes data into Zookeeper so that the cluster knows the topology is active and can start emitting tuples from spouts. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L504)
+
+- TODO cluster state diagram (show all nodes and what's kept everywhere)
+
+- Supervisor runs two functions in the background:
+    - `synchronize-supervisor`: This is called whenever assignments in Zookeeper change and also every 10 seconds. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L241)
+      - Downloads code from Nimbus for topologies assigned to this machine for which it doesn't have the code yet. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L258)
+      - Writes into local filesystem what this node is supposed to be running. It writes a map from port -> LocalAssignment. LocalAssignment contains a topology id as well as the list of task ids for that worker. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L13)
+    - `sync-processes`: Reads from the LFS what `synchronize-supervisor` wrote and compares that to what's actually running on the machine. It then starts/stops worker processes as necessary to synchronize. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/supervisor.clj#L177)
+    
+- Worker processes start up through the `mk-worker` function [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L67)
+  - Worker connects to other workers and starts a thread to monitor for changes. So if a worker gets reassigned, the worker will automatically reconnect to the other worker's new location. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L123)
+  - Monitors whether a topology is active or not and stores that state in the `storm-active-atom` variable. This variable is used by tasks to determine whether or not to call `nextTuple` on the spouts. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L155)
+  - The worker launches the actual tasks as threads within it [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L178)
+- Tasks are set up through the `mk-task` function [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L160)
+  - Tasks set up routing function which takes in a stream and an output tuple and returns a list of task ids to send the tuple to [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L207) (there's also a 3-arity version used for direct streams)
+  - Tasks set up the spout-specific or bolt-specific code with [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/task.clj#L241)
+   
+## Topology Monitoring
+
+- Nimbus monitors the topology during its lifetime
+   - Schedules recurring task on the timer thread to check the topologies [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L623)
+   - Nimbus's behavior is represented as a finite state machine [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L98)
+   - The "monitor" event is called on a topology every "nimbus.monitor.freq.secs", which calls `reassign-topology` through `reassign-transition` [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L497)
+   - `reassign-topology` calls `mk-assignments`, the same function used to assign the topology the first time. `mk-assignments` is also capable of incrementally updating a topology
+      - `mk-assignments` checks heartbeats and reassigns workers as necessary
+      - Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers
+      
+## Killing a topology
+
+- "storm kill" command runs this code which just calls the Nimbus Thrift interface to kill the topology: [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/command/kill_topology.clj)
+- Nimbus receives the kill command [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L671)
+- Nimbus applies the "kill" transition to the topology [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L676)
+- The kill transition function changes the status of the topology to "killed" and schedules the "remove" event to run "wait time seconds" in the future. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L63)
+   - The wait time defaults to the topology message timeout but can be overridden with the -w flag in the "storm kill" command
+   - This causes the topology to be deactivated for the wait time before it's actually shut down. This gives the topology a chance to finish processing what it's currently processing before shutting down the workers
+   - Changing the status during the kill transition ensures that the kill protocol is fault-tolerant to Nimbus crashing. On startup, if the status of the topology is "killed", Nimbus schedules the remove event to run "wait time seconds" in the future [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L111)
+- Removing a topology cleans out the assignment and static information from ZK [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L116)
+- A separate cleanup thread runs the `do-cleanup` function which will clean up the heartbeat dir and the jars/configs stored locally. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/nimbus.clj#L577)

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Local-mode.md
----------------------------------------------------------------------
diff --git a/docs/Local-mode.md b/docs/Local-mode.md
new file mode 100644
index 0000000..1f98e36
--- /dev/null
+++ b/docs/Local-mode.md
@@ -0,0 +1,27 @@
+---
+layout: documentation
+---
+Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies [on a cluster](Running-topologies-on-a-production-cluster.html). 
+
+To create an in-process cluster, simply use the `LocalCluster` class. For example:
+
+```java
+import backtype.storm.LocalCluster;
+
+LocalCluster cluster = new LocalCluster();
+```
+
+You can then submit topologies using the `submitTopology` method on the `LocalCluster` object. Just like the corresponding method on [StormSubmitter](javadocs/backtype/storm/StormSubmitter.html), `submitTopology` takes a name, a topology configuration, and the topology object. You can then kill a topology using the `killTopology` method which takes the topology name as an argument.
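+
+For example (a sketch, assuming a `TopologyBuilder` named `builder` is in scope):
+
+```java
+Config conf = new Config();
+conf.setDebug(true);
+
+cluster.submitTopology("test", conf, builder.createTopology());
+Utils.sleep(10000);             // let the topology run for ten seconds
+cluster.killTopology("test");
+```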
+
+To shut down a local cluster, simply call:
+
+```java
+cluster.shutdown();
+```
+
+### Common configurations for local mode
+
+You can see a full list of configurations [here](javadocs/backtype/storm/Config.html).
+
+1. **Config.TOPOLOGY_MAX_TASK_PARALLELISM**: This config puts a ceiling on the number of threads spawned for a single component. Oftentimes production topologies have a lot of parallelism (hundreds of threads), which places unreasonable load when trying to test the topology in local mode. This config lets you easily control that parallelism (see the sketch after this list).
+2. **Config.TOPOLOGY_DEBUG**: When this is set to true, Storm will log a message every time a tuple is emitted from any spout or bolt. This is extremely useful for debugging.
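+
+Both can be set on the config you pass to `submitTopology` (a sketch):
+
+```java
+Config conf = new Config();
+conf.setMaxTaskParallelism(3);   // cap threads per component in local mode
+conf.setDebug(true);             // log every emitted tuple
+```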

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Maven.md
----------------------------------------------------------------------
diff --git a/docs/Maven.md b/docs/Maven.md
new file mode 100644
index 0000000..85828da
--- /dev/null
+++ b/docs/Maven.md
@@ -0,0 +1,56 @@
+---
+layout: documentation
+---
+To develop topologies, you'll need the Storm jars on your classpath. You should either include the unpacked jars in the classpath for your project or use Maven to include Storm as a development dependency. Storm is hosted on Clojars (a Maven repository). To include Storm in your project as a development dependency, add the following to your pom.xml:
+
+```xml
+<repository>
+  <id>clojars.org</id>
+  <url>http://clojars.org/repo</url>
+</repository>
+```
+
+```xml
+<dependency>
+  <groupId>storm</groupId>
+  <artifactId>storm</artifactId>
+  <version>0.7.2</version>
+  <scope>test</scope>
+</dependency>
+```
+
+[Here's an example](https://github.com/nathanmarz/storm-starter/blob/master/m2-pom.xml) of a pom.xml for a Storm project.
+
+If Maven isn't your thing, check out [leiningen](https://github.com/technomancy/leiningen). Leiningen is a build tool for Clojure, but it can be used for pure Java projects as well. Leiningen makes builds and dependency management using Maven dead-simple. Here's an example project.clj for a pure-Java Storm project:
+
+```clojure
+(defproject storm-starter "0.0.1-SNAPSHOT"
+  :java-source-path "src/jvm"
+  :javac-options {:debug "true" :fork "true"}
+  :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
+  :dependencies []
+  :dev-dependencies [
+                     [storm "0.7.2"]
+                     ])
+```
+
+You can fetch dependencies using `lein deps`, build the project with `lein compile`, and make a jar suitable for submitting to a cluster with `lein uberjar`.
+
+### Using Storm as a library
+
+If you want to use Storm as a library (e.g., use the Distributed RPC client) and have the Storm dependency jars be distributed with your application, there's a separate Maven dependency called "storm/storm-lib". The only difference between this dependency and the usual "storm/storm" is that storm-lib does not have any logging configured.
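+
+A sketch of the corresponding dependency, assuming the same Clojars repository and 0.7.2 version as above (default compile scope, so the jars are distributed with your application):
+
+```xml
+<dependency>
+  <groupId>storm</groupId>
+  <artifactId>storm-lib</artifactId>
+  <version>0.7.2</version>
+</dependency>
+```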
+
+### Developing Storm
+
+You will want to
+
+	bash ./bin/install_zmq.sh   # install the jzmq dependency
+	lein sub install
+
+Build javadocs with
+
+	bash ./bin/javadoc.sh
+
+### Building a Storm Release
+
+Run `bin/build_release.sh` to produce a release zipfile like the ones available for download (and like what the bin scripts require in order to run daemons).

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Message-passing-implementation.md
----------------------------------------------------------------------
diff --git a/docs/Message-passing-implementation.md b/docs/Message-passing-implementation.md
new file mode 100644
index 0000000..f22a5aa
--- /dev/null
+++ b/docs/Message-passing-implementation.md
@@ -0,0 +1,28 @@
+---
+layout: documentation
+---
+(Note: this walkthrough is out of date as of 0.8.0. 0.8.0 revamped the message passing infrastructure to be based on the Disruptor)
+
+This page walks through how emitting and transferring tuples works in Storm.
+
+- Worker is responsible for message transfer
+   - `refresh-connections` is called every "task.refresh.poll.secs" or whenever assignment in ZK changes. It manages connections to other workers and maintains a mapping from task -> worker [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L123)
+   - Provides a "transfer function" that is used by tasks to send tuples to other tasks. The transfer function takes in a task id and a tuple, and it serializes the tuple and puts it onto a "transfer queue". There is a single transfer queue for each worker. [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L56)
+   - The serializer is thread-safe [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/jvm/backtype/storm/serialization/KryoTupleSerializer.java#L26)
+   - The worker has a single thread which drains the transfer queue and sends the messages to other workers [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/daemon/worker.clj#L185)
+   - Message sending happens through this protocol: [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/messaging/protocol.clj)
+   - The implementation for distributed mode uses ZeroMQ [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/messaging/zmq.clj)
+   - The implementation for local mode uses in memory Java queues (so that it's easy to use Storm locally without needing to get ZeroMQ installed) [code](https://github.com/apache/incubator-storm/blob/0.7.1/src/clj/backtype/storm/messaging/local.clj)
+- Receiving messages in tasks works differently in local mode and distributed mode
+   - In local mode, the tuple is sent directly to an in-memory queue for the receiving task [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/messaging/local.clj#L21)
+   - In distributed mode, each worker listens on a single TCP port for incoming messages and then routes those messages in-memory to tasks. The TCP port is called a "virtual port", because it receives [task id, message] and then routes it to the actual task. [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/worker.clj#L204)
+      - The virtual port implementation is here: [code](https://github.com/apache/incubator-storm/blob/master/src/clj/zilch/virtual_port.clj)
+      - Tasks listen on an in-memory ZeroMQ port for messages from the virtual port [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L201)
+        - Bolts listen here: [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L489)
+        - Spouts listen here: [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L382)
+- Tasks are responsible for message routing. A tuple is emitted either to a direct stream (where the task id is specified) or a regular stream. In direct streams, the message is only sent if that bolt subscribes to that direct stream. In regular streams, the stream grouping functions are used to determine the task ids to send the tuple to.
+  - Tasks have a routing map from {stream id} -> {component id} -> {stream grouping function} [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L198)
+  - The "tasks-fn" returns the task ids to send the tuples to for either regular stream emit or direct stream emit [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L207)
+  - After getting the output task ids, bolts and spouts use the transfer-fn provided by the worker to actually transfer the tuples (a schematic sketch follows this list)
+      - Bolt transfer code here: [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L429)
+      - Spout transfer code here: [code](https://github.com/apache/incubator-storm/blob/master/src/clj/backtype/storm/daemon/task.clj#L329)
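+
+A schematic Java sketch of the routing-then-transfer path described above -- the real implementation is the linked Clojure, and every name here is illustrative rather than Storm's actual API:
+
+```java
+import java.util.List;
+import java.util.Map;
+
+// A stream grouping function: chooses destination task ids for a tuple's values.
+interface Grouper {
+    List<Integer> chooseTasks(List<Object> values);
+}
+
+// The worker-provided transfer function: serializes the tuple and puts it on the
+// worker's single transfer queue, which a dedicated sender thread drains.
+interface TransferFn {
+    void transfer(int taskId, List<Object> values);
+}
+
+class TaskEmitter {
+    // Routing map: stream id -> (component id -> grouping function)
+    private final Map<String, Map<String, Grouper>> routing;
+    private final TransferFn transferFn;
+
+    TaskEmitter(Map<String, Map<String, Grouper>> routing, TransferFn transferFn) {
+        this.routing = routing;
+        this.transferFn = transferFn;
+    }
+
+    // Regular-stream emit: each subscribed component's grouping function picks the
+    // target task ids, then the worker's transfer function moves the tuple along.
+    void emit(String streamId, List<Object> values) {
+        for (Grouper grouper : routing.get(streamId).values()) {
+            for (int taskId : grouper.chooseTasks(values)) {
+                transferFn.transfer(taskId, values);
+            }
+        }
+    }
+}
+```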

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Metrics.md
----------------------------------------------------------------------
diff --git a/docs/Metrics.md b/docs/Metrics.md
new file mode 100644
index 0000000..f43f8c7
--- /dev/null
+++ b/docs/Metrics.md
@@ -0,0 +1,34 @@
+---
+layout: documentation
+---
+Storm exposes a metrics interface to report summary statistics across the full topology.
+It's used internally to track the numbers you see in the Nimbus UI console: counts of executes and acks; average process latency per bolt; worker heap usage; and so forth.
+
+### Metric Types
+
+Metrics have to implement just one method, `getValueAndReset`: do any remaining work to find the summary value, then reset back to the initial state. For example, the MeanReducer divides the running total by its running count to find the mean, then initializes both values back to zero.
+
+Storm gives you these metric types:
+
+* [AssignableMetric](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/AssignableMetric.java) -- set the metric to the explicit value you supply. Useful if it's an external value, or if you're already calculating the summary statistic yourself.
+* [CombinedMetric](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/CombinedMetric.java) -- generic interface for metrics that can be updated associatively. 
+* [CountMetric](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java) -- a running total of the supplied values. Call `incr()` to increment by one, `incrBy(n)` to add/subtract the given number.
+  - [MultiCountMetric](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java) -- a hashmap of count metrics.
+* [ReducedMetric](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/ReducedMetric.java) -- wraps a reducer (such as the MeanReducer below) that computes a summary value from the values it is given.
+  - [MeanReducer](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java) -- track a running average of values given to its `reduce()` method. (It accepts `Double`, `Integer` or `Long` values, and maintains the internal average as a `Double`.) Despite his reputation, the MeanReducer is actually a pretty nice guy in person.
+  - [MultiReducedMetric](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java) -- a hashmap of reduced metrics.
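+
+As a minimal sketch, metrics are created and registered from a bolt's `prepare` (or a spout's `open`); the metric names and bucket sizes here are illustrative, and the rest of the bolt is omitted:
+
+```java
+// Inside your bolt class (CountMetric, ReducedMetric, and MeanReducer
+// come from backtype.storm.metric.api):
+private transient CountMetric executeCount;
+private transient ReducedMetric meanExecuteLatency;
+
+public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+    executeCount = new CountMetric();
+    meanExecuteLatency = new ReducedMetric(new MeanReducer());
+    // The first argument names the metric; the last is the reporting bucket in seconds.
+    context.registerMetric("execute-count", executeCount, 10);
+    context.registerMetric("mean-execute-latency", meanExecuteLatency, 60);
+}
+```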
+
+
+### Metric Consumer
+
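+Metrics are periodically sent to any metrics consumers registered on the topology. As a minimal sketch, the built-in `LoggingMetricsConsumer` writes each window's metrics to the worker logs (the parallelism hint of 1 is illustrative):
+
+```java
+import backtype.storm.Config;
+import backtype.storm.metric.LoggingMetricsConsumer;
+
+Config conf = new Config();
+// One consumer task receives metrics from all of the topology's workers.
+conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
+```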
+
+### Build your own metric
+
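+A custom metric just implements the one-method `IMetric` interface and is registered like the built-in ones. A minimal sketch, with an illustrative max-tracking metric:
+
+```java
+import backtype.storm.metric.api.IMetric;
+
+// Reports the largest value seen during each metrics window, then resets.
+public class MaxMetric implements IMetric {
+    private Long max = null;
+
+    public void update(long value) {
+        if (max == null || value > max) { max = value; }
+    }
+
+    public Object getValueAndReset() {
+        Long ret = max;
+        max = null;
+        return ret;
+    }
+}
+```
+
+Register it from `prepare`/`open` as in the sketch above, e.g. `context.registerMetric("max-value", new MaxMetric(), 60);`.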
+
+
+### Builtin Metrics
+
+The [builtin metrics](https://github.com/apache/incubator-storm/blob/46c3ba7/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj) instrument Storm itself.
+
+[builtin_metrics.clj](https://github.com/apache/incubator-storm/blob/46c3ba7/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj) sets up data structures for the built-in metrics, and facade methods that the other framework components can use to update them. The metrics themselves are calculated in the calling code -- see for example [`ack-spout-msg`](https://github.com/apache/incubator-storm/blob/46c3ba7/storm-core/src/clj/backtype/storm/daemon/executor.clj#L358) in `clj/backtype/storm/daemon/executor.clj`.
+

http://git-wip-us.apache.org/repos/asf/storm/blob/abbfb973/docs/Multilang-protocol.md
----------------------------------------------------------------------
diff --git a/docs/Multilang-protocol.md b/docs/Multilang-protocol.md
new file mode 100644
index 0000000..a3cb22c
--- /dev/null
+++ b/docs/Multilang-protocol.md
@@ -0,0 +1,221 @@
+---
+layout: documentation
+---
+This page explains the multilang protocol as of Storm 0.7.1. Versions prior to 0.7.1 used a somewhat different protocol, documented [here](Storm-multi-language-protocol-(versions-0.7.0-and-below\).html).
+
+# Storm Multi-Language Protocol
+
+## Shell Components
+
+Support for multiple languages is implemented via the ShellBolt,
+ShellSpout, and ShellProcess classes.  These classes implement the
+IBolt and ISpout interfaces and the protocol for executing a script or
+program via the shell using Java's ProcessBuilder class.
+
+## Output fields
+
+Output fields are part of the Thrift definition of the topology. This means that to use multilang from Java, you need to create a bolt that extends ShellBolt, implements IRichBolt, and declares the fields in `declareOutputFields` (similarly for ShellSpout).
+
+You can learn more about this in [Concepts](Concepts.html).
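+
+As a sketch, the Java side of a multilang bolt looks roughly like the `SplitSentence` example from storm-starter (the interpreter, script name, and output field are illustrative):
+
+```java
+import java.util.Map;
+
+import backtype.storm.task.ShellBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+public class SplitSentence extends ShellBolt implements IRichBolt {
+    public SplitSentence() {
+        // Run the script from the jar's resources/ directory with this interpreter.
+        super("python", "splitsentence.py");
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Output fields are declared on the Java side, not in the script.
+        declarer.declare(new Fields("word"));
+    }
+
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}
+```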
+
+## Protocol Preamble
+
+A simple protocol is implemented via the STDIN and STDOUT of the
+executed script or program. All data exchanged with the process is
+encoded in JSON, making support possible for pretty much any language.
+
+## Packaging Your Stuff
+
+To run a shell component on a cluster, the scripts that are shelled
+out to must be in the `resources/` directory within the jar submitted
+to the master.
+
+However, during development or testing on a local machine, the resources
+directory just needs to be on the classpath.
+
+## The Protocol
+
+Notes:
+
+* Both ends of this protocol use a line-reading mechanism, so be sure to
+trim off newlines from the input and to append them to your output.
+* All JSON inputs and outputs are terminated by a single line containing "end". Note that this delimiter is not itself JSON encoded.
+* The bullet points below are written from the perspective of the script writer's
+STDIN and STDOUT.
+
+### Initial Handshake
+
+The initial handshake is the same for both types of shell components:
+
+* STDIN: Setup info. This is a JSON object with the Storm configuration, Topology context, and a PID directory, like this:
+
+```
+{
+    "conf": {
+        "topology.message.timeout.secs": 3,
+        // etc
+    },
+    "context": {
+        "task->component": {
+            "1": "example-spout",
+            "2": "__acker",
+            "3": "example-bolt"
+        },
+        "taskid": 3
+    },
+    "pidDir": "..."
+}
+```
+
+Your script should create an empty file named after its PID in this directory. For example,
+if the PID is 1234, your script creates an empty file named 1234 in the directory. This
+file lets the supervisor know the PID so it can shut down the process later on.
+
+* STDOUT: Your PID, in a JSON object, like `{"pid": 1234}`. The shell component will write your PID to its log.
+
+What happens next depends on the type of component:
+
+### Spouts
+
+Shell spouts are synchronous. The rest happens in a `while(true)` loop:
+
+* STDIN: Either a next, ack, or fail command.
+
+"next" is the equivalent of ISpout's `nextTuple`. It looks like:
+
+```
+{"command": "next"}
+```
+
+"ack" looks like:
+
+```
+{"command": "ack", "id": "1231231"}
+```
+
+"fail" looks like:
+
+```
+{"command": "fail", "id": "1231231"}
+```
+
+* STDOUT: The results of your spout for the previous command. This can
+  be a sequence of emits and logs.
+
+An emit looks like:
+
+```
+{
+	"command": "emit",
+	// The id for the tuple. Leave this out for an unreliable emit. The id can
+	// be a string or a number.
+	"id": "1231231",
+	// The id of the stream this tuple was emitted to. Leave this empty to emit to the default stream.
+	"stream": "1",
+	// If doing an emit direct, indicate the task to send the tuple to
+	"task": 9,
+	// All the values in this tuple
+	"tuple": ["field1", 2, 3]
+}
+```
+
+If not doing an emit direct, you will immediately receive the task ids to which the tuple was emitted as a JSON array on STDIN.
+
+A "log" will log a message in the worker log. It looks like:
+
+```
+{
+	"command": "log",
+	// the message to log
+	"msg": "hello world!"
+}
+```
+
+* STDOUT: a "sync" command ends the sequence of emits and logs. It looks like:
+
+```
+{"command": "sync"}
+```
+
+After you sync, ShellSpout will not read your output until it sends another next, ack, or fail command.
+
+Note that, similarly to ISpout, all of the spouts in the worker will be locked up after a next, ack, or fail, until you sync. Also like ISpout, if you have no tuples to emit for a next, you should sleep for a small amount of time before syncing. ShellSpout will not automatically sleep for you.
+
+
+### Bolts
+
+The shell bolt protocol is asynchronous. You will receive tuples on STDIN as soon as they are available, and you may emit, ack, fail, and log at any time by writing to STDOUT, as follows:
+
+* STDIN: A tuple! This is a JSON encoded structure like this:
+
+```
+{
+    // The tuple's id - this is a string to support languages lacking 64-bit precision
+	"id": "-6955786537413359385",
+	// The id of the component that created this tuple
+	"comp": "1",
+	// The id of the stream this tuple was emitted to
+	"stream": "1",
+	// The id of the task that created this tuple
+	"task": 9,
+	// All the values in this tuple
+	"tuple": ["snow white and the seven dwarfs", "field2", 3]
+}
+```
+
+* STDOUT: An ack, fail, emit, or log. Emits look like:
+
+```
+{
+	"command": "emit",
+	// The ids of the tuples this output tuple should be anchored to
+	"anchors": ["1231231", "-234234234"],
+	// The id of the stream this tuple was emitted to. Leave this empty to emit to the default stream.
+	"stream": "1",
+	// If doing an emit direct, indicate the task to send the tuple to
+	"task": 9,
+	// All the values in this tuple
+	"tuple": ["field1", 2, 3]
+}
+```
+
+If not doing an emit direct, you will receive the task ids to which
+the tuple was emitted on STDIN as a JSON array. Note that, due to the
+asynchronous nature of the shell bolt protocol, when you read after
+emitting, you may not receive the task ids. You may instead read the
+task ids for a previous emit or a new tuple to process. You will
+receive the task id lists in the same order as their corresponding
+emits, however.
+
+An ack looks like:
+
+```
+{
+	"command": "ack",
+	// the id of the tuple to ack
+	"id": "123123"
+}
+```
+
+A fail looks like:
+
+```
+{
+	"command": "fail",
+	// the id of the tuple to fail
+	"id": "123123"
+}
+```
+
+A "log" will log a message in the worker log. It looks like:
+
+```
+{
+	"command": "log",
+	// the message to log
+	"msg": "hello world!"
+}
+```
+
+* Note that, as of version 0.7.1, there is no longer any need for a
+  shell bolt to 'sync'.