Posted to commits@beam.apache.org by me...@apache.org on 2018/03/09 23:08:33 UTC

[beam-site] 01/02: Update SQL doc to match new APIs

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 315acb1b25fd573d126160409284dd7ec46750cd
Author: akedin <ke...@google.com>
AuthorDate: Fri Mar 2 11:16:18 2018 -0800

    Update SQL doc to match new APIs
    
    BeamSql.querySimple() and queryMulti() were combined into query().
    BeamRecord was renamed to Row. Factory methods and builders were added to it.
---
 src/documentation/dsls/sql.md | 337 ++++++++++++++++++++++++++++--------------
 1 file changed, 226 insertions(+), 111 deletions(-)

diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index 2f6fafc..a6289dc 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -7,104 +7,145 @@ permalink: /documentation/dsls/sql/
 
 # Beam SQL
 
-* TOC
-{:toc}
-
 This page describes the implementation of Beam SQL, and how to simplify a Beam pipeline with DSL APIs.
 
 ## 1. Overview {#overview}
 
-SQL is a well-adopted standard to process data with concise syntax. With DSL APIs (currently available only in Java), now `PCollection`s can be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
+SQL is a well-adopted standard for processing data with concise syntax. With the DSL APIs (currently available only in Java), `PCollections` can now be queried with standard SQL statements, like regular tables. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate them into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransforms` can be mixed in the same pipeline.
 
 There are two main pieces to the SQL DSL API:
 
-* [BeamRecord]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/BeamRecord.html): a new data type used to define composite records (i.e., rows) that consist of multiple, named columns of primitive data types. All SQL DSL queries must be made against collections of type `PCollection<BeamRecord>`. Note that `BeamRecord` itself is not SQL-specific, however, and may also be used in pipelines that do not utilize SQL.
-* [BeamSql]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/extensions/sql/BeamSql.html): the interface for creating `PTransforms` from SQL queries.
+* [BeamSql]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/extensions/sql/BeamSql.html): the interface for creating `PTransforms` from SQL queries;
+* [Row]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/Row.html): a data type that contains named columns with corresponding data types. Beam SQL queries can only be made against collections of type `PCollection<Row>`.
 
 We'll look at each of these below.
 
 ## 2. Usage of DSL APIs {#usage}
 
-### BeamRecord
+### Row
 
-Before applying a SQL query to a `PCollection`, the data in the collection must be in `BeamRecord` format. A `BeamRecord` represents a single, immutable row in a Beam SQL `PCollection`. The names and types of the fields/columns in the record are defined by its associated [BeamRecordType]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/BeamRecordType.html); for SQL queries, you should use the [BeamRecordSqlType]({{ site.baseurl [...]
+Before applying a SQL query to a `PCollection`, the data in the collection must be in `Row` format. A `Row` represents a single, immutable record in a Beam SQL `PCollection`. The names and types of the fields/columns in the row are defined by its associated [RowType]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/RowType.html).
+For SQL queries, you should use [RowSqlType.builder()]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/extensions/sql/RowSqlType.html) to create `RowTypes`; it allows you to create schemas with all supported SQL types (see [Data Types](#data-types) for more details on the supported primitive data types).
 
 
-A `PCollection<BeamRecord>` can be created explicitly or implicitly:
+A `PCollection<Row>` can be obtained in multiple ways, for example:
 
-Explicitly:
-  * **From in-memory data** (typically for unit testing). In this case, the record type and coder must be specified explicitly:
-    ```
+  * **From in-memory data** (typically for unit testing).
+
+    **Note:** you have to explicitly specify the `Row` coder. In this example we're doing it by calling `Create.of(..).withCoder()`:
+
+    ```java
     // Define the record type (i.e., schema).
-    List<String> fieldNames = Arrays.asList("appId", "description", "rowtime");
-    List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP);
-    BeamRecordSqlType appType = BeamRecordSqlType.create(fieldNames, fieldTypes);
+    RowType appType = 
+        RowSqlType
+          .builder()
+          .withIntegerField("appId")
+          .withVarcharField("description")
+          .withTimestampField("rowtime")
+          .build();
 
     // Create a concrete row with that type.
-    BeamRecord row = new BeamRecord(nameType, 1, "Some cool app", new Date());
-
-    //create a source PCollection containing only that row.
-    PCollection<BeamRecord> testApps = PBegin
-        .in(p)
-        .apply(Create.of(row)
-                     .withCoder(nameType.getRecordCoder()));
-    ```
-  * **From a `PCollection<T>`** where `T` is not already a `BeamRecord`, by applying a `PTransform` that converts input records to `BeamRecord` format:
+    Row row = 
+        Row
+          .withRowType(appType)
+          .addValues(1, "Some cool app", new Date())
+          .build();
+
+    // Create a source PCollection containing only that row
+    PCollection<Row> testApps = 
+        PBegin
+          .in(p)
+          .apply(Create
+                    .of(row)
+                    .withCoder(appType.getRowCoder()));
     ```
+  * **From a `PCollection<T>` of records of some other type** (i.e. `T` is not already a `Row`), by applying a `ParDo` that converts input records to `Row` format.
+
+    **Note:** you have to manually set the coder of the result by calling `setCoder(appType.getRowCoder())`:
+    ```java
     // An example POJO class.
     class AppPojo {
-      ...
-      public final Integer appId;
-      public final String description;
-      public final Date timestamp;
+      Integer appId;
+      String description;
+      Date timestamp;
     }
 
-    // Acquire a collection of Pojos somehow.
+    // Acquire a collection of POJOs somehow.
     PCollection<AppPojo> pojos = ...
 
-    // Convert them to BeamRecords with the same schema as defined above via a DoFn.
-    PCollection<BeamRecord> apps = pojos.apply(
-        ParDo.of(new DoFn<AppPojo, BeamRecord>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(new BeamRecord(appType, pojo.appId, pojo.description, pojo.timestamp));
-          }
-        }));
+    // Convert them to Rows with the same schema as defined above via a DoFn.
+    PCollection<Row> apps = pojos
+      .apply(
+          ParDo.of(new DoFn<AppPojo, Row>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              // Get the current POJO instance
+              AppPojo pojo = c.element();
+
+              // Create a Row with the appType schema 
+              // and values from the current POJO
+              Row appRow = 
+                    Row
+                      .withRowType(appType)
+                      .addValues(
+                        pojo.appId, 
+                        pojo.description, 
+                        pojo.timestamp)
+                      .build();
+
+              // Output the Row representing the current POJO
+              c.output(appRow);
+            }
+          }))
+      .setCoder(appType.getRowCoder());
     ```
 
+  * **As an output of another `BeamSql` query**. Details in the next section.
 
-Implicitly:
-* **As the result of a `BeamSql` `PTransform`** applied to a `PCollection<BeamRecord>` (details in the next section).
-
-Once you have a `PCollection<BeamRecord>` in hand, you may use the `BeamSql` APIs to apply SQL queries to it.
+Once you have a `PCollection<Row>` in hand, you may use the `BeamSql` APIs to apply SQL queries to it.
 
 ### BeamSql
 
-`BeamSql` provides two methods for generating a `PTransform` from a SQL query, both of which are equivalent except for the number of inputs they support:
-
-* `BeamSql.query()`, which may be applied to a single `PCollection`. The input collection must be referenced via the table name `PCOLLECTION` in the query:
-  ```
-  PCollection<BeamRecord> filteredNames = testApps.apply(
-      BeamSql.query("SELECT appId, description, rowtime FROM PCOLLECTION WHERE id=1"));
-  ```
-* `BeamSql.queryMulti()`, which may be applied to a `PCollectionTuple` containing one or more tagged `PCollection<BeamRecord>`s. The tuple tag for each `PCollection` in the tuple defines the table name that may used to query it. Note that table names are bound to the specific `PCollectionTuple`, and thus are only valid in the context of queries applied to it.
-  ```
-  // Create a reviews PCollection to join to our apps PCollection.
-  BeamRecordSqlType reviewType = BeamRecordSqlType.create(
-    Arrays.asList("appId", "reviewerId", "rating", "rowtime"),
-    Arrays.asList(Types.INTEGER, Types.INTEGER, Types.FLOAT, Types.TIMESTAMP));
-  PCollection<BeamRecord> reviews = ... [records w/ reviewType schema] ...
-
-  // Compute the # of reviews and average rating per app via a JOIN.
-  PCollectionTuple namesAndFoods = PCollectionTuple.of(
-      new TupleTag<BeamRecord>("Apps"), apps),
-      new TupleTag<BeamRecord>("Reviews"), reviews));
-  PCollection<BeamRecord> output = namesAndFoods.apply(
-      BeamSql.queryMulti("SELECT Names.appId, COUNT(Reviews.rating), AVG(Reviews.rating)
-                          FROM Apps INNER JOIN Reviews ON Apps.appId == Reviews.appId"));
-  ```
-
-Both methods wrap the back-end details of parsing/validation/assembling, and deliver a Beam SDK style API that can express simple TABLE_FILTER queries up to complex queries containing JOIN/GROUP_BY etc.
+The `BeamSql.query(queryString)` method is the only API for creating a `PTransform` from a string representation of a SQL query. You can apply this `PTransform` to either a single `PCollection` or a `PCollectionTuple` holding multiple `PCollections`:
+
+  * When applying it to a single `PCollection`, the collection can be referenced via the table name `PCOLLECTION` in the query:
+    ```java
+    PCollection<Row> filteredNames = testApps.apply(
+        BeamSql.query(
+          "SELECT appId, description, rowtime "
+            + "FROM PCOLLECTION "
+            + "WHERE id=1"));
+    ```
+  * When applying it to a `PCollectionTuple`, the tuple tag for each `PCollection` in the tuple defines the table name that may be used to query it. Note that table names are bound to the specific `PCollectionTuple`, and thus are only valid in the context of queries applied to it.
+
+    For example, you can join two `PCollections`:  
+    ```java
+    // Create the schema for reviews
+    RowType reviewType = 
+        RowSqlType
+          .builder()
+          .withIntegerField("appId")
+          .withIntegerField("reviewerId")
+          .withFloatField("rating")
+          .withTimestampField("rowtime")
+          .build();
+    
+    // Obtain the reviews records with this schema
+    PCollection<Row> reviewsRows = ...
+
+    // Create a PCollectionTuple containing both PCollections.
+    // TupleTag IDs will be used as table names in the SQL query
+    PCollectionTuple appsAndReviews = PCollectionTuple
+        .of(new TupleTag<>("Apps"), apps) // "apps" is the PCollection<Row> from the previous example
+        .and(new TupleTag<>("Reviews"), reviewsRows);
+
+    // Compute the total number of reviews 
+    // and average rating per app 
+    // by joining two PCollections
+    PCollection<Row> output = appsAndReviews.apply(
+        BeamSql.query(
+            "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) "
+                + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId"));
+    ```
 
 [BeamSqlExample](https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java) in the code repository shows basic usage of the DSL API.
 
@@ -167,60 +208,114 @@ groupItem:
 
 ### 3.1. Supported Features {#features}
 
-**1. aggregations;**
-
-Beam SQL supports aggregation functions with group_by in global_window, fixed_window, sliding_window and session_window. A field with type `TIMESTAMP` is required to specify fixed_window/sliding_window/session_window. The field is used as event timestamp for rows. See below for several examples:
-
+#### 3.1.1 Aggregations {#features-aggregations}
+
+Major standard aggregation functions are supported:
+ * `COUNT`
+ * `MAX`
+ * `MIN`
+ * `SUM`
+ * `AVG`
+ * `VAR_POP`
+ * `VAR_SAMP`
+ * `COVAR_POP`
+ * `COVAR_SAMP`
+
+**Note:** `DISTINCT` aggregation is not supported yet.
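+
+For example, assuming a `PCollection<Row>` with an integer field `f_int` and a double field `f_double` (hypothetical field names used only for illustration), a simple aggregation query in the global window could look like this:
+```
+    SELECT f_int, COUNT(*), MAX(f_double), AVG(f_double)
+    FROM PCOLLECTION
+    GROUP BY f_int
+```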
+
+#### 3.1.2 Windowing {#features-windowing}
+
+Beam SQL supports windowing functions specified in the `GROUP BY` clause. A `TIMESTAMP` field is required in this case; it is used as the event timestamp for rows.
+
+Supported windowing functions:
+* `TUMBLE`, or fixed windows. Example of how to define a fixed window with a duration of 1 hour:
+``` 
+    SELECT f_int, COUNT(*) 
+    FROM PCOLLECTION 
+    GROUP BY 
+      f_int,
+      TUMBLE(f_timestamp, INTERVAL '1' HOUR)
+```
+* `HOP`, or sliding windows. Example of how to define sliding windows with a duration of 1 hour, starting every 30 minutes:
+```
+    SELECT f_int, COUNT(*)
+    FROM PCOLLECTION 
+    GROUP BY 
+      f_int, 
+      HOP(f_timestamp, INTERVAL '30' MINUTE, INTERVAL '1' HOUR)
+```
+* `SESSION`, or session windows. Example of how to define a session window with a 5-minute gap duration:
+```
+    SELECT f_int, COUNT(*) 
+    FROM PCOLLECTION 
+    GROUP BY 
+      f_int, 
+      SESSION(f_timestamp, INTERVAL '5' MINUTE)
 ```
-//fixed window, one hour in duration
-SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, TUMBLE(f_timestamp, INTERVAL '1' HOUR)
 
-//sliding window, one hour in duration and 30 minutes period
-SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)
+**Note:** when no windowing function is specified in the query, the windowing strategy of the input `PCollections` is left unchanged by the SQL query. If a windowing function is specified, the windowing of the `PCollection` is updated accordingly, but the trigger stays unchanged.
 
-//session window, with 5 minutes gap duration
-SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, SESSION(f_timestamp, INTERVAL '5' MINUTE)
-```
 
-Note:
+#### 3.1.3 Joins {#features-joins}
 
-1. distinct aggregation is not supported yet.
-2. the default trigger is `Repeatedly.forever(AfterWatermark.pastEndOfWindow())`;
-3. when `time` field in `HOP(dateTime, slide, size [, time ])`/`TUMBLE(dateTime, interval [, time ])`/`SESSION(dateTime, interval [, time ])` is specified, a lateFiring trigger is added as
+#### 3.1.3.1 Overview
 
-```
-Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
-        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
-```		
+Supported `JOIN` types in Beam SQL:
+* `INNER`, `LEFT OUTER`, `RIGHT OUTER`;
+* Only equijoins (where the join condition is an equality check) are supported.
 
-**2. Join (inner, left_outer, right_outer);**
+Unsupported `JOIN` types in Beam SQL:
+* `CROSS JOIN` (a full Cartesian product with no `ON` clause);
+* `FULL OUTER JOIN` (the combination of `LEFT OUTER` and `RIGHT OUTER` joins).
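+
+For example, a `LEFT OUTER` equijoin of the `Apps` and `Reviews` tables defined in the usage section above could look like this:
+```
+    SELECT Apps.appId, Reviews.rating
+    FROM Apps LEFT OUTER JOIN Reviews
+      ON Apps.appId = Reviews.appId
+```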
 
 The scenarios of join can be categorized into 3 cases:
 
-1. BoundedTable JOIN BoundedTable
-2. UnboundedTable JOIN UnboundedTable
-3. BoundedTable JOIN UnboundedTable
+1. Bounded input `JOIN` bounded input;
+2. Unbounded input `JOIN` unbounded input;
+3. Unbounded input `JOIN` bounded input;
+
+Each of these scenarios is described below:
+
+#### 3.1.3.2 Bounded JOIN Bounded {#join-bounded-bounded}
+
+A standard join implementation is used: all elements from one input are matched with all elements from the other input. Because both inputs are bounded, no windowing or triggering is involved.
+
+#### 3.1.3.3 Unbounded JOIN Unbounded {#join-unbounded-unbounded}
+
+A standard join implementation is used: all elements from one input are matched with all elements from the other input.
+
+**Windowing and Triggering**
+
+The following properties must be satisfied when joining unbounded inputs:
+ * inputs must have compatible windows, otherwise an `IllegalArgumentException` will be thrown;
+ * triggers on each input should fire only once per window. Currently this means that the only supported trigger in this case is the `DefaultTrigger` with zero allowed lateness. Using any other trigger will result in an `UnsupportedOperationException` being thrown.
+
+This means that inputs are joined per window. That is, when the trigger fires (only once), the join is performed on all elements in the current window of both inputs. This makes it possible to reason about what kind of output is going to be produced.
 
-For case 1 and case 2, a standard join is utilized as long as the windowFn of the both sides match. For case 3, sideInput is utilized to implement the join. So far there are some constraints:
+**Note:** similarly to `GroupByKey`, `JOIN` will update triggers using `Trigger.continuationTrigger()`. Other aspects of the inputs' windowing strategies remain unchanged.
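+
+As a minimal sketch (not taken from the Beam examples), assume two unbounded `PCollection<Row>`s, `impressions` and `clicks`, that share an `eventId` field; the names, fields, and window size below are assumptions made only for illustration:
+```java
+// Apply the same fixed windows to both unbounded inputs so that their
+// windowing strategies are compatible; the default trigger and zero
+// allowed lateness satisfy the requirements described above.
+PCollection<Row> windowedImpressions = impressions
+    .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(10))));
+PCollection<Row> windowedClicks = clicks
+    .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(10))));
+
+// Join the two streams per window via an equijoin on the shared field.
+PCollection<Row> joined = PCollectionTuple
+    .of(new TupleTag<>("Impressions"), windowedImpressions)
+    .and(new TupleTag<>("Clicks"), windowedClicks)
+    .apply(
+        BeamSql.query(
+            "SELECT Impressions.eventId, Clicks.rowtime "
+              + "FROM Impressions INNER JOIN Clicks "
+              + "ON Impressions.eventId = Clicks.eventId"));
+```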
+
+#### 3.1.3.4 Unbounded JOIN Bounded {#join-unbounded-bounded}
+
+For this type of `JOIN`, the bounded input is treated as a side input by the implementation.
+
+This means that:
 
-* Only equal-join is supported, CROSS JOIN is not supported;
-* FULL OUTER JOIN is not supported;
-* If it's a LEFT OUTER JOIN, the unbounded table should on the left side; If it's a RIGHT OUTER JOIN, the unbounded table should on the right side;
 * window/trigger is inherited from the upstream transforms, which should be consistent;
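+
+As a minimal sketch, assume an unbounded stream `unboundedOrders` and a bounded lookup table `boundedProducts`, both `PCollection<Row>`s; the names and fields are assumptions made only for illustration:
+```java
+// The bounded side ("Products") is turned into a side input by the
+// implementation and joined against every window of the unbounded stream.
+PCollection<Row> enriched = PCollectionTuple
+    .of(new TupleTag<>("Orders"), unboundedOrders)
+    .and(new TupleTag<>("Products"), boundedProducts)
+    .apply(
+        BeamSql.query(
+            "SELECT Orders.orderId, Products.productName "
+              + "FROM Orders INNER JOIN Products "
+              + "ON Orders.productId = Products.productId"));
+```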
 
-**3. User Defined Function (UDF) and User Defined Aggregate Function (UDAF);**
+
+#### 3.1.4 User Defined Function (UDF) and User Defined Aggregate Function (UDAF) {#features-udfs-udafs}
 
 If the required function is not available, developers can register their own UDF (for scalar functions) and UDAF (for aggregate functions).
 
-**create and specify User Defined Function (UDF)**
+##### **3.1.4.1 Create and specify User Defined Function (UDF)**
 
 A UDF can be 1) any Java method that takes zero or more scalar fields and returns one scalar value, or 2) a `SerializableFunction`. Below is an example of a UDF and how to use it in the DSL:
 
-```
+```java
 /**
  * An example UDF for testing.
  */
-public static class CubicInteger implements BeamSqlUdf{
+public static class CubicInteger implements BeamSqlUdf {
   public static Integer eval(Integer input){
     return input * input * input;
   }
@@ -236,19 +331,30 @@ public static class CubicIntegerFn implements SerializableFunction<Integer, Inte
   }
 }
 
-// register and call in SQL
-String sql = "SELECT f_int, cubic1(f_int) as cubicvalue1, cubic2(f_int) as cubicvalue2 FROM PCOLLECTION WHERE f_int = 2";
+// Define a SQL query which calls the above UDFs
+String sql = 
+    "SELECT f_int, cubic1(f_int), cubic2(f_int)"
+      + "FROM PCOLLECTION "
+      + "WHERE f_int = 2";
+
+// Create and apply the PTransform representing the query.
+// Register the UDFs used in the query by calling '.registerUdf()' with 
+// either a class which implements BeamSqlUdf or with 
+// an instance of a SerializableFunction.
 PCollection<Row> result =
-    input.apply("udfExample",
-        BeamSql.simpleQuery(sql).withUdf("cubic1", CubicInteger.class)
-		                        .withUdf("cubic2", new CubicIntegerFn()));
+    input.apply(
+        "udfExample",
+        BeamSql
+            .query(sql)
+            .registerUdf("cubic1", CubicInteger.class)
+            .registerUdf("cubic2", new CubicIntegerFn())
 ```
 
-**create and specify User Defined Aggregate Function (UDAF)**
+##### **3.1.4.2 Create and specify User Defined Aggregate Function (UDAF)**
 
-Beam SQL can accept a `CombineFn` as UDAF. Here's an example of UDAF:
+Beam SQL can accept a `CombineFn` as a UDAF. Registration is similar to the UDF example above:
 
-```
+```java
 /**
  * UDAF (CombineFn) for testing, which returns the sum of squares.
  */
@@ -277,18 +383,27 @@ public static class SquareSum extends CombineFn<Integer, Integer, Integer> {
   public Integer extractOutput(Integer accumulator) {
     return accumulator;
   }
-
 }
 
-//register and call in SQL
-String sql = "SELECT f_int1, squaresum(f_int2) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2";
+// Define a SQL query which calls the above UDAF
+String sql = 
+    "SELECT f_int1, squaresum(f_int2) "
+      + "FROM PCOLLECTION "
+      + "GROUP BY f_int2";
+      
+// Create and apply the PTransform representing the query.
+// Register the UDAF used in the query by calling '.registerUdaf()' and
+// providing it an instance of the CombineFn
 PCollection<Row> result =
-    input.apply("udafExample",
-        BeamSql.simpleQuery(sql).withUdaf("squaresum", new SquareSum()));
+    input.apply(
+        "udafExample",
+        BeamSql
+            .query(sql)
+            .registerUdaf("squaresum", new SquareSum()));
 ```
 
 ### 3.2. Data Types {#data-types}
-Each type in Beam SQL maps to a Java class to holds the value in `BeamRecord`. The following table lists the relation between SQL types and Java classes, which are supported in current repository:
+Each type in Beam SQL maps to a Java class that holds the value in a `Row`. The following table lists the relationship between the SQL types and the Java classes that are currently supported:
 
 | SQL Type | Java class |
 | ---- | ---- |

-- 
To stop receiving notification emails like this one, please contact
mergebot-role@apache.org.