You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/22 23:04:15 UTC

[1/2] storm git commit: Merge branch 'wip-trident-javadoc' of https://github.com/ptgoetz/storm into STORM-1214

Repository: storm
Updated Branches:
  refs/heads/1.x-branch f9b106f91 -> 130510094


Merge branch 'wip-trident-javadoc' of https://github.com/ptgoetz/storm into STORM-1214

STORM-1214: add javadoc for Trident Streams and Operations


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3374a6c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3374a6c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3374a6c4

Branch: refs/heads/1.x-branch
Commit: 3374a6c4b570f28b4bea20353a418c811663fbb2
Parents: f9b106f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 22 15:47:49 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 22 15:49:11 2016 -0600

----------------------------------------------------------------------
 .../jvm/org/apache/storm/trident/Stream.java    | 156 +++++++++++++++++--
 .../storm/trident/operation/Assembly.java       |  20 +++
 .../storm/trident/operation/BaseOperation.java  |  15 ++
 .../apache/storm/trident/operation/Filter.java  |  24 +++
 .../storm/trident/operation/Function.java       |  68 ++++++++
 .../storm/trident/operation/Operation.java      |  20 +++
 .../trident/operation/TridentCollector.java     |  26 ++++
 .../storm/trident/operation/builtin/Debug.java  |   7 +
 .../trident/operation/builtin/FilterNull.java   |   4 +
 .../storm/trident/operation/builtin/FirstN.java |   5 +
 .../storm/trident/operation/builtin/Negate.java |  28 ++++
 11 files changed, 356 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index f9f6210..fb2497a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -57,6 +57,23 @@ import org.apache.storm.trident.state.StateSpec;
 import org.apache.storm.trident.state.StateUpdater;
 import org.apache.storm.trident.util.TridentUtils;
 
+/**
+ * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
+ * as a series of small batches. A stream is partitioned accross the nodes in the cluster, and operations are
+ * applied to a stream in parallel accross each partition.
+ *
+ * There are five types of operations that can be performed on streams in Trident
+ *
+ * 1. **Partiton-Local Operations** - Operations that are applied locally to each partition and do not involve network
+ * transfer
+ * 2. **Repartitioning Operations** - Operations that change how tuples are partitioned across tasks(thus causing
+ * network transfer), but do not change the content of the stream.
+ * 3. **Aggregation Operations** - Operations that *may* repartition a stream (thus causing network transfer)
+ * 4. **Grouping Operations** - Operations that may repartition a stream on specific fields and group together tuples whose
+ * fields values are equal.
+ * 5. **Merge and Join Operations** - Operations that combine different streams together.
+ *
+ */
 // TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
 public class Stream implements IAggregatableStream {
     Node _node;
@@ -68,61 +85,160 @@ public class Stream implements IAggregatableStream {
         _node = node;
         _name = name;
     }
-    
+
+    /**
+     * Applies a label to the stream. Naming a stream will append the label to the name of the bolt(s) created by
+     * Trident and will be visible in the Storm UI.
+     *
+     * @param name - The label to apply to the stream
+     * @return
+     */
     public Stream name(String name) {
         return new Stream(_topology, name, _node);
     }
-    
+
+    /**
+     * Applies a parallelism hint to a stream.
+     *
+     * @param hint
+     * @return
+     */
     public Stream parallelismHint(int hint) {
         _node.parallelismHint = hint;
         return this;
     }
-        
+
+    /**
+     * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
+     *
+     * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"
+     *
+     * ```java
+     * mystream.project(new Fields("b", "d"))
+     * ```
+     *
+     * would produce a stream containing only the fields `["b", "d"]`.
+     *
+     *
+     * @param keepFields The fields in the Stream to keep
+     * @return
+     */
     public Stream project(Fields keepFields) {
         projectionValidation(keepFields);
         return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
     }
 
+    /**
+     * ## Grouping Operation
+     *
+     * @param fields
+     * @return
+     */
     public GroupedStream groupBy(Fields fields) {
         projectionValidation(fields);
         return new GroupedStream(this, fields);        
     }
-    
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * @param fields
+     * @return
+     */
     public Stream partitionBy(Fields fields) {
         projectionValidation(fields);
         return partition(Grouping.fields(fields.toList()));
     }
-    
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * @param partitioner
+     * @return
+     */
     public Stream partition(CustomStreamGrouping partitioner) {
         return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
     }
-    
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * Use random round robin algorithm to evenly redistribute tuples across all target partitions
+     *
+     * @return
+     */
     public Stream shuffle() {
         return partition(Grouping.shuffle(new NullStruct()));
     }
 
+    /**
+     * ## Repartitioning Operation
+     *
+     * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
+     * for local tasks.
+     *
+     * @return
+     */
     public Stream localOrShuffle() {
         return partition(Grouping.local_or_shuffle(new NullStruct()));
     }
+
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
+     * @return
+     */
     public Stream global() {
         // use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
         // without knowledge of storm's internals
         return partition(new GlobalGrouping());
     }
-    
+
+    /**
+     * ## Repartitioning Operation
+     *
+     *  All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
+     *  partitions.
+     *
+     * @return
+     */
     public Stream batchGlobal() {
         // the first field is the batch id
         return partition(new IndexHashGrouping(0));
     }
-        
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do
+     * a stateQuery on every partition of data.
+     *
+     * @return
+     */
     public Stream broadcast() {
         return partition(Grouping.all(new NullStruct()));
     }
-    
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * @return
+     */
     public Stream identityPartition() {
         return partition(new IdentityGrouping());
     }
-    
+
+    /**
+     * ## Repartitioning Operation
+     *
+     * This method takes in a custom partitioning function that implements
+     * {@link org.apache.storm.grouping.CustomStreamGrouping}
+     *
+     * @param grouping
+     * @return
+     */
     public Stream partition(Grouping grouping) {
         if(_node instanceof PartitionNode) {
             return each(new Fields(), new TrueFilter()).partition(grouping);
@@ -130,22 +246,28 @@ public class Stream implements IAggregatableStream {
             return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));       
         }
     }
-    
+
+    /**
+     * Applies an `Assembly` to this `Stream`.
+     *
+     * @see org.apache.storm.trident.operation.Assembly
+     * @param assembly
+     * @return
+     */
     public Stream applyAssembly(Assembly assembly) {
         return assembly.apply(this);
     }
-    
+
     @Override
     public Stream each(Fields inputFields, Function function, Fields functionFields) {
         projectionValidation(inputFields);
         return _topology.addSourcedNode(this,
                 new ProcessorNode(_topology.getUniqueStreamId(),
-                    _name,
-                    TridentUtils.fieldsConcat(getOutputFields(), functionFields),
-                    functionFields,
-                    new EachProcessor(inputFields, function)));
+                        _name,
+                        TridentUtils.fieldsConcat(getOutputFields(), functionFields),
+                        functionFields,
+                        new EachProcessor(inputFields, function)));
     }
-
     //creates brand new tuples with brand new fields
     @Override
     public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
index 0d55804..c74a968 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
@@ -20,6 +20,26 @@ package org.apache.storm.trident.operation;
 import org.apache.storm.trident.Stream;
 
 
+/**
+ * The `Assembly` interface provides a means to encapsulate logic applied to a {@link org.apache.storm.trident.Stream}.
+ *
+ * Usage:
+ *
+ * ```java
+ * Stream mystream = ...;
+ * Stream assemblyStream = mystream.applyAssembly(myAssembly);
+ * ```
+ *
+ * @see org.apache.storm.trident.Stream
+ * @see org.apache.storm.trident.operation.builtin.FirstN
+ *
+ */
 public interface Assembly {
+    /**
+     * Applies the `Assembly` to a given {@link org.apache.storm.trident.Stream}
+     *
+     * @param input
+     * @return
+     */
     Stream apply(Stream input);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java b/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
index 4098e00..f1cf809 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
@@ -19,12 +19,27 @@ package org.apache.storm.trident.operation;
 
 import java.util.Map;
 
+/**
+ * Convenience implementation of the {@link org.apache.storm.trident.operation.Operation} interface.
+ *
+ * Provides no-op implementations of the `prepare()` and `cleanup()` methods.
+ */
 public class BaseOperation implements Operation {
 
+    /**
+     * No-op implementation.
+     * @param conf the Storm configuration map
+     * @param context the operation context which provides information such as the number of partitions in the stream,
+     *                and the current partition index. It also provides methods for registering operation-specific
+     *                metrics.
+     */
     @Override
     public void prepare(Map conf, TridentOperationContext context) {
     }
 
+    /**
+     * No-op implemnation.
+     */
     @Override
     public void cleanup() {
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
index d8ab95a..7673ca2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
@@ -19,6 +19,30 @@ package org.apache.storm.trident.operation;
 
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.Map;
+
+/**
+ * Filters take in a tuple as input and decide whether or not to keep that tuple or not.
+ *
+ * If the `isKeep()` method of a Filter returns `false` for a tuple, that tuple will be filtered out of the Stream
+ *
+ *
+ * ### Configuration
+ * If your `Filter` implementation has configuration requirements, you will typically want to extend
+ * {@link org.apache.storm.trident.operation.BaseFilter} and override the
+ * {@link org.apache.storm.trident.operation.Operation#prepare(Map, TridentOperationContext)} method to perform your custom
+ * initialization.
+
+ *
+ * @see org.apache.storm.trident.Stream
+ */
 public interface Filter extends EachOperation {
+
+    /**
+     * Determines if a tuple should be filtered out of a stream
+     *
+     * @param tuple the tuple being evaluated
+     * @return `false` to drop the tuple, `true` to keep the tuple
+     */
     boolean isKeep(TridentTuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
index 08b1680..b33a440 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
@@ -19,6 +19,74 @@ package org.apache.storm.trident.operation;
 
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.Map;
+
+/**
+ * A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple
+ * are appended to the original input tuple in the stream. If a function emits no tuples, the original input tuple is
+ * filtered out. Otherwise, the input tuple is duplicated for each output tuple.
+ *
+ * For example, if you have the following function:
+ *
+ * ```java
+ * public class MyFunction extends BaseFunction {
+ *      public void execute(TridentTuple tuple, TridentCollector collector) {
+ *      for(int i=0; i < tuple.getInteger(0); i++) {
+ *          collector.emit(new Values(i));
+ *      }
+ *    }
+ * }
+ *
+ * ```
+ *
+ * Now suppose you have a stream in the variable `mystream` with the fields `["a", "b", "c"]` with the following tuples:
+ *
+ * ```
+ * [1, 2, 3]
+ * [4, 1, 6]
+ * [3, 0, 8]
+ * ```
+ * If you had the following code in your topology definition:
+ *
+ * ```java
+ * mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
+ * ```
+ *
+ * The resulting tuples would have the fields `["a", "b", "c", "d"]` and look like this:
+ *
+ * ```
+ * [1, 2, 3, 0]
+ * [1, 2, 3, 1]
+ * [4, 1, 6, 0]
+ * ```
+ *
+ * In this case, the parameter `new Fields("b")` tells Trident that you would like to select the field "b" as input
+ * to the function, and that will be the only field in the Tuple passed to the `execute()` method. The value of "b" in
+ * the first tuple (2) causes the for loop to execute twice, so 2 tuples are emitted. similarly the second tuple causes
+ * one tuple to be emitted. For the third tuple, the value of 0 causes the `for` loop to be skipped, so nothing is
+ * emitted and the incoming tuple is filtered out of the stream.
+ *
+ * ### Configuration
+ * If your `Function` implementation has configuration requirements, you will typically want to extend
+ * {@link org.apache.storm.trident.operation.BaseFunction} and override the
+ * {@link org.apache.storm.trident.operation.Operation#prepare(Map, TridentOperationContext)} method to perform your custom
+ * initialization.
+ *
+ * ### Performance Considerations
+ * Because Trident Functions perform logic on individual tuples -- as opposed to batches -- it is advisable
+ * to avoid expensive operations such as database operations in a Function, if possible. For data store interactions
+ * it is better to use a {@link org.apache.storm.trident.state.State} or
+ * {@link org.apache.storm.trident.state.QueryFunction} implementation since Trident states operate on batch partitions
+ * and can perform bulk updates to a database.
+ *
+ *
+ */
 public interface Function extends EachOperation {
+    /**
+     * Performs the function logic on an individual tuple and emits 0 or more tuples.
+     *
+     * @param tuple The incoming tuple
+     * @param collector A collector instance that can be used to emit tuples
+     */
     void execute(TridentTuple tuple, TridentCollector collector);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
index bf3d4d0..878a5c2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
@@ -20,7 +20,27 @@ package org.apache.storm.trident.operation;
 import java.io.Serializable;
 import java.util.Map;
 
+/**
+ * Parent interface for Trident `Filter`s and `Function`s.
+ *
+ * `Operation` defines two lifecycle methods for Trident components. The `prepare()` method is called once when the
+ * `Operation` is first initialized. The `cleanup()` method is called in local mode when the local cluster is
+ * being shut down. In distributed mode, the `cleanup()` method is not guaranteed to be called in every situation, but
+ * Storm will make a best effort call `cleanup()` whenever possible.
+ */
 public interface Operation extends Serializable {
+    /**
+     * Called when the `Operation` is first initialized.
+     * @param conf the Storm configuration map
+     * @param context the operation context which provides information such as the number of partitions in the stream,
+     *                and the current partition index. It also provides methods for registering operation-specific
+     *                metrics.
+     * @see org.apache.storm.trident.operation.TridentOperationContext
+     */
     void prepare(Map conf, TridentOperationContext context);
+
+    /**
+     * When running in local mode, called when the local cluster is being shut down.
+     */
     void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java b/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
index bc02389..7392c9c 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
@@ -20,7 +20,33 @@ package org.apache.storm.trident.operation;
 import java.util.List;
 
 
+/**
+ * Interface for publishing tuples to a stream and reporting exceptions (to be displayed in Storm UI).
+ *
+ * Trident components that have the ability to emit tuples to a stream are passed an instance of this
+ * interface.
+ *
+ * For example, to emit a new tuple to a stream, you would do something like the following:
+ *
+ * ```java
+ *      collector.emit(new Values("a", "b", "c"));
+ * ```
+ * @see org.apache.storm.trident.Stream
+ * @see org.apache.storm.tuple.Values
+ */
 public interface TridentCollector {
+    /**
+     * Emits a tuple to a Stream
+     * @param values a list of values of which the tuple will be composed
+     */
     void emit(List<Object> values);
+
+    /**
+     * Reports an error. The corresponding stack trace will be visible in the Storm UI.
+     *
+     * Note that calling this method does not alter the processing of a batch. To explicitly fail a batch and trigger
+     * a replay, components should throw {@link org.apache.storm.topology.FailedException}.
+     * @param t The instance of the error (Throwable) being reported.
+     */
     void reportError(Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 91a3293..07c5ae4 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -20,6 +20,9 @@ package org.apache.storm.trident.operation.builtin;
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+/**
+ * Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`.
+ */
 public class Debug extends BaseFilter {
     private final String name;
 
@@ -27,6 +30,10 @@ public class Debug extends BaseFilter {
         name = "DEBUG: ";
     }
 
+    /**
+     * Creates a `Debug` filter with a string identifier.
+     * @param name
+     */
     public Debug(String name) {
         this.name = "DEBUG(" + name + "): ";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
index 4047936..5567e4c 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
@@ -20,6 +20,10 @@ package org.apache.storm.trident.operation.builtin;
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+/**
+ * Simple `Filter` implementation that filters out any tuples that have fields with a value of `null`.
+ *
+ */
 public class FilterNull extends BaseFilter {
     @Override
     public boolean isKeep(TridentTuple tuple) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
index dd719dc..5db6f9d 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
@@ -28,6 +28,11 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.tuple.TridentTuple;
 
 
+/**
+ *
+ * An {@link org.apache.storm.trident.operation.Assembly} implementation
+ *
+ */
 public class FirstN implements Assembly {
 
     Aggregator _agg;

http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
index 9269aee..32888ad 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
@@ -22,6 +22,34 @@ import org.apache.storm.trident.operation.Filter;
 import org.apache.storm.trident.operation.TridentOperationContext;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+/**
+ * A `Filter` implementation that inverts another delegate `Filter`.
+ *
+ * The `Negate.isKeep()` method simply returns the opposite of the delegate's `isKeep()` method:
+ *
+ * ```java
+ * public boolean isKeep(TridentTuple tuple) {
+ *      return !this.delegate.isKeep(tuple);
+ * }
+ * ```
+ *
+ * The `Negate` filter is useful for dividing a Stream in two based on some boolean condition.
+ *
+ * Suppose we had a Stream named `userStream` containing information about users, and a custom `Filter` implementation,
+ * `RegisteredUserFilter` that filtered out unregistered users. We could divide the `userStream` Stream into two
+ * separate Streams -- one for registered users, and one for unregistered users -- by doing the following:
+ *
+ * ```java
+ * Stream userStream = ...
+ *
+ * Filter registeredFilter = new ResisteredUserFilter();
+ * Filter unregisteredFilter = new Negate(registeredFilter);
+ *
+ * Stream registeredUserStream = userStream.each(userStream.getOutputFields(), registeredFilter);
+ * Stream unregisteredUserStream = userStream.each(userStream.getOutputFields(), unregisteredFilter);
+ * ```
+ *
+ */
 public class Negate implements Filter {
     
     Filter _delegate;


[2/2] storm git commit: Added STORM-1214 to Changelog

Posted by bo...@apache.org.
Added STORM-1214 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/13051009
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/13051009
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/13051009

Branch: refs/heads/1.x-branch
Commit: 1305100949583ee5b627bd7ec1a3706060b6fdd9
Parents: 3374a6c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 22 15:48:17 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 22 15:49:45 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/13051009/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2db183..0ef5f44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1214: add javadoc for Trident Streams and Operations
  * STORM-1450: Fix minor bugs and refactor code in ResourceAwareScheduler
  * STORM-1452: Fixes profiling/debugging out of the box
  * STORM-1406: Add MQTT Support