Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/25 18:27:43 UTC

[beam] branch master updated: [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema (#15763)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 39613ea  [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema (#15763)
39613ea is described below

commit 39613ea89fee6ae888095fcb5ac9ca5fd6d687f9
Author: Melissa Pashniak <me...@google.com>
AuthorDate: Mon Oct 25 11:26:28 2021 -0700

    [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema (#15763)
    
    * [BEAM-11758] Update basics page: Aggregation, Runner, UDF, Schema
    
    * Address most review feedback
    
    * Fix typo in intro list
    
    * Address remaining two feedback comments
---
 .../www/site/content/en/documentation/basics.md    | 205 ++++++++++++++++-----
 .../content/en/documentation/programming-guide.md  |  17 ++
 website/www/site/static/images/aggregation.png     | Bin 0 -> 14065 bytes
 3 files changed, 171 insertions(+), 51 deletions(-)

diff --git a/website/www/site/content/en/documentation/basics.md b/website/www/site/content/en/documentation/basics.md
index bf7f20d..cb79e5d 100644
--- a/website/www/site/content/en/documentation/basics.md
+++ b/website/www/site/content/en/documentation/basics.md
@@ -17,10 +17,9 @@ limitations under the License.
 
 # Basics of the Beam model
 
-Suppose you have a data processing engine that can pretty easily process graphs
-of operations. You want to integrate it with the Beam ecosystem to get access
-to other languages, great event time processing, and a library of connectors.
-You need to know the core vocabulary:
+Apache Beam is a unified model for defining both batch and streaming
+data-parallel processing pipelines. To get started with Beam, you'll need to
+understand an important set of core concepts:
 
  * [_Pipeline_](#pipeline) - A pipeline is a user-constructed graph of
    transformations that defines the desired data processing operations.
@@ -30,16 +29,22 @@ You need to know the core vocabulary:
    data processing operation, or a step, in your pipeline. A transform is
    applied to zero or more `PCollection` objects, and produces zero or more
    `PCollection` objects.
- * _SDK_ - A language-specific library for pipeline authors (we often call them
-   "users" even though we have many kinds of users) to build transforms,
-   construct their pipelines and submit them to a runner
- * _Runner_ - You are going to write a piece of software called a runner that
-   takes a Beam pipeline and executes it using the capabilities of your data
-   processing engine.
-
-These concepts may be very similar to your processing engine's concepts. Since
-Beam's design is for cross-language operation and reusable libraries of
-transforms, there are some special features worth highlighting.
+ * [_Aggregation_](#aggregation) - Aggregation is computing a value from
+   multiple (1 or more) input elements.
+ * [_User-defined function (UDF)_](#user-defined-function-udf) - Some Beam
+   operations allow you to run user-defined code as a way to configure the
+   transform.
+ * [_Schema_](#schema) - A schema is a language-independent type definition for
+   a `PCollection`. The schema for a `PCollection` defines elements of that
+   `PCollection` as an ordered list of named fields.
+ * [_SDK_](/documentation/sdks/java/) - A language-specific library that lets
+   pipeline authors build transforms, construct their pipelines, and submit
+   them to a runner.
+ * [_Runner_](#runner) - A runner runs a Beam pipeline using the capabilities of
+   your chosen data processing engine.
+
+The following sections cover these concepts in more detail and provide links to
+additional documentation.
 
 ### Pipeline
 
@@ -215,45 +220,143 @@ For more information about PCollections, see the following page:
 
  * [Beam Programming Guide: PCollections](/documentation/programming-guide/#pcollections)
 
-### User-Defined Functions (UDFs)
-
-Beam has seven varieties of user-defined function (UDF). A Beam pipeline
-may contain UDFs written in a language other than your runner, or even multiple
-languages in the same pipeline (see the [Runner API](#the-runner-api)) so the
-definitions are language-independent (see the [Fn API](#the-fn-api)).
-
-The UDFs of Beam are:
-
- * _DoFn_ - per-element processing function (used in ParDo)
- * _WindowFn_ - places elements in windows and merges windows (used in Window
-   and GroupByKey)
- * _Source_ - emits data read from external sources, including initial and
-   dynamic splitting for parallelism (used in Read)
- * _ViewFn_ - adapts a materialized PCollection to a particular interface (used
-   in side inputs)
- * _WindowMappingFn_ - maps one element's window to another, and specifies
-   bounds on how far in the past the result window will be (used in side
-   inputs)
- * _CombineFn_ - associative and commutative aggregation (used in Combine and
-   state)
- * _Coder_ - encodes user data; some coders have standard formats and are not really UDFs
-
-The various types of user-defined functions will be described further alongside
-the [_PTransforms_](#ptransforms) that use them.
+### Aggregation
+
+Aggregation is computing a value from multiple (1 or more) input elements. In
+Beam, the primary computational pattern for aggregation is to group all elements
+with a common key and window, and then combine each group of elements using an
+associative and commutative operation. This is similar to the "Reduce" operation
+in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model, though it is
+enhanced to work with unbounded input streams as well as bounded data sets.
+
+<img src="/images/aggregation.png" alt="Aggregation of elements." width="120px">
+
+*Figure 1: Aggregation of elements. Elements with the same color represent those
+with a common key and window.*
+
+Some simple aggregation transforms include `Count` (computes the count of all
+elements in the aggregation), `Max` (computes the maximum element in the
+aggregation), and `Sum` (computes the sum of all elements in the aggregation).
+
+When elements are grouped and emitted as a bag, the aggregation is known as
+`GroupByKey` (the associative/commutative operation is bag union). In this case,
+the output is no smaller than the input. Often, you will apply an operation such
+as summation, called a `CombineFn`, in which the output is significantly smaller
+than the input. In that case, the aggregation is called `CombinePerKey`.
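+
+For example, here is a minimal Python SDK sketch contrasting the two (the keys
+and values are illustrative):
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+    kvs = p | beam.Create([('a', 1), ('b', 2), ('a', 3)])
+
+    # GroupByKey emits each group as a bag, e.g. ('a', [1, 3]) and
+    # ('b', [2]). The output is no smaller than the input.
+    grouped = kvs | 'Group' >> beam.GroupByKey()
+
+    # CombinePerKey reduces each group with a CombineFn, e.g. ('a', 4)
+    # and ('b', 2). The output is one element per key and window.
+    summed = kvs | 'Sum' >> beam.CombinePerKey(sum)
+```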
+
+In a real application, you might have millions of keys and/or windows; that is
+why this is still an "embarrassingly parallel" computational pattern. In those
+cases where you have fewer keys, you can add parallelism by adding a
+supplementary key, splitting each of your problem's natural keys into many
+sub-keys. After these sub-keys are aggregated, the results can be further
+combined into a result for the original natural key for your problem. The
+associativity of your aggregation function ensures that this yields the same
+answer, but with more parallelism.
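+
+The Python SDK exposes this sub-key pattern directly as hot-key fanout. A
+minimal runnable sketch (the data here is illustrative):
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+    totals = (
+        p
+        | beam.Create([('hot', 1)] * 1000)
+        # Spread each hot key across 16 sub-keys; partial results are
+        # combined in parallel, then recombined per original key.
+        # Associativity guarantees the same final answer.
+        | beam.CombinePerKey(sum).with_hot_key_fanout(16))
+```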
+
+When your input is unbounded, the computational pattern of grouping elements by
+key and window is roughly the same, but governing when and how to emit the
+results of aggregation involves three concepts:
+
+ * Windowing, which partitions your input into bounded subsets that can
+   eventually be considered complete.
+ * Watermarks, which estimate the completeness of your input.
+ * Triggers, which govern when and how to emit aggregated results.
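+
+As a sketch of how these three concepts fit together in the Python SDK (the
+`events` input is a hypothetical unbounded, keyed `PCollection`, and the
+durations are illustrative):
+
+```python
+import apache_beam as beam
+from apache_beam.transforms import trigger, window
+
+windowed_sums = (
+    events
+    # Windowing: partition the input into 60-second fixed windows.
+    | beam.WindowInto(
+        window.FixedWindows(60),
+        # Trigger: emit when the watermark estimates the window is
+        # complete, then once more for each late element.
+        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
+        accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
+        allowed_lateness=600)
+    | beam.CombinePerKey(sum))
+```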
+
+For more information about available aggregation transforms, see the following
+pages:
+
+ * [Beam Programming Guide: Core Beam transforms](/documentation/programming-guide/#core-beam-transforms)
+ * Beam Transform catalog
+   ([Java](/documentation/transforms/java/overview/#aggregation),
+   [Python](/documentation/transforms/python/overview/#aggregation))
+
+### User-defined function (UDF)
+
+Some Beam operations allow you to run user-defined code as a way to configure
+the transform. For example, when using `ParDo`, user-defined code specifies what
+operation to apply to every element. For `Combine`, it specifies how values
+should be combined. By using [cross-language transforms](/documentation/patterns/cross-language/),
+a Beam pipeline can contain UDFs written in a different language, or even
+multiple languages in the same pipeline.
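+
+For example, in the Python SDK a `DoFn` configures `ParDo`, and a simple
+callable configures a combining transform. A minimal sketch:
+
+```python
+import apache_beam as beam
+
+class ExtractWordsFn(beam.DoFn):
+    # User-defined per-element logic that ParDo applies to every element.
+    def process(self, element):
+        for word in element.split():
+            yield word
+
+with beam.Pipeline() as p:
+    counts = (
+        p
+        | beam.Create(['to be or not to be'])
+        | beam.ParDo(ExtractWordsFn())      # UDF: DoFn
+        | beam.Map(lambda w: (w, 1))
+        | beam.CombinePerKey(sum))          # UDF: combining function
+```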
+
+Beam has several varieties of UDFs:
+
+ * [_DoFn_](/documentation/programming-guide/#pardo) - per-element processing
+   function (used in `ParDo`)
+ * [_WindowFn_](/documentation/programming-guide/#setting-your-pcollections-windowing-function) -
+   places elements in windows and merges windows (used in `Window` and
+   `GroupByKey`)
+ * [_ViewFn_](/documentation/programming-guide/#side-inputs) - adapts a
+   materialized `PCollection` to a particular interface (used in side inputs)
+ * [_WindowMappingFn_](/documentation/programming-guide/#side-inputs-windowing) -
+   maps one element's window to another, and specifies bounds on how far in the
+   past the result window will be (used in side inputs)
+ * [_CombineFn_](/documentation/programming-guide/#combine) - associative and
+   commutative aggregation (used in `Combine` and state)
+ * [_Coder_](/documentation/programming-guide/#data-encoding-and-type-safety) -
+   encodes user data; some coders have standard formats and are not really UDFs
+
+Each language SDK has its own idiomatic way of expressing the user-defined
+functions in Beam, but there are common requirements. When you build user code
+for a Beam transform, you should keep in mind the distributed nature of
+execution. For example, many copies of your function might run in parallel on
+different machines, and each copy functions independently, without
+communicating or sharing state with the others. Each copy
+of your user code function might be retried or run multiple times, depending on
+the pipeline runner and the processing backend that you choose for your
+pipeline. Beam also supports stateful processing through the
+[stateful processing API](/blog/stateful-processing/).
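+
+As a brief sketch of the Python SDK's stateful API (the element shape is
+hypothetical; stateful `DoFn`s require keyed input):
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.userstate import CombiningValueStateSpec
+
+class CountPerKeyFn(beam.DoFn):
+    # Beam persists this state per key and window across elements,
+    # rather than the DoFn instance holding state itself.
+    COUNT = CombiningValueStateSpec('count', sum)
+
+    def process(self, element, count=beam.DoFn.StateParam(COUNT)):
+        key, _ = element
+        count.add(1)
+        yield key, count.read()
+```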
+
+For more information about user-defined functions, see the following pages:
+
+ * [Requirements for writing user code for Beam transforms](/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)
+ * [Beam Programming Guide: ParDo](/documentation/programming-guide/#pardo)
+ * [Beam Programming Guide: WindowFn](/documentation/programming-guide/#setting-your-pcollections-windowing-function)
+ * [Beam Programming Guide: CombineFn](/documentation/programming-guide/#combine)
+ * [Beam Programming Guide: Coder](/documentation/programming-guide/#data-encoding-and-type-safety)
+ * [Beam Programming Guide: Side inputs](/documentation/programming-guide/#side-inputs)
+
+### Schema
+
+A schema is a language-independent type definition for a `PCollection`. The
+schema for a `PCollection` defines elements of that `PCollection` as an ordered
+list of named fields. Each field has a name, a type, and possibly a set of user
+options.
+
+In many cases, the element type in a `PCollection` has a structure that can be
+introspected. Some examples are JSON, Protocol Buffer, Avro, and database row
+objects. All of these formats can be converted to Beam Schemas. Even within an
+SDK pipeline, simple Java POJOs (or equivalent structures in other languages)
+are often used as intermediate types, and these also have a clear structure that
+can be inferred by inspecting the class. By understanding the structure of a
+pipeline’s records, we can provide much more concise APIs for data processing.
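+
+For example, in the Python SDK a schema can be expressed as a `NamedTuple` and
+registered with `RowCoder` (the `Purchase` type here is hypothetical):
+
+```python
+import typing
+import apache_beam as beam
+
+class Purchase(typing.NamedTuple):
+    user_id: str    # named, typed fields form the schema
+    item: str
+    amount: float
+
+beam.coders.registry.register_coder(Purchase, beam.coders.RowCoder)
+
+with beam.Pipeline() as p:
+    purchases = (
+        p
+        | beam.Create([Purchase('alice', 'book', 12.50)])
+          .with_output_types(Purchase))
+```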
+
+Beam provides a collection of transforms that operate natively on schemas. For
+example, [Beam SQL](/documentation/dsls/sql/overview/) is a common transform
+that operates on schemas. These transforms allow selections and aggregations in
+terms of named schema fields. Another advantage of schemas is that they allow
+you to reference element fields by name. Beam provides a selection syntax for
+referencing fields, including nested and repeated fields.
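+
+Continuing the hypothetical `Purchase` example above, schema-aware transforms
+such as the Python SDK's `GroupBy` can reference fields by name:
+
+```python
+# Group purchases by the user_id field and sum the amount field,
+# producing rows with user_id and total_amount fields.
+totals = (
+    purchases
+    | beam.GroupBy('user_id')
+        .aggregate_field('amount', sum, 'total_amount'))
+```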
+
+For more information about schemas, see the following pages:
+
+ * [Beam Programming Guide: Schemas](/documentation/programming-guide/#schemas)
+ * [Schema Patterns](/documentation/patterns/schema/)
 
 ### Runner
 
-The term "runner" is used for a couple of things. It generally refers to the
-software that takes a Beam pipeline and executes it somehow. Often, this is the
-translation code that you write. It usually also includes some customized
-operators for your data processing engine, and is sometimes used to refer to
-the full stack.
+A Beam runner runs a Beam pipeline on a specific platform. Most runners are
+translators or adapters to massively parallel big data processing systems, such
+as Apache Flink, Apache Spark, Google Cloud Dataflow, and more. For example, the
+Flink runner translates a Beam pipeline into a Flink job. The Direct Runner runs
+pipelines locally so you can test, debug, and validate that your pipeline
+adheres to the Apache Beam model as closely as possible.
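+
+You choose a runner when you run the pipeline, typically through pipeline
+options, so the pipeline itself does not change. A minimal Python SDK sketch:
+
+```python
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+# Swap DirectRunner for e.g. FlinkRunner or DataflowRunner (plus any
+# runner-specific options) to execute the same pipeline elsewhere.
+options = PipelineOptions(['--runner=DirectRunner'])
+
+with beam.Pipeline(options=options) as p:
+    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)
+```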
+
+For an up-to-date list of Beam runners and which features of the Apache Beam
+model they support, see the runner
+[capability matrix](/documentation/runners/capability-matrix/).
 
-A runner has just a single method `run(Pipeline)`. From here on, I will often
-use code font for proper nouns in our APIs, whether or not the identifiers
-match across all SDKs.
+For more information about runners, see the following pages:
 
-The `run(Pipeline)` method should be asynchronous and results in a
-PipelineResult which generally will be a job descriptor for your data
-processing engine, providing methods for checking its status, canceling it, and
-waiting for it to terminate.
+ * [Choosing a Runner](/documentation/#choosing-a-runner)
+ * [Beam Capability Matrix](/documentation/runners/capability-matrix/)
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 9d3cac9..bde55f6 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -526,6 +526,10 @@ workers across a cluster may execute instances of your user code in parallel.
 The user code running on each worker generates the output elements that are
 ultimately added to the final output `PCollection` that the transform produces.
 
+> Aggregation is an important concept to understand when learning about Beam's
+> transforms. For an introduction to aggregation, see the Basics of the Beam
+> model [Aggregation section](/documentation/basics/#aggregation).
+
 The Beam SDKs contain a number of different transforms that you can apply to
 your pipeline's `PCollection`s. These include general-purpose core transforms,
 such as [ParDo](#pardo) or [Combine](#combine). There are also pre-written
@@ -1274,6 +1278,19 @@ function. More complex combination operations might require you to create a
 <span class="language-java language-py">subclass of</span> `CombineFn`
 that has an accumulation type distinct from the input/output type.
 
+The associativity and commutativity of a `CombineFn` allows runners to
+automatically apply some optimizations:
+
+ * **Combiner lifting**: This is the most significant optimization. Input
+   elements are combined per key and window before they are shuffled, so the
+   volume of data shuffled might be reduced by many orders of magnitude. Another
+   term for this optimization is "mapper-side combine."
+ * **Incremental combining**: When you have a `CombineFn` that significantly
+   reduces the data size, it is useful to combine elements as they emerge from a
+   streaming shuffle. This spreads the cost of combining over the time that your
+   streaming computation might otherwise be idle. Incremental combining also
+   reduces the storage of intermediate accumulators.
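+
+Combiner lifting works because the operation is associative. A plain Python
+illustration of the algebra (not runner internals):
+
+```python
+values = [1, 2, 3, 4, 5, 6]
+# Each worker combines its local elements before the shuffle...
+partials = [sum(values[:3]), sum(values[3:])]
+# ...and the partial results combine to the same final answer,
+# while far less data crosses the shuffle boundary.
+assert sum(partials) == sum(values)
+```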
+
 ##### 4.2.4.1. Simple combinations using simple functions {#simple-combines}
 
 The following example code shows a simple combine function.
diff --git a/website/www/site/static/images/aggregation.png b/website/www/site/static/images/aggregation.png
new file mode 100755
index 0000000..c26cc9f
Binary files /dev/null and b/website/www/site/static/images/aggregation.png differ