You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/22 23:04:15 UTC
[1/2] storm git commit: Merge branch 'wip-trident-javadoc' of
https://github.com/ptgoetz/storm into STORM-1214
Repository: storm
Updated Branches:
refs/heads/1.x-branch f9b106f91 -> 130510094
Merge branch 'wip-trident-javadoc' of https://github.com/ptgoetz/storm into STORM-1214
STORM-1214: add javadoc for Trident Streams and Operations
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3374a6c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3374a6c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3374a6c4
Branch: refs/heads/1.x-branch
Commit: 3374a6c4b570f28b4bea20353a418c811663fbb2
Parents: f9b106f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 22 15:47:49 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 22 15:49:11 2016 -0600
----------------------------------------------------------------------
.../jvm/org/apache/storm/trident/Stream.java | 156 +++++++++++++++++--
.../storm/trident/operation/Assembly.java | 20 +++
.../storm/trident/operation/BaseOperation.java | 15 ++
.../apache/storm/trident/operation/Filter.java | 24 +++
.../storm/trident/operation/Function.java | 68 ++++++++
.../storm/trident/operation/Operation.java | 20 +++
.../trident/operation/TridentCollector.java | 26 ++++
.../storm/trident/operation/builtin/Debug.java | 7 +
.../trident/operation/builtin/FilterNull.java | 4 +
.../storm/trident/operation/builtin/FirstN.java | 5 +
.../storm/trident/operation/builtin/Negate.java | 28 ++++
11 files changed, 356 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index f9f6210..fb2497a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -57,6 +57,23 @@ import org.apache.storm.trident.state.StateSpec;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.trident.util.TridentUtils;
+/**
+ * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
+ * as a series of small batches. A stream is partitioned accross the nodes in the cluster, and operations are
+ * applied to a stream in parallel accross each partition.
+ *
+ * There are five types of operations that can be performed on streams in Trident
+ *
+ * 1. **Partiton-Local Operations** - Operations that are applied locally to each partition and do not involve network
+ * transfer
+ * 2. **Repartitioning Operations** - Operations that change how tuples are partitioned across tasks(thus causing
+ * network transfer), but do not change the content of the stream.
+ * 3. **Aggregation Operations** - Operations that *may* repartition a stream (thus causing network transfer)
+ * 4. **Grouping Operations** - Operations that may repartition a stream on specific fields and group together tuples whose
+ * fields values are equal.
+ * 5. **Merge and Join Operations** - Operations that combine different streams together.
+ *
+ */
// TODO: need to be able to replace existing fields with the function fields (like Cascading Fields.REPLACE)
public class Stream implements IAggregatableStream {
Node _node;
@@ -68,61 +85,160 @@ public class Stream implements IAggregatableStream {
_node = node;
_name = name;
}
-
+
+ /**
+ * Applies a label to the stream. Naming a stream will append the label to the name of the bolt(s) created by
+ * Trident and will be visible in the Storm UI.
+ *
+ * @param name - The label to apply to the stream
+ * @return
+ */
public Stream name(String name) {
return new Stream(_topology, name, _node);
}
-
+
+ /**
+ * Applies a parallelism hint to a stream.
+ *
+ * @param hint
+ * @return
+ */
public Stream parallelismHint(int hint) {
_node.parallelismHint = hint;
return this;
}
-
+
+ /**
+ * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
+ *
+ * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c","d"]`, calling"
+ *
+ * ```java
+ * mystream.project(new Fields("b", "d"))
+ * ```
+ *
+ * would produce a stream containing only the fields `["b", "d"]`.
+ *
+ *
+ * @param keepFields The fields in the Stream to keep
+ * @return
+ */
public Stream project(Fields keepFields) {
projectionValidation(keepFields);
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
}
+ /**
+ * ## Grouping Operation
+ *
+ * @param fields
+ * @return
+ */
public GroupedStream groupBy(Fields fields) {
projectionValidation(fields);
return new GroupedStream(this, fields);
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * @param fields
+ * @return
+ */
public Stream partitionBy(Fields fields) {
projectionValidation(fields);
return partition(Grouping.fields(fields.toList()));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * @param partitioner
+ * @return
+ */
public Stream partition(CustomStreamGrouping partitioner) {
return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * Use random round robin algorithm to evenly redistribute tuples across all target partitions
+ *
+ * @return
+ */
public Stream shuffle() {
return partition(Grouping.shuffle(new NullStruct()));
}
+ /**
+ * ## Repartitioning Operation
+ *
+ * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
+ * for local tasks.
+ *
+ * @return
+ */
public Stream localOrShuffle() {
return partition(Grouping.local_or_shuffle(new NullStruct()));
}
+
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
+ * @return
+ */
public Stream global() {
// use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
// without knowledge of storm's internals
return partition(new GlobalGrouping());
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
+ * partitions.
+ *
+ * @return
+ */
public Stream batchGlobal() {
// the first field is the batch id
return partition(new IndexHashGrouping(0));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do
+ * a stateQuery on every partition of data.
+ *
+ * @return
+ */
public Stream broadcast() {
return partition(Grouping.all(new NullStruct()));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * @return
+ */
public Stream identityPartition() {
return partition(new IdentityGrouping());
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * This method takes in a custom partitioning function that implements
+ * {@link org.apache.storm.grouping.CustomStreamGrouping}
+ *
+ * @param grouping
+ * @return
+ */
public Stream partition(Grouping grouping) {
if(_node instanceof PartitionNode) {
return each(new Fields(), new TrueFilter()).partition(grouping);
@@ -130,22 +246,28 @@ public class Stream implements IAggregatableStream {
return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, _name, getOutputFields(), grouping));
}
}
-
+
+ /**
+ * Applies an `Assembly` to this `Stream`.
+ *
+ * @see org.apache.storm.trident.operation.Assembly
+ * @param assembly
+ * @return
+ */
public Stream applyAssembly(Assembly assembly) {
return assembly.apply(this);
}
-
+
@Override
public Stream each(Fields inputFields, Function function, Fields functionFields) {
projectionValidation(inputFields);
return _topology.addSourcedNode(this,
new ProcessorNode(_topology.getUniqueStreamId(),
- _name,
- TridentUtils.fieldsConcat(getOutputFields(), functionFields),
- functionFields,
- new EachProcessor(inputFields, function)));
+ _name,
+ TridentUtils.fieldsConcat(getOutputFields(), functionFields),
+ functionFields,
+ new EachProcessor(inputFields, function)));
}
-
//creates brand new tuples with brand new fields
@Override
public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
index 0d55804..c74a968 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Assembly.java
@@ -20,6 +20,26 @@ package org.apache.storm.trident.operation;
import org.apache.storm.trident.Stream;
+/**
+ * The `Assembly` interface provides a means to encapsulate logic applied to a {@link org.apache.storm.trident.Stream}.
+ *
+ * Usage:
+ *
+ * ```java
+ * Stream mystream = ...;
+ * Stream assemblyStream = mystream.applyAssembly(myAssembly);
+ * ```
+ *
+ * @see org.apache.storm.trident.Stream
+ * @see org.apache.storm.trident.operation.builtin.FirstN
+ *
+ */
public interface Assembly {
+ /**
+ * Applies the `Assembly` to a given {@link org.apache.storm.trident.Stream}
+ *
+ * @param input
+ * @return
+ */
Stream apply(Stream input);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java b/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
index 4098e00..f1cf809 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/BaseOperation.java
@@ -19,12 +19,27 @@ package org.apache.storm.trident.operation;
import java.util.Map;
+/**
+ * Convenience implementation of the {@link org.apache.storm.trident.operation.Operation} interface.
+ *
+ * Provides no-op implementations of the `prepare()` and `cleanup()` methods.
+ */
public class BaseOperation implements Operation {
+ /**
+ * No-op implementation.
+ * @param conf the Storm configuration map
+ * @param context the operation context which provides information such as the number of partitions in the stream,
+ * and the current partition index. It also provides methods for registering operation-specific
+ * metrics.
+ */
@Override
public void prepare(Map conf, TridentOperationContext context) {
}
+ /**
+ * No-op implemnation.
+ */
@Override
public void cleanup() {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
index d8ab95a..7673ca2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Filter.java
@@ -19,6 +19,30 @@ package org.apache.storm.trident.operation;
import org.apache.storm.trident.tuple.TridentTuple;
+import java.util.Map;
+
+/**
+ * Filters take in a tuple as input and decide whether or not to keep that tuple or not.
+ *
+ * If the `isKeep()` method of a Filter returns `false` for a tuple, that tuple will be filtered out of the Stream
+ *
+ *
+ * ### Configuration
+ * If your `Filter` implementation has configuration requirements, you will typically want to extend
+ * {@link org.apache.storm.trident.operation.BaseFilter} and override the
+ * {@link org.apache.storm.trident.operation.Operation#prepare(Map, TridentOperationContext)} method to perform your custom
+ * initialization.
+
+ *
+ * @see org.apache.storm.trident.Stream
+ */
public interface Filter extends EachOperation {
+
+ /**
+ * Determines if a tuple should be filtered out of a stream
+ *
+ * @param tuple the tuple being evaluated
+ * @return `false` to drop the tuple, `true` to keep the tuple
+ */
boolean isKeep(TridentTuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
index 08b1680..b33a440 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Function.java
@@ -19,6 +19,74 @@ package org.apache.storm.trident.operation;
import org.apache.storm.trident.tuple.TridentTuple;
+import java.util.Map;
+
+/**
+ * A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple
+ * are appended to the original input tuple in the stream. If a function emits no tuples, the original input tuple is
+ * filtered out. Otherwise, the input tuple is duplicated for each output tuple.
+ *
+ * For example, if you have the following function:
+ *
+ * ```java
+ * public class MyFunction extends BaseFunction {
+ * public void execute(TridentTuple tuple, TridentCollector collector) {
+ * for(int i=0; i < tuple.getInteger(0); i++) {
+ * collector.emit(new Values(i));
+ * }
+ * }
+ * }
+ *
+ * ```
+ *
+ * Now suppose you have a stream in the variable `mystream` with the fields `["a", "b", "c"]` with the following tuples:
+ *
+ * ```
+ * [1, 2, 3]
+ * [4, 1, 6]
+ * [3, 0, 8]
+ * ```
+ * If you had the following code in your topology definition:
+ *
+ * ```java
+ * mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
+ * ```
+ *
+ * The resulting tuples would have the fields `["a", "b", "c", "d"]` and look like this:
+ *
+ * ```
+ * [1, 2, 3, 0]
+ * [1, 2, 3, 1]
+ * [4, 1, 6, 0]
+ * ```
+ *
+ * In this case, the parameter `new Fields("b")` tells Trident that you would like to select the field "b" as input
+ * to the function, and that will be the only field in the Tuple passed to the `execute()` method. The value of "b" in
+ * the first tuple (2) causes the for loop to execute twice, so 2 tuples are emitted. similarly the second tuple causes
+ * one tuple to be emitted. For the third tuple, the value of 0 causes the `for` loop to be skipped, so nothing is
+ * emitted and the incoming tuple is filtered out of the stream.
+ *
+ * ### Configuration
+ * If your `Function` implementation has configuration requirements, you will typically want to extend
+ * {@link org.apache.storm.trident.operation.BaseFunction} and override the
+ * {@link org.apache.storm.trident.operation.Operation#prepare(Map, TridentOperationContext)} method to perform your custom
+ * initialization.
+ *
+ * ### Performance Considerations
+ * Because Trident Functions perform logic on individual tuples -- as opposed to batches -- it is advisable
+ * to avoid expensive operations such as database operations in a Function, if possible. For data store interactions
+ * it is better to use a {@link org.apache.storm.trident.state.State} or
+ * {@link org.apache.storm.trident.state.QueryFunction} implementation since Trident states operate on batch partitions
+ * and can perform bulk updates to a database.
+ *
+ *
+ */
public interface Function extends EachOperation {
+ /**
+ * Performs the function logic on an individual tuple and emits 0 or more tuples.
+ *
+ * @param tuple The incoming tuple
+ * @param collector A collector instance that can be used to emit tuples
+ */
void execute(TridentTuple tuple, TridentCollector collector);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java b/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
index bf3d4d0..878a5c2 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/Operation.java
@@ -20,7 +20,27 @@ package org.apache.storm.trident.operation;
import java.io.Serializable;
import java.util.Map;
+/**
+ * Parent interface for Trident `Filter`s and `Function`s.
+ *
+ * `Operation` defines two lifecycle methods for Trident components. The `prepare()` method is called once when the
+ * `Operation` is first initialized. The `cleanup()` method is called in local mode when the local cluster is
+ * being shut down. In distributed mode, the `cleanup()` method is not guaranteed to be called in every situation, but
+ * Storm will make a best effort call `cleanup()` whenever possible.
+ */
public interface Operation extends Serializable {
+ /**
+ * Called when the `Operation` is first initialized.
+ * @param conf the Storm configuration map
+ * @param context the operation context which provides information such as the number of partitions in the stream,
+ * and the current partition index. It also provides methods for registering operation-specific
+ * metrics.
+ * @see org.apache.storm.trident.operation.TridentOperationContext
+ */
void prepare(Map conf, TridentOperationContext context);
+
+ /**
+ * When running in local mode, called when the local cluster is being shut down.
+ */
void cleanup();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java b/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
index bc02389..7392c9c 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/TridentCollector.java
@@ -20,7 +20,33 @@ package org.apache.storm.trident.operation;
import java.util.List;
+/**
+ * Interface for publishing tuples to a stream and reporting exceptions (to be displayed in Storm UI).
+ *
+ * Trident components that have the ability to emit tuples to a stream are passed an instance of this
+ * interface.
+ *
+ * For example, to emit a new tuple to a stream, you would do something like the following:
+ *
+ * ```java
+ * collector.emit(new Values("a", "b", "c"));
+ * ```
+ * @see org.apache.storm.trident.Stream
+ * @see org.apache.storm.tuple.Values
+ */
public interface TridentCollector {
+ /**
+ * Emits a tuple to a Stream
+ * @param values a list of values of which the tuple will be composed
+ */
void emit(List<Object> values);
+
+ /**
+ * Reports an error. The corresponding stack trace will be visible in the Storm UI.
+ *
+ * Note that calling this method does not alter the processing of a batch. To explicitly fail a batch and trigger
+ * a replay, components should throw {@link org.apache.storm.topology.FailedException}.
+ * @param t The instance of the error (Throwable) being reported.
+ */
void reportError(Throwable t);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 91a3293..07c5ae4 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -20,6 +20,9 @@ package org.apache.storm.trident.operation.builtin;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
+/**
+ * Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`.
+ */
public class Debug extends BaseFilter {
private final String name;
@@ -27,6 +30,10 @@ public class Debug extends BaseFilter {
name = "DEBUG: ";
}
+ /**
+ * Creates a `Debug` filter with a string identifier.
+ * @param name
+ */
public Debug(String name) {
this.name = "DEBUG(" + name + "): ";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
index 4047936..5567e4c 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FilterNull.java
@@ -20,6 +20,10 @@ package org.apache.storm.trident.operation.builtin;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
+/**
+ * Simple `Filter` implementation that filters out any tuples that have fields with a value of `null`.
+ *
+ */
public class FilterNull extends BaseFilter {
@Override
public boolean isKeep(TridentTuple tuple) {
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
index dd719dc..5db6f9d 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/FirstN.java
@@ -28,6 +28,11 @@ import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
+/**
+ *
+ * An {@link org.apache.storm.trident.operation.Assembly} implementation
+ *
+ */
public class FirstN implements Assembly {
Aggregator _agg;
http://git-wip-us.apache.org/repos/asf/storm/blob/3374a6c4/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
index 9269aee..32888ad 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Negate.java
@@ -22,6 +22,34 @@ import org.apache.storm.trident.operation.Filter;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.tuple.TridentTuple;
+/**
+ * A `Filter` implementation that inverts another delegate `Filter`.
+ *
+ * The `Negate.isKeep()` method simply returns the opposite of the delegate's `isKeep()` method:
+ *
+ * ```java
+ * public boolean isKeep(TridentTuple tuple) {
+ * return !this.delegate.isKeep(tuple);
+ * }
+ * ```
+ *
+ * The `Negate` filter is useful for dividing a Stream in two based on some boolean condition.
+ *
+ * Suppose we had a Stream named `userStream` containing information about users, and a custom `Filter` implementation,
+ * `RegisteredUserFilter` that filtered out unregistered users. We could divide the `userStream` Stream into two
+ * separate Streams -- one for registered users, and one for unregistered users -- by doing the following:
+ *
+ * ```java
+ * Stream userStream = ...
+ *
+ * Filter registeredFilter = new ResisteredUserFilter();
+ * Filter unregisteredFilter = new Negate(registeredFilter);
+ *
+ * Stream registeredUserStream = userStream.each(userStream.getOutputFields(), registeredFilter);
+ * Stream unregisteredUserStream = userStream.each(userStream.getOutputFields(), unregisteredFilter);
+ * ```
+ *
+ */
public class Negate implements Filter {
Filter _delegate;
[2/2] storm git commit: Added STORM-1214 to Changelog
Posted by bo...@apache.org.
Added STORM-1214 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/13051009
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/13051009
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/13051009
Branch: refs/heads/1.x-branch
Commit: 1305100949583ee5b627bd7ec1a3706060b6fdd9
Parents: 3374a6c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 22 15:48:17 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 22 15:49:45 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/13051009/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2db183..0ef5f44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1214: add javadoc for Trident Streams and Operations
* STORM-1450: Fix minor bugs and refactor code in ResourceAwareScheduler
* STORM-1452: Fixes profiling/debugging out of the box
* STORM-1406: Add MQTT Support