Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/09 23:26:11 UTC

[incubator-heron] branch master updated: Document new streamlet operations in Heron website (#3148)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new c6a7e66  Document new streamlet operations in Heron website (#3148)
c6a7e66 is described below

commit c6a7e66534ff46facd7c6a650252de007ca2f8b1
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Jan 9 15:26:06 2019 -0800

    Document new streamlet operations in Heron website (#3148)
---
 website/content/docs/concepts/streamlet-api.md     | 158 ++++++++++++++++++++-
 .../docs/developers/java/streamlet-api.mmark       | 145 ++++++++++++++++++-
 .../docs/developers/scala/streamlet-api.mmark      | 140 +++++++++++++++++-
 3 files changed, 440 insertions(+), 3 deletions(-)

diff --git a/website/content/docs/concepts/streamlet-api.md b/website/content/docs/concepts/streamlet-api.md
index 4d5774b..3891ae8 100644
--- a/website/content/docs/concepts/streamlet-api.md
+++ b/website/content/docs/concepts/streamlet-api.md
@@ -77,6 +77,7 @@ In this diagram, the **source streamlet** is produced by a random generator that
 The Heron Streamlet API is currently available for:
 
 * [Java](/docs/developers/java/streamlet-api)
+* [Scala](/docs/developers/scala/streamlet-api)
 
 ### The Heron Streamlet API and topologies
 
@@ -147,7 +148,14 @@ Operation | Description
 [union](#filter-operations) | Unifies two streamlets into one, without [windowing](#windowing) or modifying the elements of the two streamlets
 [clone](#clone-operations) | Creates any number of identical copies of a streamlet
 [transform](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations), for example to modify the elements from an incoming streamlet and update the topology's state
-[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a [time window](#windowing), and in accordance with a reduce function that you apply to all the accumulated values
+[keyBy](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet
+[reduceByKey](#reduce-by-key-operations) | Produces a streamlet of key-value pairs, one per key, computed by applying a reduce function to all the accumulated values for that key
+[reduceByKeyAndWindow](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs, one per key within each [time window](#windowing), computed by applying a reduce function to all the accumulated values for that key
+[countByKey](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key
+[countByKeyAndWindow](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within each [time window](#windowing)
+[split](#split-operations) | Splits a streamlet into multiple streamlets with different stream ids
+[withStream](#with-stream-operations) | Selects the stream with a given id from a streamlet that contains multiple streams
+[applyOperator](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet
 [join](#join-operations) | Joins two separate key-value streamlets into a single streamlet on a key, within a [time window](#windowing), and in accordance with a join function
 [log](#log-operations) | Logs the final streamlet output of the processing graph to stdout
 [toSink](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.
@@ -299,6 +307,60 @@ builder.newSource(() -> "Some string over and over");
         .log();
 ```
 
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .keyBy(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (get the length of each word)
+        word -> word.length()
+    )
+    // The result is logged
+    .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value pairs, each containing an extracted key and the value computed by the reduce function.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .reduceByKey(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (each word appears only once, hence the value is always 1)
+        word -> 1,
+        // Reduce operation (a running sum)
+        (x, y) -> x + y
+    )
+    // The result is logged
+    .log();
+```
+
 ### Reduce by key and window operations
 
 You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -335,6 +397,100 @@ builder.newSource(() -> "Mary had a little lamb")
     .log();
 ```
 
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKey(word -> word)
+    // The result is logged
+    .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](#windowing).
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKeyAndWindow(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Window configuration
+        WindowConfig.TumblingCountWindow(50)
+    )
+    // The result is logged
+    .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different stream ids, using the supplied predicates to decide which stream each item in the original streamlet belongs to.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Splits the stream into streams of long and short words
+    .split(splitter)
+    // Choose the stream of the short words
+    .withStream("short_word")
+    // The result is logged
+    .log();
+```
+
+### With stream operations
+
+With stream operations select the stream with a given id from a streamlet that contains multiple streams. They are often used together with [split](#split-operations), as in the example below.
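+
+#### Java example
+
+Below is a minimal sketch that reuses the `splitter` map from the split example above and assumes the split result is held as a `Streamlet<String>`, so that each named stream can be selected by id and processed independently:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder();
+
+// Split the words into two named streams
+Streamlet<String> words = builder
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .split(splitter);
+
+// Select each stream by its id and log it separately
+words.withStream("long_word").log();
+words.withStream("short_word").log();
+```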
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as an existing bolt) to each element of the original streamlet and return a new streamlet.
+
+#### Java example
+
+```java
+import java.util.Arrays;
+
+// MyBolt is an existing bolt; wrapping it as an IStreamletRichOperator lets it be applied to the streamlet of words
+private class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Apply user defined operation
+    .applyOperator(new MyBoltOperator())
+    // The result is logged
+    .log();
+```
+
 ### Join operations
 
 Join operations in the Streamlet API take two streamlets (a "left" and a "right" streamlet) and join them together:
diff --git a/website/content/docs/developers/java/streamlet-api.mmark b/website/content/docs/developers/java/streamlet-api.mmark
index e8adc6b..a08dfa3 100644
--- a/website/content/docs/developers/java/streamlet-api.mmark
+++ b/website/content/docs/developers/java/streamlet-api.mmark
@@ -166,7 +166,14 @@ Operation | Description | Example
 [`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
 [`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
 [`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer. | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs, one per key, computed by applying a reduce function to all the accumulated values for that key | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs, one per key within each time window, computed by applying a reduce function to all the accumulated values for that key | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within each time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different stream ids |
+[`withStream`](#with-stream-operations) | Selects the stream with a given id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
 [`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
 [`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
 [`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -325,6 +332,56 @@ In this case, the resulting streamlet would consist of an indefinite stream with
 
 > The effect of a join operation is to create a new streamlet *for each key*.
 
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .keyBy(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (get the length of each word)
+        word -> word.length()
+    )
+    // The result is logged
+    .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value pairs, each containing an extracted key and the value computed by the reduce function. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .reduceByKey(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Value extractor (each word appears only once, hence the value is always 1)
+        word -> 1,
+        // Reduce operation (a running sum)
+        (x, y) -> x + y
+    )
+    // The result is logged
+    .log();
+```
+
 ### Reduce by key and window operations
 
 You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -359,6 +416,92 @@ Builder builder = Builder.newBuilder()
     .log();
 ```
 
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```java
+import java.util.Arrays;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKey(word -> word)
+    // The result is logged
+    .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```java
+import java.util.Arrays;
+
+import org.apache.heron.streamlet.WindowConfig;
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .countByKeyAndWindow(
+        // Key extractor (in this case, each word acts as the key)
+        word -> word,
+        // Window configuration
+        WindowConfig.TumblingCountWindow(50)
+    )
+    // The result is logged
+    .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different stream ids, using the supplied predicates to decide which stream each item in the original streamlet belongs to. Here is an example:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Splits the stream into streams of long and short words
+    .split(splitter)
+    // Choose the stream of the short words
+    .withStream("short_word")
+    // The result is logged
+    .log();
+```
+
+### With stream operations
+
+With stream operations select the stream with a given id from a streamlet that contains multiple streams. They are often used together with [split](#split-operations), as in the example below.
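+
+Here is a minimal sketch that reuses the `splitter` map from the split example above and assumes the split result is held as a `Streamlet<String>`, so that each named stream can be selected by id and processed independently:
+
+```java
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
+splitter.put("long_word", s -> s.length() >= 4);
+splitter.put("short_word", s -> s.length() < 4);
+
+Builder builder = Builder.newBuilder();
+
+// Split the words into two named streams
+Streamlet<String> words = builder
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    .split(splitter);
+
+// Select each stream by its id and log it separately
+words.withStream("long_word").log();
+words.withStream("short_word").log();
+```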
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as an existing bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```java
+import java.util.Arrays;
+
+// MyBolt is an existing bolt; wrapping it as an IStreamletRichOperator lets it be applied to the streamlet of words
+private class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
+}
+
+Builder builder = Builder.newBuilder()
+    .newSource(() -> "Mary had a little lamb")
+    // Convert each sentence into individual words
+    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
+    // Apply user defined operation
+    .applyOperator(new MyBoltOperator())
+    // The result is logged
+    .log();
+```
+
 ### Repartition operations
 
 When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an [...]
diff --git a/website/content/docs/developers/scala/streamlet-api.mmark b/website/content/docs/developers/scala/streamlet-api.mmark
index c031bd6..ba3e3c2 100644
--- a/website/content/docs/developers/scala/streamlet-api.mmark
+++ b/website/content/docs/developers/scala/streamlet-api.mmark
@@ -132,7 +132,14 @@ Operation | Description | Example
 [`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
 [`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
 [`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported Join Types: Inner (as default), Outer-Left, Outer-Right and Outer | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
-[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet out of two separate key-value streamlets on a key, within a time window, and in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window
+[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
+[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs, one per key, computed by applying a reduce function to all the accumulated values for that key | Count the number of times a value has been encountered
+[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs, one per key within each time window, computed by applying a reduce function to all the accumulated values for that key | Count the number of times a value has been encountered within a specified time window
+[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
+[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within each time window | Count the number of times a value has been encountered within a specified time window
+[`split`](#split-operations) | Splits a streamlet into multiple streamlets with different stream ids |
+[`withStream`](#with-stream-operations) | Selects the stream with a given id from a streamlet that contains multiple streams |
+[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
 [`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
 [`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
 [`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
@@ -301,6 +308,55 @@ In this case, the resulting streamlet would consist of an indefinite stream with
 
 > The effect of a `join` operation is to create a new streamlet *for each key*.
 
+### Key by operations
+
+Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  .keyBy[String, Int](
+      // Key extractor (in this case, each word acts as the key)
+      (word: String) => word,
+      // Value extractor (get the length of each word)
+      (word: String) => word.length
+  )
+  // The result is logged
+  .log();
+```
+
+### Reduce by key operations
+
+You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
+
+* a key extractor that determines what counts as the key for the streamlet
+* a value extractor that determines which final value is chosen for each element of the streamlet
+* a reduce function that produces a single value for each key in the streamlet
+
+Reduce by key operations produce a new streamlet of key-value pairs, each containing an extracted key and the value computed by the reduce function. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  .reduceByKey[String, Int](
+      // Key extractor (in this case, each word acts as the key)
+      (word: String) => word,
+      // Value extractor (each word appears only once, hence the value is always 1)
+      (word: String) => 1,
+      // Reduce operation (a running sum)
+      (x: Int, y: Int) => x + y)
+  // The result is logged
+  .log();
+```
+
 ### Reduce by key and window operations
 
 You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
@@ -334,6 +390,88 @@ builder
   .log();
 ```
 
+### Count by key operations
+
+Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Count the number of occurrences of each word
+  .countByKey[String]((word: String) => word)
+  // The result is logged
+  .log();
+```
+
+### Count by key and window operations
+
+Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Count the number of occurrences of each word within each time window
+  .countByKeyAndWindow[String](
+      (word: String) => word,
+      WindowConfig.TumblingCountWindow(50))
+  // The result is logged
+  .log();
+```
+
+### Split operations
+
+Split operations split a streamlet into multiple streamlets with different stream ids, using the supplied predicates to decide which stream each item in the original streamlet belongs to. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Split the stream into streams of long and short words
+  .split(Map(
+      "long_word" -> { word: String => word.length >= 4 },
+      "short_word" -> { word: String => word.length < 4 }
+  ))
+  // Choose the stream of the short words
+  .withStream("short_word")
+  // The result is logged
+  .log();
+```
+
+### With stream operations
+
+With stream operations select the stream with a given id from a streamlet that contains multiple streams. They are often used together with [split](#split-operations), as in the split example above.
+
+### Apply operator operations
+
+Apply operator operations apply a user-defined operator (such as an existing bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
+
+```scala
+val builder = Builder.newBuilder()
+
+// MyBolt is an existing bolt; mixing in IStreamletOperator lets it be applied to the streamlet of words
+private class MyBoltOperator extends MyBolt
+    with IStreamletOperator[String, String] {
+}
+
+builder
+  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
+  // Convert each sentence into individual words
+  .flatMap[String](_.split(" "))
+  // Apply user defined operation
+  .applyOperator(new MyBoltOperator())
+  // The result is logged
+  .log();
+```
+
 ### Repartition operations
 
 When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an [...]