Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2019/01/09 23:26:08 UTC

[GitHub] nwangtw closed pull request #3148: Document new streamlet operations in Heron website

URL: https://github.com/apache/incubator-heron/pull/3148
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/website/content/docs/concepts/streamlet-api.md b/website/content/docs/concepts/streamlet-api.md
index 4d5774bfb2..3891ae8a1d 100644
--- a/website/content/docs/concepts/streamlet-api.md
+++ b/website/content/docs/concepts/streamlet-api.md
@@ -77,6 +77,7 @@ In this diagram, the **source streamlet** is produced by a random generator that
 The Heron Streamlet API is currently available for:
 
 * [Java](/docs/developers/java/streamlet-api)
+* [Scala](/docs/developers/scala/streamlet-api)
 
 ### The Heron Streamlet API and topologies
 
@@ -147,7 +148,14 @@ Operation | Description
 [union](#filter-operations) | Unifies two streamlets into one, without [windowing](#windowing) or modifying the elements of the two streamlets
 [clone](#clone-operations) | Creates any number of identical copies of a streamlet
 [transform](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations)
-[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a [time window](#windowing), and in accordance with a reduce function that you apply to all the accumulated values
+[keyBy](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet
+[reduceByKey](#reduce-by-key-operations) | Produces a streamlet of key-value pairs for each key, in accordance with a reduce function that you apply to all the accumulated values
+[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs for each key, within a [time window](#windowing), and in accordance with a reduce function that you apply to all the accumulated values
+[countByKey](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key
+[countByKeyAndWindow](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key, within a [time window](#windowing)
+[split](#split-operations) | Splits a streamlet into multiple streamlets with different ids
+[withStream](#with-stream-operations) | Selects a stream by its id from a streamlet that contains multiple streams
+[applyOperator](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet
 [join](#join-operations) | Joins two separate key-value streamlets into a single streamlet on a key, within a [time window](#windowing), and in accordance with a join function
 [log](#log-operations) | Logs the final streamlet output of the processing graph to stdout
 [toSink](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.
@@ -299,6 +307,60 @@ builder.newSource(() -> "Some string over and over");
         .log();
 ```
 
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .keyBy(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (get the length of each word)
+        word -> word.length()
+    )
+    // The result is logged
+    .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value objects, each containing the extracted key and the reduced value.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .reduceByKey(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (each word appears only once, hence the value is always 1)
+        word -> 1,
+        // Reduce operation (a running sum)
+        (x, y) -> x + y
+    )
+    // The result is logged
+    .log();
+```
+
 ### Reduce by key and window operations
 
 You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -335,6 +397,100 @@ builder.newSource(() -> "Mary had a little lamb")
     .log();
 ```
 
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKey(word -> word)
+    // The result is logged
+    .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](#windowing).
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKeyAndWindow(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Window configuration
+        WindowConfig.TumblingCountWindow(50)
+    )
+    // The result is logged
+    .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids by applying the supplied predicates to determine the stream id for each item in the original streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Splits the stream into streams of long and short words
+    .split(splitter)
+    // Choose the stream of the short words
+    .withStream("short_word")
+    // The result is logged
+    .log();
+```
+
+### With stream operations
+
+With stream operations select a stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
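+
+#### Java example
+
+A minimal sketch, reusing the `splitter` map defined in the [split](#split-operations) example above (that definition is assumed to be in scope), to pick out the other stream:
+
+```java
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Splits the stream into streams of long and short words
+    .split(splitter)
+    // Choose the stream of the long words this time
+    .withStream("long_word")
+    // The result is logged
+    .log();
+```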
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (like a bolt) to each element of the original streamlet and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+// An existing bolt (here, the hypothetical MyBolt) is wrapped as a streamlet operator
+class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Apply the user-defined operation
+    .applyOperator(new MyBoltOperator())
+    // The result is logged
+    .log();
+```
+
 ### Join operations
 
 Join operations in the Streamlet API take two streamlets (a "left" and a "right" streamlet) and join them together:
diff --git a/website/content/docs/developers/java/streamlet-api.mmark b/website/content/docs/developers/java/streamlet-api.mmark
index e8adc6b0d1..a08dfa3dae 100644
--- a/website/content/docs/developers/java/streamlet-api.mmark
+++ b/website/content/docs/developers/java/streamlet-api.mmark
@@ -166,7 +166,14 @@ Operation | Description | Example
 [`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
 [`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
 [`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer. | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs for each key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs for each key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key, within a time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different ids |
+[`withStream`](#with-stream-operations) | Selects a stream by its id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
 [`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
 [`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
 [`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -325,6 +332,56 @@ In this case, the resulting streamlet would consist of an indefinite stream with
 
 > The effect of a join operation is to create a new streamlet *for each key*.
 
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .keyBy(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (get the length of each word)
+        word -> word.length()
+    )
+    // The result is logged
+    .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value objects, each containing the extracted key and the reduced value. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .reduceByKey(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (each word appears only once, hence the value is always 1)
+        word -> 1,
+        // Reduce operation (a running sum)
+        (x, y) -> x + y
+    )
+    // The result is logged
+    .log();
+```
+
 ### Reduce by key and window operations
 
 You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -359,6 +416,92 @@ Builder builder = Builder.newBuilder()
     .log();
 ```
 
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKey(word -> word)
+    // The result is logged
+    .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKeyAndWindow(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Window configuration
+        WindowConfig.TumblingCountWindow(50)
+    )
+    // The result is logged
+    .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids by applying the supplied predicates to determine the stream id for each item in the original streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Splits the stream into streams of long and short words
+    .split(splitter)
+    // Choose the stream of the short words
+    .withStream("short_word")
+    // The result is logged
+    .log();
+```
+
+### With stream operations
+
+With stream operations select a stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
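+
+A minimal sketch, reusing the `splitter` map defined in the [split](#split-operations) example above (that definition is assumed to be in scope), to pick out the other stream:
+
+```java
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Splits the stream into streams of long and short words
+    .split(splitter)
+    // Choose the stream of the long words this time
+    .withStream("long_word")
+    // The result is logged
+    .log();
+```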
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (like a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+// An existing bolt (here, the hypothetical MyBolt) is wrapped as a streamlet operator
+class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder();
+
+builder.newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Apply the user-defined operation
+    .applyOperator(new MyBoltOperator())
+    // The result is logged
+    .log();
+```
+
 ### Repartition operations
 
 When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an example:
diff --git a/website/content/docs/developers/scala/streamlet-api.mmark b/website/content/docs/developers/scala/streamlet-api.mmark
index c031bd6ee4..ba3e3c2fa6 100644
--- a/website/content/docs/developers/scala/streamlet-api.mmark
+++ b/website/content/docs/developers/scala/streamlet-api.mmark
@@ -132,7 +132,14 @@ Operation | Description | Example
 [`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
 [`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
 [`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs for each key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs for each key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key, within a time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different ids |
+[`withStream`](#with-stream-operations) | Selects a stream by its id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
 [`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
 [`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
 [`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -301,6 +308,55 @@ In this case, the resulting streamlet would consist of an indefinite stream with
 
 > The effect of a `join` operation is to create a new streamlet *for each key*.
 
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  .keyBy[String, Int](
+      // Key extractor (in this case, each word acts as the key)
+      (word: String) => word,
+      // Value extractor (get the length of each word)
+      (word: String) => word.length
+  )
+  // The result is logged
+  .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value objects, each containing the extracted key and the reduced value. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  .reduceByKey[String, Int](
+      // Key extractor (in this case, each word acts as the key)
+      (word: String) => word,
+      // Value extractor (each word appears only once, hence the value is always 1)
+      (word: String) => 1,
+      // Reduce operation (a running sum)
+      (x: Int, y: Int) => x + y)
+  // The result is logged
+  .log();
+```
+
 ### Reduce by key and window operations
 
 You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -334,6 +390,88 @@ builder
   .log();
 ```
 
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Count the number of occurrences of each word
+  .countByKey[String]((word: String) => word)
+  // The result is logged
+  .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Count the number of occurrences of each word within each time window
+  .countByKeyAndWindow[String](
+      (word: String) => word,
+      WindowConfig.TumblingCountWindow(50))
+  // The result is logged
+  .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different ids by applying the supplied predicates to determine the stream id for each item in the original streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Split the stream into streams of long and short words
+  .split(Map(
+      "long_word" -> { word: String => word.length >= 4 },
+      "short_word" -> { word: String => word.length < 4 }
+  ))
+  // Choose the stream of the short words
+  .withStream("short_word")
+  // The result is logged
+  .log();
+```
+
+### With stream operations
+
+With stream operations select a stream by its id from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (like a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+private class MyBoltOperator extends MyBolt
+    with IStreamletOperator[String, String] {
+}
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Apply user defined operation
+  .applyOperator(new MyBoltOperator())
+  // The result is logged
+  .log();
+```
+
 ### Repartition operations
 
 When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an example:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services