Posted to commits@storm.apache.org by na...@apache.org on 2014/05/25 19:47:13 UTC

svn commit: r1597454 [8/9] - in /incubator/storm/site: ./ publish/ publish/about/ publish/documentation/

Modified: incubator/storm/site/publish/documentation/Trident-tutorial.html
URL: http://svn.apache.org/viewvc/incubator/storm/site/publish/documentation/Trident-tutorial.html?rev=1597454&r1=1597453&r2=1597454&view=diff
==============================================================================
--- incubator/storm/site/publish/documentation/Trident-tutorial.html (original)
+++ incubator/storm/site/publish/documentation/Trident-tutorial.html Sun May 25 17:47:12 2014
@@ -65,42 +65,43 @@
   </ul>
 </div>
 <div id="aboutcontent">
-<h1>Trident tutorial</h1>
+<h1 id="trident-tutorial">Trident tutorial</h1>
 
-<p>Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.</p>
+<p>Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. If you&#8217;re familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar – Trident has joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.</p>
 
-<h2>Illustrative example</h2>
+<h2 id="illustrative-example">Illustrative example</h2>
 
-<p>Let's look at an illustrative example of Trident. This example will do two things:</p>
+<p>Let&#8217;s look at an illustrative example of Trident. This example will do two things:</p>
 
 <ol>
-<li>Compute streaming word count from an input stream of sentences</li>
-<li>Implement queries to get the sum of the counts for a list of words</li>
+  <li>Compute streaming word count from an input stream of sentences</li>
+  <li>Implement queries to get the sum of the counts for a list of words</li>
 </ol>
 
-
 <p>For the purposes of illustration, this example will read an infinite stream of sentences from the following source:</p>
 
-<pre><code class="java">FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
+<pre><code class="java">
+FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
 spout.setCycle(true);
-</code></pre>
+</code></pre>
 
-<p>This spout cycles through that set of sentences over and over to produce the sentence stream. Here's the code to do the streaming word count part of the computation:</p>
+<p>This spout cycles through that set of sentences over and over to produce the sentence stream. Here&#8217;s the code to do the streaming word count part of the computation:</p>
 
-<pre><code class="java">TridentTopology topology = new TridentTopology();        
+<pre><code class="java">
+TridentTopology topology = new TridentTopology();
 TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .parallelismHint(6);
-</code></pre>
+</code></pre>
 
-<p>Let's go through the code line by line. First a TridentTopology object is created, which exposes the interface for constructing Trident computations. TridentTopology has a method called newStream that creates a new stream of data in the topology reading from an input source. In this case, the input source is just the FixedBatchSpout defined from before. Input sources can also be queue brokers like Kestrel or Kafka. Trident keeps track of a small amount of state for each input source (metadata about what it has consumed) in Zookeeper, and the "spout1" string here specifies the node in Zookeeper where Trident should keep that metadata.</p>
+<p>Let&#8217;s go through the code line by line. First, a TridentTopology object is created, which exposes the interface for constructing Trident computations. TridentTopology has a method called newStream that creates a new stream of data in the topology reading from an input source. In this case, the input source is just the FixedBatchSpout defined earlier. Input sources can also be queue brokers like Kestrel or Kafka. Trident keeps track of a small amount of state for each input source (metadata about what it has consumed) in ZooKeeper, and the &#8220;spout1&#8221; string here specifies the node in ZooKeeper where Trident should keep that metadata.</p>
 
 <p>Trident processes the stream as small batches of tuples. For example, the incoming stream of sentences might be divided into batches like so:</p>
 
@@ -108,11 +109,12 @@ TridentState wordCounts =
 
 <p>Generally the size of those small batches will be on the order of thousands or millions of tuples, depending on your incoming throughput.</p>
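To make the batching concrete without a cluster, the following plain-Java sketch models a source that cycles through the four example sentences and hands them out in batches of 3, mirroring the FixedBatchSpout settings above. `CyclingBatchSource` and `nextBatch` are hypothetical names, not part of Storm. For simplicity the model wraps around in the middle of a batch, which may not match FixedBatchSpout's exact batch boundaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of a cycling batch source; not Storm's FixedBatchSpout API.
class CyclingBatchSource {
    private final List<String> sentences;
    private final int batchSize;
    private int index = 0; // position of the next sentence to emit

    CyclingBatchSource(int batchSize, String... sentences) {
        this.batchSize = batchSize;
        this.sentences = Arrays.asList(sentences);
    }

    // Returns the next batch of exactly batchSize sentences, wrapping around to
    // the start of the list (even mid-batch) so the stream never ends.
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(sentences.get(index));
            index = (index + 1) % sentences.size();
        }
        return batch;
    }

    public static void main(String[] args) {
        CyclingBatchSource source = new CyclingBatchSource(3,
                "the cow jumped over the moon",
                "the man went to the store and bought some candy",
                "four score and seven years ago",
                "how many apples can you eat");
        for (int b = 0; b < 3; b++) {
            System.out.println("batch " + b + ": " + source.nextBatch());
        }
    }
}
```

In this model the second batch is the fourth sentence followed by the first two. Real batches are usually far larger than 3 tuples.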
 
-<p>Trident provides a fully fledged batch processing API to process those small batches. The API is very similar to what you see in high level abstractions for Hadoop like Pig or Cascading: you can do group by's, joins, aggregations, run functions, run filters, and so on. Of course, processing each small batch in isolation isn't that interesting, so Trident provides functions for doing aggregations across batches and persistently storing those aggregations – whether in memory, in Memcached, in Cassandra, or some other store. Finally, Trident has first-class functions for querying sources of realtime state. That state could be updated by Trident (like in this example), or it could be an independent source of state.</p>
+<p>Trident provides a fully fledged batch processing API to process those small batches. The API is very similar to what you see in high level abstractions for Hadoop like Pig or Cascading: you can do group-bys, joins, aggregations, run functions, run filters, and so on. Of course, processing each small batch in isolation isn&#8217;t that interesting, so Trident provides functions for doing aggregations across batches and persistently storing those aggregations – whether in memory, in Memcached, in Cassandra, or some other store. Finally, Trident has first-class functions for querying sources of realtime state. That state could be updated by Trident (like in this example), or it could be an independent source of state.</p>
 
-<p>Back to the example, the spout emits a stream containing one field called "sentence". The next line of the topology definition applies the Split function to each tuple in the stream, taking the "sentence" field and splitting it into words. Each sentence tuple creates potentially many word tuples – for instance, the sentence "the cow jumped over the moon" creates six "word" tuples. Here's the definition of Split:</p>
+<p>Back to the example, the spout emits a stream containing one field called &#8220;sentence&#8221;. The next line of the topology definition applies the Split function to each tuple in the stream, taking the &#8220;sentence&#8221; field and splitting it into words. Each sentence tuple creates potentially many word tuples – for instance, the sentence &#8220;the cow jumped over the moon&#8221; creates six &#8220;word&#8221; tuples. Here&#8217;s the definition of Split:</p>
 
-<pre><code class="java">public class Split extends BaseFunction {
+<pre><code class="java">
+public class Split extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
@@ -120,184 +122,193 @@ TridentState wordCounts =
        }
    }
 }
-</code></pre>
+</code></pre>
 
-<p>As you can see, it's really simple. It simply grabs the sentence, splits it on whitespace, and emits a tuple for each word.</p>
+<p>As you can see, it&#8217;s very simple: it grabs the sentence, splits it on spaces, and emits a tuple for each word.</p>
 
-<p>The rest of the topology computes word count and keeps the results persistently stored. First the stream is grouped by the "word" field. Then, each group is persistently aggregated using the Count aggregator. The persistentAggregate function knows how to store and update the results of the aggregation in a source of state. In this example, the word counts are kept in memory, but this can be trivially swapped to use Memcached, Cassandra, or any other persistent store. Swapping this topology to store counts in Memcached is as simple as replacing the persistentAggregate line with this (using <a href="https://github.com/nathanmarz/trident-memcached">trident-memcached</a>), where the "serverLocations" is a list of host/ports for the Memcached cluster:</p>
+<p>The rest of the topology computes word count and keeps the results persistently stored. First the stream is grouped by the &#8220;word&#8221; field. Then, each group is persistently aggregated using the Count aggregator. The persistentAggregate function knows how to store and update the results of the aggregation in a source of state. In this example, the word counts are kept in memory, but this can be trivially swapped to use Memcached, Cassandra, or any other persistent store. Swapping this topology to store counts in Memcached is as simple as replacing the persistentAggregate line with this (using <a href="https://github.com/nathanmarz/trident-memcached">trident-memcached</a>), where &#8220;serverLocations&#8221; is a list of host/port pairs for the Memcached cluster:</p>
 
-<pre><code class="java">.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
+<pre><code class="java">
+.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))
-</code></pre>
+</code></pre>
 
 <p>The values stored by persistentAggregate represent the aggregation of all batches ever emitted by the stream.</p>
 
-<p>One of the cool things about Trident is that it has fully fault-tolerant, exactly-once processing semantics. This makes it easy to reason about your realtime processing. Trident persists state in a way so that if failures occur and retries are necessary, it won't perform multiple updates to the database for the same source data.</p>
+<p>One of the cool things about Trident is that it has fully fault-tolerant, exactly-once processing semantics. This makes it easy to reason about your realtime processing. Trident persists state in a way so that if failures occur and retries are necessary, it won&#8217;t perform multiple updates to the database for the same source data.</p>
 
 <p>The persistentAggregate method transforms a Stream into a TridentState object. In this case the TridentState object represents all the word counts. We will use this TridentState object to implement the distributed query portion of the computation.</p>
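A plain-Java sketch of what the word-count half computes, batch by batch, may help here. It is a simulation under stated assumptions, not Trident code. `WordCountSketch` and its methods are hypothetical, and a `HashMap` stands in for the MemoryMapState. Each batch is split on spaces (like Split), counted per word within the batch, and then merged into the stored totals. This is roughly the role persistentAggregate plays with the Count aggregator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of the streaming word count (no Storm APIs).
class WordCountSketch {
    // Stands in for the persistent state (MemoryMapState in the topology).
    private final Map<String, Long> counts = new HashMap<>();

    // Process one batch: split each sentence into words, count words within
    // the batch, then fold the batch's counts into the stored totals.
    void processBatch(List<String> sentences) {
        Map<String, Long> batchCounts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                batchCounts.merge(word, 1L, Long::sum);
            }
        }
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            counts.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    // Current stored count for a word (0 if never seen).
    long count(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        WordCountSketch wc = new WordCountSketch();
        wc.processBatch(Arrays.asList(
                "the cow jumped over the moon",
                "the man went to the store and bought some candy"));
        wc.processBatch(Arrays.asList("four score and seven years ago"));
        System.out.println("the=" + wc.count("the") + " and=" + wc.count("and")); // prints the=4 and=2
    }
}
```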
 
-<p>The next part of the topology implements a low latency distributed query on the word counts. The query takes as input a whitespace separated list of words and return the sum of the counts for those words. These queries are executed just like normal RPC calls, except they are parallelized in the background. Here's an example of how you might invoke one of these queries:</p>
+<p>The next part of the topology implements a low latency distributed query on the word counts. The query takes as input a whitespace-separated list of words and returns the sum of the counts for those words. These queries are executed just like normal RPC calls, except they are parallelized in the background. Here&#8217;s an example of how you might invoke one of these queries:</p>
 
-<pre><code class="java">DRPCClient client = new DRPCClient("drpc.server.location", 3772);
+<pre><code class="java">
+DRPCClient client = new DRPCClient("drpc.server.location", 3772);
 System.out.println(client.execute("words", "cat dog the man"));
 // prints the JSON-encoded result, e.g.: "[[5078]]"
-</code></pre>
+</code></pre>
 
-<p>As you can see, it looks just like a regular remote procedure call (RPC), except it's executing in parallel across a Storm cluster. The latency for small queries like this are typically around 10ms. More intense DRPC queries can take longer of course, although the latency largely depends on how many resources you have allocated for the computation.</p>
+<p>As you can see, it looks just like a regular remote procedure call (RPC), except it&#8217;s executing in parallel across a Storm cluster. The latency for small queries like this is typically around 10ms. More intense DRPC queries can take longer of course, although the latency largely depends on how many resources you have allocated for the computation.</p>
 
 <p>The implementation of the distributed query portion of the topology looks like this:</p>
 
-<pre><code class="java">topology.newDRPCStream("words")
+<pre><code class="java">
+topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
-</code></pre>
+</code></pre>
 
-<p>The same TridentTopology object is used to create the DRPC stream, and the function is named "words". The function name corresponds to the function name given in the first argument of execute when using a DRPCClient.</p>
+<p>The same TridentTopology object is used to create the DRPC stream, and the function is named &#8220;words&#8221;. The function name corresponds to the function name given in the first argument of execute when using a DRPCClient.</p>
 
-<p>Each DRPC request is treated as its own little batch processing job that takes as input a single tuple representing the request. The tuple contains one field called "args" that contains the argument provided by the client. In this case, the argument is a whitespace separated list of words.</p>
+<p>Each DRPC request is treated as its own little batch processing job that takes as input a single tuple representing the request. The tuple contains one field called &#8220;args&#8221; that contains the argument provided by the client. In this case, the argument is a whitespace separated list of words.</p>
 
-<p>First, the Split function is used to split the arguments for the request into its constituent words. The stream is grouped by "word", and the stateQuery operator is used to query the TridentState object that the first part of the topology generated. stateQuery takes in a source of state – in this case, the word counts computed by the other portion of the topology – and a function for querying that state. In this case, the MapGet function is invoked, which gets the count for each word. Since the DRPC stream is grouped the exact same way as the TridentState was (by the "word" field), each word query is routed to the exact partition of the TridentState object that manages updates for that word.</p>
+<p>First, the Split function is used to split the arguments for the request into its constituent words. The stream is grouped by &#8220;word&#8221;, and the stateQuery operator is used to query the TridentState object that the first part of the topology generated. stateQuery takes in a source of state – in this case, the word counts computed by the other portion of the topology – and a function for querying that state. In this case, the MapGet function is invoked, which gets the count for each word. Since the DRPC stream is grouped the exact same way as the TridentState was (by the &#8220;word&#8221; field), each word query is routed to the exact partition of the TridentState object that manages updates for that word.</p>
 
-<p>Next, words that didn't have a count are filtered out via the FilterNull filter and the counts are summed using the Sum aggregator to get the result. Then, Trident automatically sends the result back to the waiting client.</p>
+<p>Next, words that didn&#8217;t have a count are filtered out via the FilterNull filter and the counts are summed using the Sum aggregator to get the result. Then, Trident automatically sends the result back to the waiting client.</p>
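The query half can be approximated the same way. In this hypothetical plain-Java sketch (no Storm APIs), a `Map<String, Long>` plays the role of the wordCounts state. The argument string is split on spaces, and each word is looked up (the MapGet step). Words without a count are skipped (the FilterNull step), and the remaining counts are summed (the Sum step). Distribution, grouping, and the DRPC round trip are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java approximation of the "words" DRPC query (no Storm APIs).
class WordsQuerySketch {
    // Split the argument, look up each word's count, skip words with no
    // count (like FilterNull), and sum the rest (like Sum).
    static long query(Map<String, Long> wordCounts, String args) {
        long sum = 0;
        for (String word : args.split(" ")) {
            Long count = wordCounts.get(word); // null when the word was never counted
            if (count != null) {
                sum += count;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> wordCounts = new HashMap<>();
        wordCounts.put("the", 4L);
        wordCounts.put("man", 1L);
        System.out.println(query(wordCounts, "cat dog the man")); // prints 5
    }
}
```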
 
-<p>Trident is intelligent about how it executes a topology to maximize performance. There's two interesting things happening automatically in this topology:</p>
+<p>Trident is intelligent about how it executes a topology to maximize performance. There are two interesting things happening automatically in this topology:</p>
 
 <ol>
-<li>Operations that read from or write to state (like persistentAggregate and stateQuery) automatically batch operations to that state. So if there's 20 updates that need to be made to the database for the current batch of processing, rather than do 20 read requests and 20 writes requests to the database, Trident will automatically batch up the reads and writes, doing only 1 read request and 1 write request (and in many cases, you can use caching in your State implementation to eliminate the read request). So you get the best of both words of convenience – being able to express your computation in terms of what should be done with each tuple – and performance.</li>
-<li>Trident aggregators are heavily optimized. Rather than transfer all tuples for a group to the same machine and then run the aggregator, Trident will do partial aggregations when possible before sending tuples over the network. For example, the Count aggregator computes the count on each partition, sends the partial count over the network, and then sums together all the partial counts to get the total count. This technique is similar to the use of combiners in MapReduce.</li>
+  <li>Operations that read from or write to state (like persistentAggregate and stateQuery) automatically batch operations to that state. So if there are 20 updates that need to be made to the database for the current batch of processing, rather than do 20 read requests and 20 write requests to the database, Trident will automatically batch up the reads and writes, doing only 1 read request and 1 write request (and in many cases, you can use caching in your State implementation to eliminate the read request). So you get the best of both worlds: the convenience of expressing your computation in terms of what should be done with each tuple, and performance.</li>
+  <li>Trident aggregators are heavily optimized. Rather than transfer all tuples for a group to the same machine and then run the aggregator, Trident will do partial aggregations when possible before sending tuples over the network. For example, the Count aggregator computes the count on each partition, sends the partial count over the network, and then sums together all the partial counts to get the total count. This technique is similar to the use of combiners in MapReduce.</li>
 </ol>
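The first optimization can be illustrated with a toy store that counts its round trips. `CountingStore` and its `multiGet`/`multiPut`/`applyBatch` methods are hypothetical and are not Trident's State interface. They only model the idea of bulk reads and writes. Applying a batch of 20 per-key increments costs one bulk read and one bulk write rather than 20 of each.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key-value store that counts how many requests it receives.
class CountingStore {
    private final Map<String, Long> data = new HashMap<>();
    int readRequests = 0;
    int writeRequests = 0;

    // One request reads many keys.
    List<Long> multiGet(List<String> keys) {
        readRequests++;
        List<Long> values = new ArrayList<>();
        for (String key : keys) {
            values.add(data.get(key)); // null if absent
        }
        return values;
    }

    // One request writes many keys.
    void multiPut(List<String> keys, List<Long> values) {
        writeRequests++;
        for (int i = 0; i < keys.size(); i++) {
            data.put(keys.get(i), values.get(i));
        }
    }

    // Apply all increments for one batch with a single bulk read and a single bulk write.
    void applyBatch(Map<String, Long> increments) {
        List<String> keys = new ArrayList<>(increments.keySet());
        List<Long> current = multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            Long old = current.get(i);
            updated.add((old == null ? 0L : old) + increments.get(keys.get(i)));
        }
        multiPut(keys, updated);
    }

    Long get(String key) {
        return data.get(key);
    }

    public static void main(String[] args) {
        CountingStore store = new CountingStore();
        Map<String, Long> increments = new HashMap<>();
        for (int i = 0; i < 20; i++) {
            increments.put("word" + i, 1L);
        }
        store.applyBatch(increments);
        System.out.println(store.readRequests + " read, " + store.writeRequests + " write"); // prints 1 read, 1 write
    }
}
```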
 
+<p>Let&#8217;s look at another example of Trident.</p>
 
-<p>Let's look at another example of Trident.</p>
-
-<h2>Reach</h2>
+<h2 id="reach">Reach</h2>
 
 <p>The next example is a pure DRPC topology that computes the reach of a URL on demand. Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you need to fetch all the people who ever tweeted a URL, fetch all the followers of all those people, unique that set of followers, and then count that uniqued set. Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples. With Storm and Trident, you can parallelize the computation of each step across a cluster.</p>
 
 <p>This topology will read from two sources of state. One database maps URLs to a list of people who tweeted that URL. The other database maps a person to a list of followers for that person. The topology definition looks like this:</p>
 
-<pre><code class="java">TridentState urlToTweeters =
+<pre><code class="java">
+TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
 TridentState tweetersToFollowers =
-       topology.newStaticState(getTweeterToFollowersState());
+       topology.newStaticState(getTweeterToFollowersState());
 
-topology.newDRPCStream("reach")
-       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
-       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
+topology.newDRPCStream("reach")
+       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
+       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
-       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
+       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
-       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
-       .groupBy(new Fields("follower"))
-       .aggregate(new One(), new Fields("one"))
+       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
+       .groupBy(new Fields("follower"))
+       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
-       .aggregate(new Count(), new Fields("reach"));
-</code></pre>
+       .aggregate(new Count(), new Fields("reach"));
+</code></pre>
 
 <p>The topology creates TridentState objects representing each external database using the newStaticState method. These can then be queried in the topology. Like all sources of state, queries to these databases will be automatically batched for maximum efficiency.</p>
 
-<p>The topology definition is straightforward – it's just a simple batch processing job. First, the urlToTweeters database is queried to get the list of people who tweeted the URL for this request. That returns a list, so the ExpandList function is invoked to create a tuple for each tweeter.</p>
+<p>The topology definition is straightforward – it&#8217;s just a simple batch processing job. First, the urlToTweeters database is queried to get the list of people who tweeted the URL for this request. That returns a list, so the ExpandList function is invoked to create a tuple for each tweeter.</p>
 
-<p>Next, the followers for each tweeter must be fetched. It's important that this step be parallelized, so shuffle is invoked to evenly distribute the tweeters among all workers for the topology. Then, the followers database is queried to get the list of followers for each tweeter. You can see that this portion of the topology is given a large parallelism since this is the most intense portion of the computation.</p>
+<p>Next, the followers for each tweeter must be fetched. It&#8217;s important that this step be parallelized, so shuffle is invoked to evenly distribute the tweeters among all workers for the topology. Then, the followers database is queried to get the list of followers for each tweeter. You can see that this portion of the topology is given a large parallelism since this is the most intense portion of the computation.</p>
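Stripped of parallelism, reach is two lookups followed by a distinct count. The plain-Java sketch below is hypothetical. In-memory maps stand in for the urlToTweeters and tweetersToFollowers databases, and the nested loops play the role of the two stateQuery/ExpandList steps. A `HashSet` stands in for the groupBy, One, and Count steps that unique and count the followers. The sketch shows what is computed, not how Trident distributes the work.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory version of the reach computation (no Storm APIs).
class ReachSketch {
    static int reach(String url,
                     Map<String, List<String>> urlToTweeters,
                     Map<String, List<String>> tweeterToFollowers) {
        Set<String> uniqueFollowers = new HashSet<>();
        // Like the first stateQuery + ExpandList: one iteration per tweeter.
        for (String tweeter : urlToTweeters.getOrDefault(url, Collections.<String>emptyList())) {
            // Like the second stateQuery + ExpandList: one iteration per follower.
            for (String follower : tweeterToFollowers.getOrDefault(tweeter, Collections.<String>emptyList())) {
                uniqueFollowers.add(follower); // duplicates collapse here
            }
        }
        return uniqueFollowers.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> urlToTweeters = new HashMap<>();
        urlToTweeters.put("example.com/a", Arrays.asList("alice", "bob"));
        Map<String, List<String>> followers = new HashMap<>();
        followers.put("alice", Arrays.asList("carol", "dave"));
        followers.put("bob", Arrays.asList("dave", "erin"));
        System.out.println(reach("example.com/a", urlToTweeters, followers)); // prints 3
    }
}
```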
 
-<p>Next, the set of followers is uniqued and counted. This is done in two steps. First a "group by" is done on the batch by "follower", running the "One" aggregator on each group. The "One" aggregator simply emits a single tuple containing the number one for each group. Then, the ones are summed together to get the unique count of the followers set. Here's the definition of the "One" aggregator:</p>
+<p>Next, the set of followers is uniqued and counted. This is done in two steps. First a &#8220;group by&#8221; is done on the batch by &#8220;follower&#8221;, running the &#8220;One&#8221; aggregator on each group. The &#8220;One&#8221; aggregator simply emits a single tuple containing the number one for each group. Then, those tuples (one per unique follower) are counted to get the size of the followers set. Here&#8217;s the definition of the &#8220;One&#8221; aggregator:</p>
 
-<pre><code class="java">public class One implements CombinerAggregator&lt;Integer&gt; {
+<pre><code class="java">
+public class One implements CombinerAggregator&lt;Integer&gt; {
    public Integer init(TridentTuple tuple) {
        return 1;
-   }
+   }
 
-   public Integer combine(Integer val1, Integer val2) {
+   public Integer combine(Integer val1, Integer val2) {
        return 1;
-   }
+   }
 
-   public Integer zero() {
+   public Integer zero() {
        return 1;
-   }        
+   }
 }
-</code></pre>
+</code></pre>
 
-<p>This is a "combiner aggregator", which knows how to do partial aggregations before transferring tuples over the network to maximize efficiency. Sum is also defined as a combiner aggregator, so the global sum done at the end of the topology will be very efficient.</p>
+<p>This is a &#8220;combiner aggregator&#8221;, which knows how to do partial aggregations before transferring tuples over the network to maximize efficiency. Count is also defined as a combiner aggregator, so the global count done at the end of the topology will be very efficient.</p>
 
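Why a combiner aggregator enables partial aggregation can be shown in plain Java. The `Combiner` interface below is a hypothetical stand-in with the same three operations as the One aggregator above (init, combine, zero). It is not Trident's CombinerAggregator. Each partition is reduced locally, so only one partial value per partition would need to cross the network before the partials are combined. This is the same idea as a MapReduce combiner.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a combiner-style aggregator (not Trident's interface).
interface Combiner<T, V> {
    V init(T tuple);      // value contributed by a single input
    V combine(V a, V b);  // merge two partial values
    V zero();             // value for an empty input
}

class PartialAggregation {
    // Reduce one partition locally, before anything would cross the network.
    static <T, V> V aggregatePartition(List<T> partition, Combiner<T, V> c) {
        V acc = c.zero();
        for (T tuple : partition) {
            acc = c.combine(acc, c.init(tuple));
        }
        return acc;
    }

    // Combine only the per-partition partial results.
    static <T, V> V aggregate(List<List<T>> partitions, Combiner<T, V> c) {
        V total = c.zero();
        for (List<T> partition : partitions) {
            total = c.combine(total, aggregatePartition(partition, c));
        }
        return total;
    }

    // A count expressed as a combiner: each tuple contributes 1.
    static <T> Combiner<T, Long> count() {
        return new Combiner<T, Long>() {
            public Long init(T tuple) { return 1L; }
            public Long combine(Long a, Long b) { return a + b; }
            public Long zero() { return 0L; }
        };
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        System.out.println(aggregate(partitions, PartialAggregation.<String>count())); // prints 5
    }
}
```

Here the two partitions produce partial counts of 3 and 2, and only those two numbers are combined.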
-<p>Let's now look at Trident in more detail.</p>
+<p>Let&#8217;s now look at Trident in more detail.</p>
 
-<h2>Fields and tuples</h2>
+<h2 id="fields-and-tuples">Fields and tuples</h2>
 
-<p>The Trident data model is the TridentTuple which is a named list of values. During a topology, tuples are incrementally built up through a sequence of operations. Operations generally take in a set of input fields and emit a set of "function fields". The input fields are used to select a subset of the tuple as input to the operation, while the "function fields" name the fields the operation emits.</p>
+<p>The Trident data model is the TridentTuple, which is a named list of values. As a tuple moves through a topology, it is incrementally built up through a sequence of operations. Operations generally take in a set of input fields and emit a set of &#8220;function fields&#8221;. The input fields are used to select a subset of the tuple as input to the operation, while the &#8220;function fields&#8221; name the fields the operation emits.</p>
 
-<p>Consider this example. Suppose you have a stream called "stream" that contains the fields "x", "y", and "z". To run a filter MyFilter that takes in "y" as input, you would say:</p>
+<p>Consider this example. Suppose you have a stream called &#8220;stream&#8221; that contains the fields &#8220;x&#8221;, &#8220;y&#8221;, and &#8220;z&#8221;. To run a filter MyFilter that takes in &#8220;y&#8221; as input, you would say:</p>
 
-<pre><code class="java">stream.each(new Fields("y"), new MyFilter())
-</code></pre>
+<pre><code class="java">
+stream.each(new Fields("y"), new MyFilter())
+</code></pre>
 
 <p>Suppose the implementation of MyFilter is this:</p>
 
-<pre><code class="java">public class MyFilter extends BaseFilter {
+<pre><code class="java">
+public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) &lt; 10;
    }
 }
-</code></pre>
+</code></pre>
 
-<p>This will keep all tuples whose "y" field is less than 10. The TridentTuple given as input to MyFilter will only contain the "y" field. Note that Trident is able to project a subset of a tuple extremely efficiently when selecting the input fields: the projection is essentially free.</p>
+<p>This will keep all tuples whose &#8220;y&#8221; field is less than 10. The TridentTuple given as input to MyFilter will only contain the &#8220;y&#8221; field. Note that Trident is able to project a subset of a tuple extremely efficiently when selecting the input fields: the projection is essentially free.</p>
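Input-field projection can be modeled in a few lines of plain Java. `NamedTuple`, `project`, and `myFilterKeeps` are hypothetical helpers, not TridentTuple's API. The tuple keeps its fields in an insertion-ordered map, and projecting onto the input fields hands the operation only the selected values, in the requested order. That is why MyFilter reads "y" at index 0. This model copies the values for clarity and does not reproduce the near-zero-cost projection described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a named tuple with input-field projection (not Trident's API).
class NamedTuple {
    private final Map<String, Object> fields = new LinkedHashMap<>();

    NamedTuple(List<String> names, List<Object> values) {
        for (int i = 0; i < names.size(); i++) {
            fields.put(names.get(i), values.get(i));
        }
    }

    // Select the given fields, in the given order; the operation sees only these.
    List<Object> project(String... inputFields) {
        List<Object> selected = new ArrayList<>();
        for (String name : inputFields) {
            selected.add(fields.get(name));
        }
        return selected;
    }

    // Mirrors MyFilter: keep the tuple when its first input value is below 10.
    static boolean myFilterKeeps(List<Object> input) {
        return (Integer) input.get(0) < 10;
    }

    public static void main(String[] args) {
        NamedTuple t = new NamedTuple(Arrays.asList("x", "y", "z"), Arrays.<Object>asList(100, 7, 42));
        List<Object> input = t.project("y");
        System.out.println(input + " keep=" + myFilterKeeps(input)); // prints [7] keep=true
    }
}
```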
 
-<p>Let's now look at how "function fields" work. Suppose you had this function:</p>
+<p>Let&#8217;s now look at how &#8220;function fields&#8221; work. Suppose you had this function:</p>
 
-<pre><code class="java">public class AddAndMultiply extends BaseFunction {
+<pre><code class="java">
+public class AddAndMultiply extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        int i1 = tuple.getInteger(0);
        int i2 = tuple.getInteger(1);
        collector.emit(new Values(i1 + i2, i1 * i2));
    }
 }
-</code></pre>
+</code></pre>
 
-<p>This function takes two numbers as input and emits two new values: the addition of the numbers and the multiplication of the numbers. Suppose you had a stream with the fields "x", "y", and "z". You would use this function like this:</p>
+<p>This function takes two numbers as input and emits two new values: the addition of the numbers and the multiplication of the numbers. Suppose you had a stream with the fields &#8220;x&#8221;, &#8220;y&#8221;, and &#8220;z&#8221;. You would use this function like this:</p>
 
-<pre><code class="java">stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
-</code></pre>
+<pre><code class="java">
+stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
+</code></pre>
 
-<p>The output of functions is additive: the fields are added to the input tuple. So the output of this each call would contain tuples with the five fields "x", "y", "z", "added", and "multiplied". "added" corresponds to the first value emitted by AddAndMultiply, while "multiplied" corresponds to the second value.</p>
+<p>The output of functions is additive: the fields are added to the input tuple. So the output of this each call would contain tuples with the five fields &#8220;x&#8221;, &#8220;y&#8221;, &#8220;z&#8221;, &#8220;added&#8221;, and &#8220;multiplied&#8221;. &#8220;added&#8221; corresponds to the first value emitted by AddAndMultiply, while &#8220;multiplied&#8221; corresponds to the second value.</p>
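The additive behavior of each can be sketched by modeling a tuple as an insertion-ordered map from field name to value. The `applyEach` helper is hypothetical, not a Trident API. It runs AddAndMultiply's arithmetic on the two input fields and appends the results under the declared function-field names, leaving the original fields in place.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of each() with additive function fields (not Trident's API).
class FunctionFieldsSketch {
    // Reads the two input fields, computes their sum and product (like
    // AddAndMultiply), and appends them under the two output field names.
    static Map<String, Object> applyEach(Map<String, Object> tuple,
                                         String in1, String in2,
                                         String out1, String out2) {
        int i1 = (Integer) tuple.get(in1);
        int i2 = (Integer) tuple.get(in2);
        Map<String, Object> result = new LinkedHashMap<>(tuple); // input fields are kept
        result.put(out1, i1 + i2);
        result.put(out2, i1 * i2);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("x", 3);
        tuple.put("y", 4);
        tuple.put("z", "unused");
        System.out.println(applyEach(tuple, "x", "y", "added", "multiplied"));
        // prints {x=3, y=4, z=unused, added=7, multiplied=12}
    }
}
```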
 
-<p>With aggregators, on the other hand, the function fields replace the input tuples. So if you had a stream containing the fields "val1" and "val2", and you did this:</p>
+<p>With aggregators, on the other hand, the function fields replace the input tuples. So if you had a stream containing the fields &#8220;val1&#8221; and &#8220;val2&#8221;, and you did this:</p>
 
-<pre><code class="java">stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
-</code></pre>
+<pre><code class="java">
+stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
+</code></pre>
 
-<p>The output stream would only contain a single tuple with a single field called "sum", representing the sum of all "val2" fields in that batch.</p>
+<p>The output stream would only contain a single tuple with a single field called &#8220;sum&#8221;, representing the sum of all &#8220;val2&#8221; fields in that batch.</p>
 
 <p>With grouped streams, the output will contain the grouping fields followed by the fields emitted by the aggregator. For example:</p>
 
-<pre><code class="java">stream.groupBy(new Fields("val1"))
+<p><code>java
+stream.groupBy(new Fields("val1"))
      .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
-</code></pre>
+</code></p>
 
-<p>In this example, the output will contain the fields "val1" and "sum".</p>
+<p>In this example, the output will contain the fields &#8220;val1&#8221; and &#8220;sum&#8221;.</p>
 
-<h2>State</h2>
+<h2 id="state">State</h2>
 
-<p>A key problem to solve with realtime computation is how to manage state so that updates are idempotent in the face of failures and retries. It's impossible to eliminate failures, so when a node dies or something else goes wrong, batches need to be retried. The question is – how do you do state updates (whether external databases or state internal to the topology) so that it's like each message was processed only once?</p>
+<p>A key problem to solve with realtime computation is how to manage state so that updates are idempotent in the face of failures and retries. It&#8217;s impossible to eliminate failures, so when a node dies or something else goes wrong, batches need to be retried. The question is – how do you do state updates (whether external databases or state internal to the topology) so that it&#8217;s like each message was processed only once?</p>
 
-<p>This is a tricky problem, and can be illustrated with the following example. Suppose that you're doing a count aggregation of your stream and want to store the running count in a database. If you store only the count in the database and it's time to apply a state update for a batch, there's no way to know if you applied that state update before. The batch could have been attempted before, succeeded in updating the database, and then failed at a later step. Or the batch could have been attempted before and failed to update the database. You just don't know.</p>
+<p>This is a tricky problem, and can be illustrated with the following example. Suppose that you&#8217;re doing a count aggregation of your stream and want to store the running count in a database. If you store only the count in the database and it&#8217;s time to apply a state update for a batch, there&#8217;s no way to know if you applied that state update before. The batch could have been attempted before, succeeded in updating the database, and then failed at a later step. Or the batch could have been attempted before and failed to update the database. You just don&#8217;t know.</p>
 
 <p>Trident solves this problem by doing two things:</p>
 
 <ol>
-<li>Each batch is given a unique id called the "transaction id". If a batch is retried it will have the exact same transaction id.</li>
-<li>State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.</li>
+  <li>Each batch is given a unique id called the &#8220;transaction id&#8221;. If a batch is retried it will have the exact same transaction id.</li>
+  <li>State updates are ordered among batches. That is, the state updates for batch 3 won&#8217;t be applied until the state updates for batch 2 have succeeded.</li>
 </ol>
 
+<p>With these two primitives, you can achieve exactly-once semantics with your state updates. Rather than store just the count in the database, what you can do instead is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they&#8217;re the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they&#8217;re different, you increment the count.</p>
 
-<p>With these two primitives, you can achieve exactly-once semantics with your state updates. Rather than store just the count in the database, what you can do instead is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they're the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they're different, you increment the count.</p>
-
-<p>Of course, you don't have to do this logic manually in your topologies. This logic is wrapped by the State abstraction and done automatically. Nor is your State object required to implement the transaction id trick: if you don't want to pay the cost of storing the transaction id in the database, you don't have to. In that case the State will have at-least-once-processing semantics in the case of failures (which may be fine for your application). You can read more about how to implement a State and the various fault-tolerance tradeoffs possible <a href="/documentation/Trident-state">in this doc</a>.</p>
+<p>Of course, you don&#8217;t have to do this logic manually in your topologies. This logic is wrapped by the State abstraction and done automatically. Nor is your State object required to implement the transaction id trick: if you don&#8217;t want to pay the cost of storing the transaction id in the database, you don&#8217;t have to. In that case the State will have at-least-once-processing semantics in the case of failures (which may be fine for your application). You can read more about how to implement a State and the various fault-tolerance tradeoffs possible <a href="/documentation/Trident-state">in this doc</a>.</p>
 
-<p>A State is allowed to use whatever strategy it wants to store state. So it could store state in an external database or it could keep the state in-memory but backed by HDFS (like how HBase works). States are not required to hold onto state forever. For example, you could have an in-memory State implementation that only keeps the last X hours of data available and drops anything older. Take a look at the implementation of the <a href="https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java">Memcached integration</a> for an example State implementation.</p>
+<p>A State is allowed to use whatever strategy it wants to store state. So it could store state in an external database or it could keep the state in-memory but backed by HDFS (like how HBase works). States are not required to hold onto state forever. For example, you could have an in-memory State implementation that only keeps the last X hours of data available and drops anything older. Take a look at the implementation of the <a href="https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java">Memcached integration</a> for an example State implementation.</p>
 
-<h2>Execution of Trident topologies</h2>
+<h2 id="execution-of-trident-topologies">Execution of Trident topologies</h2>
 
 <p>Trident topologies compile down into as efficient of a Storm topology as possible. Tuples are only sent over the network when a repartitioning of the data is required, such as if you do a groupBy or a shuffle. So if you had this Trident topology:</p>
 
@@ -307,9 +318,9 @@ topology.newDRPCStream("reach")
 
 <p><img src="images/trident-to-storm2.png" alt="Compiling Trident to Storm 2" /></p>
 
-<h2>Conclusion</h2>
+<h2 id="conclusion">Conclusion</h2>
 
-<p>Trident makes realtime computation elegant. You've seen how high throughput stream processing, state manipulation, and low-latency querying can be seamlessly intermixed via Trident's API. Trident lets you express your realtime computations in a natural way while still getting maximal performance.</p>
+<p>Trident makes realtime computation elegant. You&#8217;ve seen how high throughput stream processing, state manipulation, and low-latency querying can be seamlessly intermixed via Trident&#8217;s API. Trident lets you express your realtime computations in a natural way while still getting maximal performance.</p>
 
 </div>
 </div>

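A minimal sketch of the transaction-id trick described in the State section of the Trident tutorial diff above, written as plain Java so it runs without Storm on the classpath. It assumes an in-memory HashMap stands in for the database and that each batch arrives with a precomputed partial count; the class TransactionalCounter and its methods are hypothetical illustrations, not Trident's State API (which, per the tutorial, does this bookkeeping for you).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a database that stores each count together
// with the transaction id of the last batch applied to it.
public class TransactionalCounter {
    // The value written atomically: the count plus the txid that produced it.
    static final class Entry {
        final long txid;
        final long count;
        Entry(long txid, long count) { this.txid = txid; this.count = count; }
    }

    private final Map<String, Entry> db = new HashMap<>();

    // Applies one batch's partial count. Relies on the two guarantees from the
    // tutorial: a retried batch keeps its txid, and batches update state in order.
    public void applyBatch(String key, long txid, long batchCount) {
        Entry current = db.get(key);
        if (current != null && current.txid == txid) {
            // Same txid: this batch is already reflected in the stored value, so skip.
            return;
        }
        long base = (current == null) ? 0 : current.count;
        db.put(key, new Entry(txid, base + batchCount));
    }

    public long get(String key) {
        Entry e = db.get(key);
        return e == null ? 0 : e.count;
    }

    public static void main(String[] args) {
        TransactionalCounter counter = new TransactionalCounter();
        counter.applyBatch("apple", 1, 3);
        counter.applyBatch("apple", 2, 5);
        counter.applyBatch("apple", 2, 5); // replay of batch 2 after a failure: ignored
        counter.applyBatch("apple", 3, 1);
        System.out.println(counter.get("apple")); // prints 9
    }
}
```

Skipping the replayed batch is only safe because of the ordering guarantee: a stored txid equal to the current one can only mean this same batch was already applied, never a later one.
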
Modified: incubator/storm/site/publish/documentation/Troubleshooting.html
URL: http://svn.apache.org/viewvc/incubator/storm/site/publish/documentation/Troubleshooting.html?rev=1597454&r1=1597453&r2=1597454&view=diff
==============================================================================
--- incubator/storm/site/publish/documentation/Troubleshooting.html (original)
+++ incubator/storm/site/publish/documentation/Troubleshooting.html Sun May 25 17:47:12 2014
@@ -65,167 +65,154 @@
   </ul>
 </div>
 <div id="aboutcontent">
-<h2>Troubleshooting</h2>
+<h2 id="troubleshooting">Troubleshooting</h2>
 
 <p>This page lists issues people have run into when using Storm along with their solutions.</p>
 
-<h3>Worker processes are crashing on startup with no stack trace</h3>
+<h3 id="worker-processes-are-crashing-on-startup-with-no-stack-trace">Worker processes are crashing on startup with no stack trace</h3>
 
 <p>Possible symptoms:</p>
 
 <ul>
-<li>Topologies work with one node, but workers crash with multiple nodes</li>
+  <li>Topologies work with one node, but workers crash with multiple nodes</li>
 </ul>
 
-
 <p>Solutions:</p>
 
 <ul>
-<li>You may have a misconfigured subnet, where nodes can't locate other nodes based on their hostname. ZeroMQ sometimes crashes the process when it can't resolve a host. There are two solutions:</li>
-<li>Make a mapping from hostname to IP address in /etc/hosts</li>
-<li>Set up an internal DNS so that nodes can locate each other based on hostname.</li>
+  <li>You may have a misconfigured subnet, where nodes can&#8217;t locate other nodes based on their hostname. ZeroMQ sometimes crashes the process when it can&#8217;t resolve a host. There are two solutions:</li>
+  <li>Make a mapping from hostname to IP address in /etc/hosts</li>
+  <li>Set up an internal DNS so that nodes can locate each other based on hostname.</li>
 </ul>
 
-
-<h3>Nodes are unable to communicate with each other</h3>
+<h3 id="nodes-are-unable-to-communicate-with-each-other">Nodes are unable to communicate with each other</h3>
 
 <p>Possible symptoms:</p>
 
 <ul>
-<li>Every spout tuple is failing</li>
-<li>Processing is not working</li>
+  <li>Every spout tuple is failing</li>
+  <li>Processing is not working</li>
 </ul>
 
-
 <p>Solutions:</p>
 
 <ul>
-<li>Storm doesn't work with ipv6. You can force ipv4 by adding <code>-Djava.net.preferIPv4Stack=true</code> to the supervisor child options and restarting the supervisor.</li>
-<li>You may have a misconfigured subnet. See the solutions for <code>Worker processes are crashing on startup with no stack trace</code></li>
+  <li>Storm doesn&#8217;t work with ipv6. You can force ipv4 by adding <code>-Djava.net.preferIPv4Stack=true</code> to the supervisor child options and restarting the supervisor. </li>
+  <li>You may have a misconfigured subnet. See the solutions for <code>Worker processes are crashing on startup with no stack trace</code></li>
 </ul>
 
-
-<h3>Topology stops processing tuples after awhile</h3>
+<h3 id="topology-stops-processing-tuples-after-awhile">Topology stops processing tuples after awhile</h3>
 
 <p>Symptoms:</p>
 
 <ul>
-<li>Processing works fine for awhile, and then suddenly stops and spout tuples start failing en masse.</li>
+  <li>Processing works fine for awhile, and then suddenly stops and spout tuples start failing en masse. </li>
 </ul>
 
-
 <p>Solutions:</p>
 
 <ul>
-<li>This is a known issue with ZeroMQ 2.1.10. Downgrade to ZeroMQ 2.1.7.</li>
+  <li>This is a known issue with ZeroMQ 2.1.10. Downgrade to ZeroMQ 2.1.7.</li>
 </ul>
 
-
-<h3>Not all supervisors appear in Storm UI</h3>
+<h3 id="not-all-supervisors-appear-in-storm-ui">Not all supervisors appear in Storm UI</h3>
 
 <p>Symptoms:</p>
 
 <ul>
-<li>Some supervisor processes are missing from the Storm UI</li>
-<li>List of supervisors in Storm UI changes on refreshes</li>
+  <li>Some supervisor processes are missing from the Storm UI</li>
+  <li>List of supervisors in Storm UI changes on refreshes</li>
 </ul>
 
-
 <p>Solutions:</p>
 
 <ul>
-<li>Make sure the supervisor local dirs are independent (e.g., not sharing a local dir over NFS)</li>
-<li>Try deleting the local dirs for the supervisors and restarting the daemons. Supervisors create a unique id for themselves and store it locally. When that id is copied to other nodes, Storm gets confused.</li>
+  <li>Make sure the supervisor local dirs are independent (e.g., not sharing a local dir over NFS)</li>
+  <li>Try deleting the local dirs for the supervisors and restarting the daemons. Supervisors create a unique id for themselves and store it locally. When that id is copied to other nodes, Storm gets confused. </li>
 </ul>
 
-
-<h3>"Multiple defaults.yaml found" error</h3>
+<h3 id="multiple-defaultsyaml-found-error">&#8220;Multiple defaults.yaml found&#8221; error</h3>
 
 <p>Symptoms:</p>
 
 <ul>
-<li>When deploying a topology with "storm jar", you get this error</li>
+  <li>When deploying a topology with &#8220;storm jar&#8221;, you get this error</li>
 </ul>
 
-
 <p>Solution:</p>
 
 <ul>
-<li>You're most likely including the Storm jars inside your topology jar. When packaging your topology jar, don't include the Storm jars as Storm will put those on the classpath for you.</li>
+  <li>You&#8217;re most likely including the Storm jars inside your topology jar. When packaging your topology jar, don&#8217;t include the Storm jars as Storm will put those on the classpath for you.</li>
 </ul>
 
-
-<h3>"NoSuchMethodError" when running storm jar</h3>
+<h3 id="nosuchmethoderror-when-running-storm-jar">&#8220;NoSuchMethodError&#8221; when running storm jar</h3>
 
 <p>Symptoms:</p>
 
 <ul>
-<li>When running storm jar, you get a cryptic "NoSuchMethodError"</li>
+  <li>When running storm jar, you get a cryptic &#8220;NoSuchMethodError&#8221;</li>
 </ul>
 
-
 <p>Solution:</p>
 
 <ul>
-<li>You're deploying your topology with a different version of Storm than you built your topology against. Make sure the storm client you use comes from the same version as the version you compiled your topology against.</li>
+  <li>You&#8217;re deploying your topology with a different version of Storm than you built your topology against. Make sure the storm client you use comes from the same version as the version you compiled your topology against.</li>
 </ul>
 
-
-<h3>Kryo ConcurrentModificationException</h3>
+<h3 id="kryo-concurrentmodificationexception">Kryo ConcurrentModificationException</h3>
 
 <p>Symptoms:</p>
 
 <ul>
-<li>At runtime, you get a stack trace like the following:</li>
+  <li>At runtime, you get a stack trace like the following:</li>
 </ul>
 
-
-<pre><code>java.lang.RuntimeException: java.util.ConcurrentModificationException
-    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
-    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
-    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
-    at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
-    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
-    at clojure.lang.AFn.run(AFn.java:24)
-    at java.lang.Thread.run(Thread.java:679)
+<p><code>
+java.lang.RuntimeException: java.util.ConcurrentModificationException
+	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
+	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
+	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
+	at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
+	at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
+	at clojure.lang.AFn.run(AFn.java:24)
+	at java.lang.Thread.run(Thread.java:679)
 Caused by: java.util.ConcurrentModificationException
-    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:390)
-    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:409)
-    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:408)
-    at java.util.HashMap.writeObject(HashMap.java:1016)
-    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
-    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-    at java.lang.reflect.Method.invoke(Method.java:616)
-    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:959)
-    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
-    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
-    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
-    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
-    at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
-    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
-    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
-    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
-    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
-    at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
-</code></pre>
+	at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:390)
+	at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:409)
+	at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:408)
+	at java.util.HashMap.writeObject(HashMap.java:1016)
+	at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
+	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
+	at java.lang.reflect.Method.invoke(Method.java:616)
+	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:959)
+	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
+	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
+	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
+	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
+	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
+	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
+	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
+	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
+	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
+	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
+</code></p>
 
-<p>Solution:</p>
+<p>Solution: </p>
 
 <ul>
-<li>This means that you're emitting a mutable object as an output tuple. Everything you emit into the output collector must be immutable. What's happening is that your bolt is modifying the object while it is being serialized to be sent over the network.</li>
+  <li>This means that you&#8217;re emitting a mutable object as an output tuple. Everything you emit into the output collector must be immutable. What&#8217;s happening is that your bolt is modifying the object while it is being serialized to be sent over the network.</li>
 </ul>
 
-
-<h3>NullPointerException from deep inside Storm</h3>
+<h3 id="nullpointerexception-from-deep-inside-storm">NullPointerException from deep inside Storm</h3>
 
 <p>Symptoms:</p>
 
 <ul>
-<li>You get a NullPointerException that looks something like:</li>
+  <li>You get a NullPointerException that looks something like:</li>
 </ul>
 
-
-<pre><code>java.lang.RuntimeException: java.lang.NullPointerException
+<p><code>
+java.lang.RuntimeException: java.lang.NullPointerException
     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
@@ -242,15 +229,14 @@ Caused by: java.lang.NullPointerExceptio
     at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
     ... 6 more
-</code></pre>
+</code></p>
 
 <p>Solution:</p>
 
 <ul>
-<li>This is caused by having multiple threads issue methods on the <code>OutputCollector</code>. All emits, acks, and fails must happen on the same thread. One subtle way this can happen is if you make an <code>IBasicBolt</code> that emits on a separate thread. <code>IBasicBolt</code>s automatically ack after execute is called, so this would cause multiple threads to use the <code>OutputCollector</code> leading to this exception. When using a basic bolt, all emits must happen in the same thread that runs <code>execute</code>.</li>
+  <li>This is caused by having multiple threads issue methods on the <code>OutputCollector</code>. All emits, acks, and fails must happen on the same thread. One subtle way this can happen is if you make an <code>IBasicBolt</code> that emits on a separate thread. <code>IBasicBolt</code>s automatically ack after execute is called, so this would cause multiple threads to use the <code>OutputCollector</code> leading to this exception. When using a basic bolt, all emits must happen in the same thread that runs <code>execute</code>.</li>
 </ul>
 
-
 </div>
 </div>
 <div id="clear"></div></div>
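The NullPointerException entry in the Troubleshooting diff above says all emits, acks, and fails must happen on one thread. One way to keep background work from touching the collector is to hand results back through a queue and emit only from the executor thread. The sketch below shows that hand-off in plain Java without Storm; the Emitter interface is a hypothetical stand-in for OutputCollector, and the assumption is that drainAndEmit would be called from the bolt's own execute thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Storm's OutputCollector; treated as not thread-safe.
interface Emitter { void emit(List<Object> values); }

public class SingleThreadEmit {
    // Background threads hand finished results to the executor thread through this queue.
    private final LinkedBlockingQueue<List<Object>> pending = new LinkedBlockingQueue<>();

    // Safe to call from any thread: it never touches the Emitter.
    public void onAsyncResult(List<Object> values) { pending.offer(values); }

    // Call only from the executor thread: the single place where emits happen.
    public int drainAndEmit(Emitter emitter) {
        List<List<Object>> batch = new ArrayList<>();
        pending.drainTo(batch);
        for (List<Object> values : batch) emitter.emit(values);
        return batch.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadEmit bolt = new SingleThreadEmit();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            final int n = i;
            pool.submit(() -> bolt.onAsyncResult(Arrays.<Object>asList(n, n * n)));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        List<List<Object>> emitted = new ArrayList<>();
        System.out.println(bolt.drainAndEmit(emitted::add)); // prints 10
    }
}
```

The worker threads only enqueue; every call on the Emitter happens inside drainAndEmit, so the collector is never shared across threads.
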