Posted to commits@beam.apache.org by me...@apache.org on 2017/08/24 20:00:15 UTC

[beam-site] branch mergebot updated (f969e19 -> d00f31d)

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


    from f969e19  This closes #295
     add 7b767fe  Prepare repository for deployment.
     new 658a0ed  [BEAM-2800] Revamp sections 1 and 2 of DSL SQL docs: - Expand BeamRecord docs - Expand examples
     new 95c0193  Remove TODO
     new d00f31d  This closes #299

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/documentation/programming-guide/index.html |   4 +-
 src/documentation/dsls/sql.md                      | 125 ++++++++++++++-------
 2 files changed, 86 insertions(+), 43 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam-site] 03/03: This closes #299

Posted by me...@apache.org.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit d00f31d424342233dcf7804a5ebe16a1de4b3a28
Merge: 7b767fe 95c0193
Author: Mergebot <me...@apache.org>
AuthorDate: Thu Aug 24 20:00:00 2017 +0000

    This closes #299

 src/documentation/dsls/sql.md | 125 ++++++++++++++++++++++++++++--------------
 1 file changed, 84 insertions(+), 41 deletions(-)


[beam-site] 02/03: Remove TODO

Posted by me...@apache.org.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 95c01933d3d42c8dc79cd262c27355eb9a93ae6c
Author: Tyler Akidau <ta...@apache.org>
AuthorDate: Thu Aug 24 12:13:50 2017 -0700

    Remove TODO
---
 src/documentation/dsls/sql.md | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index ac55647..ce893cd 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -52,10 +52,6 @@ Explicitly:
         .apply(Create.of(row)
                      .withCoder(appType.getRecordCoder()));
     ```
-  * **From an external source**, if the external data are already encoded with the appropriate `BeamRecordCoder`.
-    ```
-    TODO: example. Is this even possible currently?
-    ```
   * **From a `PCollection<T>`** where `T` is not already a `BeamRecord`, by applying a `PTransform` that converts input records to `BeamRecord` format:
     ```
     // An example POJO class.


[beam-site] 01/03: [BEAM-2800] Revamp sections 1 and 2 of DSL SQL docs: - Expand BeamRecord docs - Expand examples

Posted by me...@apache.org.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 658a0ed50a54d116101998b2dd62fb95772066fc
Author: Tyler Akidau <ta...@apache.org>
AuthorDate: Thu Aug 24 10:36:12 2017 -0700

    [BEAM-2800] Revamp sections 1 and 2 of DSL SQL docs:
    - Expand BeamRecord docs
    - Expand examples
---
 src/documentation/dsls/sql.md | 129 ++++++++++++++++++++++++++++--------------
 1 file changed, 88 insertions(+), 41 deletions(-)

diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index 4f25fe4..ac55647 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -8,9 +8,8 @@ permalink: /documentation/dsls/sql/
 * [2. Usage of DSL APIs](#usage)
 * [3. Functionality in Beam SQL](#functionality)
   * [3.1. Supported Features](#features)
-  * [3.2. Intro of BeamSqlRow](#beamsqlrow)
-  * [3.3. Data Types](#data-type)
-  * [3.4. built-in SQL functions](#built-in-functions)
+  * [3.2. Data Types](#data-type)
+  * [3.3. built-in SQL functions](#built-in-functions)
 * [4. The Internal of Beam SQL](#internal-of-sql)
 
 This page describes the implementation of Beam SQL, and how to simplify a Beam pipeline with DSL APIs.
@@ -18,38 +17,103 @@ This page describes the implementation of Beam SQL, and how to simplify a Beam p
 > Note, Beam SQL hasn't been merged to master branch yet(being developed with branch [DSL_SQL](https://github.com/apache/beam/tree/DSL_SQL)), but is coming soon.
 
 # <a name="overview"></a>1. Overview
-SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, now `PCollection`s can be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
+SQL is a widely adopted standard for processing data with concise syntax. With the DSL APIs (currently available only in Java), `PCollection`s can now be queried with standard SQL statements, like regular tables. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate them into a composite Beam `PTransform`. In this way, SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
 
+There are two main pieces to the SQL DSL API:
+
+* [BeamRecord]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/BeamRecord.html): a new data type used to define composite records (i.e., rows) that consist of multiple, named columns of primitive data types. All SQL DSL queries must be made against collections of type `PCollection<BeamRecord>`. Note that `BeamRecord` itself is not SQL-specific, however, and may also be used in pipelines that do not utilize SQL.
+* [BeamSql]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/extensions/sql/BeamSql.html): the interface for creating `PTransform`s from SQL queries.
+
+We'll look at each of these below.
 
 # <a name="usage"></a>2. Usage of DSL APIs 
-`BeamSql` is the only interface(with two methods `BeamSql.query()` and `BeamSql.simpleQuery()`) for developers. It wraps the back-end details of parsing/validation/assembling, and deliver a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc. 
 
-*Note*, the two methods are equivalent in functionality, `BeamSql.query()` applies on a `PCollectionTuple` with one or many input `PCollection`s; `BeamSql.simpleQuery()` is a simplified API which applies on single `PCollection`.
+## BeamRecord
+
+Before applying a SQL query to a `PCollection`, the data in the collection must be in `BeamRecord` format. A `BeamRecord` represents a single, immutable row in a Beam SQL `PCollection`. The names and types of the fields/columns in the record are defined by its associated [BeamRecordType]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/values/BeamRecordType.html); for SQL queries, you should use the [BeamRecordSqlType]({{ site.baseurl [...]
+
+
+A `PCollection<BeamRecord>` can be created explicitly or implicitly:
+
+Explicitly:
+  * **From in-memory data** (typically for unit testing). In this case, the record type and coder must be specified explicitly:
+    ```
+    // Define the record type (i.e., schema).
+    List<String> fieldNames = Arrays.asList("appId", "description", "rowtime");
+    List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP);
+    BeamRecordSqlType appType = BeamRecordSqlType.create(fieldNames, fieldTypes);
+
+    // Create a concrete row with that type.
+    BeamRecord row = new BeamRecord(appType, 1, "Some cool app", new Date());
+
+    // Create a source PCollection containing only that row.
+    PCollection<BeamRecord> testApps = PBegin
+        .in(p)
+        .apply(Create.of(row)
+                     .withCoder(appType.getRecordCoder()));
+    ```
+  * **From an external source**, if the external data are already encoded with the appropriate `BeamRecordCoder`.
+    ```
+    TODO: example. Is this even possible currently?
+    ```
+  * **From a `PCollection<T>`** where `T` is not already a `BeamRecord`, by applying a `PTransform` that converts input records to `BeamRecord` format:
+    ```
+    // An example POJO class.
+    class AppPojo {
+      ...
+      public final Integer appId;
+      public final String description;
+      public final Date timestamp;
+    }
+
+    // Acquire a collection of Pojos somehow.
+    PCollection<AppPojo> pojos = ...
 
-[BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java) in code repository shows the usage of both APIs:
+    // Convert them to BeamRecords with the same schema as defined above via a DoFn.
+    PCollection<BeamRecord> apps = pojos.apply(
+        ParDo.of(new DoFn<AppPojo, BeamRecord>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            c.output(new BeamRecord(appType, c.element().appId, c.element().description, c.element().timestamp));
+          }
+        }));
+    ```
 
-```
-//Step 1. create a source PCollection with Create.of();
-BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
-...
 
-PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row...)
-    .withCoder(type.getRecordCoder()));
+Implicitly:
+* **As the result of a `BeamSql` `PTransform`** applied to a `PCollection<BeamRecord>` (details in the next section).
 
-//Step 2. (Case 1) run a simple SQL query over input PCollection with BeamSql.simpleQuery;
-PCollection<BeamRecord> outputStream = inputTable.apply(
-    BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+Once you have a `PCollection<BeamRecord>` in hand, you may use the `BeamSql` APIs to apply SQL queries to it.
 
+## BeamSql
 
-//Step 2. (Case 2) run the query with BeamSql.query over result PCollection of (case 1);
-PCollection<BeamRecord> outputStream2 =
-    PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
-        .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
-```
+`BeamSql` provides two methods for generating a `PTransform` from a SQL query, both of which are equivalent except for the number of inputs they support:
 
-In Step 1, a `PCollection<BeamRecord>` is prepared as the source dataset. The work to generate a queriable `PCollection<BeamRecord>` is beyond the scope of Beam SQL DSL. 
+* `BeamSql.query()`, which may be applied to a single `PCollection`. The input collection must be referenced via the table name `PCOLLECTION` in the query:
+  ```
+  PCollection<BeamRecord> filteredApps = testApps.apply(
+      BeamSql.query("SELECT appId, description, rowtime FROM PCOLLECTION WHERE appId=1"));
+  ```
+* `BeamSql.queryMulti()`, which may be applied to a `PCollectionTuple` containing one or more tagged `PCollection<BeamRecord>`s. The tuple tag for each `PCollection` in the tuple defines the table name that may be used to query it. Note that table names are bound to the specific `PCollectionTuple`, and thus are only valid in the context of queries applied to it.
+  ```
+  // Create a reviews PCollection to join to our apps PCollection.
+  BeamRecordSqlType reviewType = BeamRecordSqlType.create(
+    Arrays.asList("appId", "reviewerId", "rating", "rowtime"),
+    Arrays.asList(Types.INTEGER, Types.INTEGER, Types.FLOAT, Types.TIMESTAMP));
+  PCollection<BeamRecord> reviews = ... [records w/ reviewType schema] ...
 
-Step 2(Case 1) shows the usage to run a query with `BeamSql.simpleQuery()`, be aware that the input `PCollection` is named with a fixed table name __PCOLLECTION__. Step 2(Case 2) is another example to run a query with `BeamSql.query()`. A Table name is specified when adding `PCollection` to `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in your query.
+  // Compute the # of reviews and average rating per app via a JOIN.
+  PCollectionTuple appsAndReviews = PCollectionTuple
+      .of(new TupleTag<BeamRecord>("Apps"), apps)
+      .and(new TupleTag<BeamRecord>("Reviews"), reviews);
+  PCollection<BeamRecord> output = appsAndReviews.apply(
+      BeamSql.queryMulti("SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) "
+                         + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId GROUP BY Apps.appId"));
+  ```
+
+Both methods hide the back-end details of parsing, validation, and assembly, and expose a Beam SDK style API that can express anything from simple TABLE_FILTER queries to complex queries containing JOIN, GROUP BY, etc.
+
+[BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java) in the code repository shows basic usage of both APIs.
 
 # <a name="functionality"></a>3. Functionality in Beam SQL
 Just as the unified model for both bounded and unbounded data in Beam, SQL DSL provides the same functionalities for bounded and unbounded `PCollection` as well. 
@@ -184,24 +248,7 @@ PCollection<BeamSqlRow> result =
     input.apply("udafExample",
         BeamSql.simpleQuery(sql).withUdaf("squaresum", new SquareSum()));
 ```
- 
-## <a name="beamsqlrow"></a>3.2. Intro of BeamRecord
-`BeamRecord`, described by `BeamRecordType`(extended `BeamRecordSqlType` in Beam SQL) and encoded/decoded by `BeamRecordCoder`, represents a single, immutable row in a Beam SQL `PCollection`. Similar as _row_ in relational database, each `BeamRecord` consists of named columns with fixed types(see [4.3. Data Types](#data-type)).
-
-A Beam SQL `PCollection` can be created from an external source, in-memory data or derive from another SQL query. For `PCollection`s from external source or in-memory data, it's required to specify coder explcitly; `PCollection` derived from SQL query has the coder set already. Below is one example:
-
-```
-//define the input row format
-List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
-List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
-BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
-BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
-
-//create a source PCollection with Create.of();
-PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(type.getRecordCoder()));
-```
- 
+  
 ## <a name="data-type"></a>3.3. Data Types
 Each type in Beam SQL maps to a Java class to holds the value in `BeamRecord`. The following table lists the relation between SQL types and Java classes, which are supported in current repository:
 

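
A note on the `BeamSql.queryMulti()` JOIN example in commit 01/03: the comment above it says the goal is to compute the number of reviews and the average rating per app. As a rough illustration of what such a query computes, here is a plain-Java sketch that does not use the Beam API at all. It assumes the query is grouped by `appId`; the class `JoinSketch`, the nested `Review` type, and the sample values are hypothetical and not part of the commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for the Apps/Reviews tables; no Beam classes are used.
class JoinSketch {
  static final class Review {
    final int appId;
    final float rating;

    Review(int appId, float rating) {
      this.appId = appId;
      this.rating = rating;
    }
  }

  // Returns appId -> {review count, average rating}.
  // Inner-join semantics: reviews for unknown apps are dropped, and apps
  // without any review do not appear in the result.
  static Map<Integer, double[]> countAndAverage(Set<Integer> appIds, List<Review> reviews) {
    Map<Integer, double[]> sums = new TreeMap<>(); // appId -> {count, sum}
    for (Review r : reviews) {
      if (!appIds.contains(r.appId)) {
        continue; // no matching row on the Apps side
      }
      double[] acc = sums.computeIfAbsent(r.appId, k -> new double[2]);
      acc[0] += 1;
      acc[1] += r.rating;
    }
    Map<Integer, double[]> result = new TreeMap<>();
    for (Map.Entry<Integer, double[]> e : sums.entrySet()) {
      double[] acc = e.getValue();
      result.put(e.getKey(), new double[] {acc[0], acc[1] / acc[0]});
    }
    return result;
  }

  public static void main(String[] args) {
    Set<Integer> apps = new TreeSet<>(Arrays.asList(1, 2));
    List<Review> reviews = Arrays.asList(
        new Review(1, 4.0f), new Review(1, 5.0f), new Review(3, 2.0f));
    for (Map.Entry<Integer, double[]> e : countAndAverage(apps, reviews).entrySet()) {
      double[] v = e.getValue();
      System.out.println(e.getKey() + ": count=" + (long) v[0] + ", avg=" + v[1]);
    }
    // prints "1: count=2, avg=4.5"
  }
}
```

App 2 has no reviews and the review for app 3 has no matching app, so only app 1 appears in the output, which is what an INNER JOIN would be expected to produce.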