Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/09 23:26:11 UTC
[incubator-heron] branch master updated: Document new streamlet
operations in Heron website (#3148)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new c6a7e66 Document new streamlet operations in Heron website (#3148)
c6a7e66 is described below
commit c6a7e66534ff46facd7c6a650252de007ca2f8b1
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Jan 9 15:26:06 2019 -0800
Document new streamlet operations in Heron website (#3148)
---
website/content/docs/concepts/streamlet-api.md | 158 ++++++++++++++++++++-
.../docs/developers/java/streamlet-api.mmark | 145 ++++++++++++++++++-
.../docs/developers/scala/streamlet-api.mmark | 140 +++++++++++++++++-
3 files changed, 440 insertions(+), 3 deletions(-)
diff --git a/website/content/docs/concepts/streamlet-api.md b/website/content/docs/concepts/streamlet-api.md
index 4d5774b..3891ae8 100644
--- a/website/content/docs/concepts/streamlet-api.md
+++ b/website/content/docs/concepts/streamlet-api.md
@@ -77,6 +77,7 @@ In this diagram, the **source streamlet** is produced by a random generator that
The Heron Streamlet API is currently available for:
* [Java](/docs/developers/java/streamlet-api)
+* [Scala](/docs/developers/scala/streamlet-api)
### The Heron Streamlet API and topologies
@@ -147,7 +148,14 @@ Operation | Description
[union](#filter-operations) | Unifies two streamlets into one, without [windowing](#windowing) or modifying the elements of the two streamlets
[clone](#clone-operations) | Creates any number of identical copies of a streamlet
[transform](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations)
-[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a [time window](#windowing), and in accordance with a reduce function that you apply to all the accumulated values
+[keyBy](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet
+[reduceByKey](#reduce-by-key-operations) | Produces a key-value streamlet on each key, in accordance with a reduce function that you apply to all the accumulated values
+[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a key-value streamlet on each key, within a [time window](#windowing), and in accordance with a reduce function that you apply to all the accumulated values
+[countByKey](#count-by-key-operations) | A special reduce operation that counts the number of tuples on each key
+[countByKeyAndWindow](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples on each key, within a [time window](#windowing)
+[split](#split-operations) | Splits a streamlet into multiple streamlets with different ids
+[withStream](#with-stream-operations) | Selects the stream with a given id from a streamlet that contains multiple streams
+[applyOperator](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet
[join](#join-operations) | Joins two separate key-value streamlets into a single streamlet on a key, within a [time window](#windowing), and in accordance with a join function
[log](#log-operations) | Logs the final streamlet output of the processing graph to stdout
[toSink](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.
@@ -299,6 +307,60 @@ builder.newSource(() -> "Some string over and over");
.log();
```
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .keyBy(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (get the length of each word)
+ word -> word.length()
+ )
+ // The result is logged
+ .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value pairs, each consisting of the extracted key and the value calculated by the reduce function.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .reduceByKey(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (each word appears only once, hence the value is always 1)
+ word -> 1,
+ // Reduce operation (a running sum)
+ (x, y) -> x + y
+ )
+ // The result is logged
+ .log();
+```
+
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -335,6 +397,100 @@ builder.newSource(() -> "Mary had a little lamb")
.log();
```
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKey(word -> word)
+ // The result is logged
+ .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](#windowing).
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKeyAndWindow(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Window configuration
+ WindowConfig.TumblingCountWindow(50)
+ )
+ // The result is logged
+ .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids, determining the stream id for each item in the original streamlet from the supplied predicates.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Splits the stream into streams of long and short words
+ .split(splitter)
+ // Choose the stream of the short words
+ .withStream("short_word")
+ // The result is logged
+ .log();
+```
+
+### With stream operations
+
+With stream operations select a stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
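+Building on the split example above, here is a minimal sketch (reusing the same splitter map) that selects the stream of long words instead:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Split the stream into streams of long and short words
+ .split(splitter)
+ // This time, choose the stream of the long words
+ .withStream("long_word")
+ // The result is logged
+ .log();
+```
+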
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Apply user defined operation
+ .applyOperator(new MyBoltOperator())
+ // The result is logged
+ .log();
+```
+
### Join operations
Join operations in the Streamlet API take two streamlets (a "left" and a "right" streamlet) and join them together:
diff --git a/website/content/docs/developers/java/streamlet-api.mmark b/website/content/docs/developers/java/streamlet-api.mmark
index e8adc6b..a08dfa3 100644
--- a/website/content/docs/developers/java/streamlet-api.mmark
+++ b/website/content/docs/developers/java/streamlet-api.mmark
@@ -166,7 +166,14 @@ Operation | Description | Example
[`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
[`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
[`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer. | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a key-value streamlet on each key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a key-value streamlet on each key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples on each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples on each key, within a time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different ids |
+[`withStream`](#with-stream-operations) | Selects the stream with a given id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
[`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
[`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
[`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -325,6 +332,56 @@ In this case, the resulting streamlet would consist of an indefinite stream with
> The effect of a join operation is to create a new streamlet *for each key*.
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .keyBy(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (get the length of each word)
+ word -> word.length()
+ )
+ // The result is logged
+ .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value pairs, each consisting of the extracted key and the value calculated by the reduce function. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .reduceByKey(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Value extractor (each word appears only once, hence the value is always 1)
+ word -> 1,
+ // Reduce operation (a running sum)
+ (x, y) -> x + y
+ )
+ // The result is logged
+ .log();
+```
+
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -359,6 +416,92 @@ Builder builder = Builder.newBuilder()
.log();
```
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKey(word -> word)
+ // The result is logged
+ .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ .countByKeyAndWindow(
+ // Key extractor (in this case, each word acts as the key)
+ word -> word,
+ // Window configuration
+ WindowConfig.TumblingCountWindow(50)
+ )
+ // The result is logged
+ .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids, determining the stream id for each item in the original streamlet from the supplied predicates. Here is an example:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Splits the stream into streams of long and short words
+ .split(splitter)
+ // Choose the stream of the short words
+ .withStream("short_word")
+ // The result is logged
+ .log();
+```
+
+### With stream operations
+
+With stream operations select a stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
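+Building on the split example above, here is a minimal sketch (reusing the same splitter map) that selects the stream of long words instead:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Split the stream into streams of long and short words
+ .split(splitter)
+ // This time, choose the stream of the long words
+ .withStream("long_word")
+ // The result is logged
+ .log();
+```
+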
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder()
+ .newSource(() -> "Mary had a little lamb")
+ // Convert each sentence into individual words
+ .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+ // Apply user defined operation
+ .applyOperator(new MyBoltOperator())
+ // The result is logged
+ .log();
+```
+
### Repartition operations
When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an [...]
diff --git a/website/content/docs/developers/scala/streamlet-api.mmark b/website/content/docs/developers/scala/streamlet-api.mmark
index c031bd6..ba3e3c2 100644
--- a/website/content/docs/developers/scala/streamlet-api.mmark
+++ b/website/content/docs/developers/scala/streamlet-api.mmark
@@ -132,7 +132,14 @@ Operation | Description | Example
[`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
[`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
[`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a key-value streamlet on each key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a key-value streamlet on each key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples on each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples on each key, within a time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different ids |
+[`withStream`](#with-stream-operations) | Selects the stream with a given id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
[`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
[`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
[`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -301,6 +308,55 @@ In this case, the resulting streamlet would consist of an indefinite stream with
> The effect of a `join` operation is to create a new streamlet *for each key*.
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ .keyBy[String, Int](
+ // Key extractor (in this case, each word acts as the key)
+ (word: String) => word,
+ // Value extractor (get the length of each word)
+ (word: String) => word.length
+ )
+ // The result is logged
+ .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value pairs, each consisting of the extracted key and the value calculated by the reduce function. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ .reduceByKey[String, Int](
+ // Key extractor (in this case, each word acts as the key)
+ (word: String) => word,
+ // Value extractor (each word appears only once, hence the value is always 1)
+ (word: String) => 1,
+ // Reduce operation (a running sum)
+ (x: Int, y: Int) => x + y)
+ // The result is logged
+ .log();
+```
+
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -334,6 +390,88 @@ builder
.log();
```
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Count the number of occurrences of each word
+ .countByKey[String]((word: String) => word)
+ // The result is logged
+ .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Count the number of occurrences of each word within each time window
+ .countByKeyAndWindow[String](
+ (word: String) => word,
+ WindowConfig.TumblingCountWindow(50))
+ // The result is logged
+ .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids, determining the stream id for each item in the original streamlet from the supplied predicates. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Split the stream into streams of long and short words
+ .split(Map(
+ "long_word" -> { word: String => word.length >= 4 },
+ "short_word" -> { word: String => word.length < 4 }
+ ))
+ .withStream("short_word")
+ // The result is logged
+ .log();
+```
+
+### With stream operations
+
+With stream operations select a stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
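+Building on the split example above, here is a minimal sketch that selects the stream of long words instead:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Split the stream into streams of long and short words
+ .split(Map(
+ "long_word" -> { word: String => word.length >= 4 },
+ "short_word" -> { word: String => word.length < 4 }
+ ))
+ // This time, choose the stream of the long words
+ .withStream("long_word")
+ // The result is logged
+ .log();
+```
+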
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+class MyBoltOperator extends MyBolt
+ with IStreamletOperator[String, String] {
+}
+
+builder
+ .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+ // Convert each sentence into individual words
+ .flatMap[String](_.split(" "))
+ // Apply user defined operation
+ .applyOperator(new MyBoltOperator())
+ // The result is logged
+ .log();
+```
+
### Repartition operations
When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an [...]