Posted to commits@beam.apache.org by me...@apache.org on 2017/08/21 18:49:32 UTC

[beam-site] 03/10: add intro of BeamSqlRow, trigger settings of aggregation/join,

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit c8ae4c444af7ee37d1ce00383e1ef322ffbc8bb9
Author: mingmxu <mi...@ebay.com>
AuthorDate: Tue Jul 18 16:45:06 2017 -0700

    add intro of BeamSqlRow, trigger settings of aggregation/join,
    
    other misc spelling correction
---
 src/documentation/dsls/sql.md | 67 +++++++++++++++++++++++++++++++------------
 1 file changed, 49 insertions(+), 18 deletions(-)

diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index c9cb0f7..8834a93 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -9,13 +9,14 @@ permalink: /documentation/dsls/sql/
 * [3. Usage of DSL APIs](#usage)
 * [4. Functionality in Beam SQL](#functionality)
   * [4.1. Supported Features](#features)
-  * [4.2. Data Types](#data-type)
-  * [4.3. built-in SQL functions](#built-in-functions)
+  * [4.2. Intro of BeamSqlRow](#beamsqlrow)
+  * [4.3. Data Types](#data-type)
+  * [4.4. built-in SQL functions](#built-in-functions)
 
 This page describes the implementation of Beam SQL, and how to simplify a Beam pipeline with DSL APIs.
 
 # <a name="overview"></a>1. Overview
-SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, now `PCollection`s can be queried with standard SQL statements, like a regular table. The DSL APIs leverages [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
+SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, now `PCollection`s can be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
 
 # <a name="internal-of-sql"></a>2. The Internal of Beam SQL
 Figure 1 describes the back-end steps from a SQL statement to a Beam `PTransform`.
@@ -24,14 +25,14 @@ Figure 1 describes the back-end steps from a SQL statement to a Beam `PTransform
 
 **Figure 1** workflow of Beam SQL DSL
 
-Given a PCollection and the query as input, first of all the input PCollection is registered as a table in the schema repository. Then it's processed as:
+Given a `PCollection` and the query as input, first of all the input `PCollection` is registered as a table in the schema repository. Then it's processed as:
 
 1. SQL query is parsed according to grammar to generate a SQL Abstract Syntax Tree;
 2. Validate against table schema, and output a logical plan represented with relational algebras;
 3. Relational rules are applied to convert it to a physical plan, expressed with Beam components. An optimizer is optional to update the plan;
 4. Finally, the Beam physical plan is compiled as a composite `PTransform`;
 
-Here is an example to show a query that filters and projects from a input PCollection:
+Here is an example of a query that filters and projects from an input `PCollection`:
 
 ```
 SELECT USER_ID, USER_NAME FROM PCOLLECTION WHERE USER_ID = 1
@@ -55,7 +56,7 @@ pCollection.apply(BeamSqlFilter...)
 # <a name="usage"></a>3. Usage of DSL APIs 
 The DSL interface (`BeamSql.query()` and `BeamSql.simpleQuery()`), is the only endpoint exposed to developers. It wraps the back-end details of parsing/validation/assembling, to deliver a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc. 
 
-*Note*, the two APIs are equivalent in functionality, `BeamSql.query()` applies on a `PCollectionTuple` with one or many input `PCollection`s, and `BeamSql.simpleQuery()` is a simplified API which applies on single PCollections.
+*Note*: the two APIs are equivalent in functionality. `BeamSql.query()` applies to a `PCollectionTuple` with one or more input `PCollection`s, and `BeamSql.simpleQuery()` is a simplified API that applies to a single `PCollection`.
 
 [BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java) in code repository shows the usage of both APIs:
 
@@ -77,10 +78,10 @@ PCollection<BeamSqlRow> outputStream2 =
 
 In Step 1, a `PCollection<BeamSqlRow>` is prepared as the source dataset. The work to generate a queriable `PCollection<BeamSqlRow>` is beyond the scope of Beam SQL DSL. 
 
-Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be aware that the input PCollection is named with a fixed table name __PCOLLECTION__. Step 2(Case 2) is an example to run a query with `BeamSql.query()`. A Table name used in the query is specified when adding PCollection to `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all PCollections that would be used in your query.
+Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be aware that the input `PCollection` is named with a fixed table name __PCOLLECTION__. Step 2(Case 2) is an example to run a query with `BeamSql.query()`. A Table name used in the query is specified when adding `PCollection` to `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in yo [...]
 
 # <a name="functionality"></a>4. Functionality in Beam SQL
-Just as the unified model for both bounded and unbounded data in Beam, SQL DSL provides the same functionalities for bounded and unbounded PCollection as well. 
+Just like Beam's unified model for bounded and unbounded data, the SQL DSL provides the same functionality for bounded and unbounded `PCollection`s.
 
 Note that, SQL support is not fully completed. Queries that include unsupported features would cause a UnsupportedOperationException.
 
@@ -106,7 +107,16 @@ SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, HOP(f_timestam
 SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, SESSION(f_timestamp, INTERVAL '5' MINUTE)
 ```
 
-Note: distinct aggregation is not supported yet.
+Note:
+
+1. Distinct aggregation is not supported yet.
+2. The default trigger is `Repeatedly.forever(AfterWatermark.pastEndOfWindow())`.
+3. When the `time` field in `HOP(dateTime, slide, size [, time ])`/`TUMBLE(dateTime, interval [, time ])`/`SESSION(dateTime, interval [, time ])` is specified, a late-firing trigger is added as:
+
+```
+Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
+        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
+```
 
 **4. Join (inner, left_outer, right_outer);**
 
@@ -121,6 +131,7 @@ For case 1 and case 2, a standard join is utilized as long as the windowFn of th
 * Only equal-join is supported, CROSS JOIN is not supported;
 * FULL OUTER JOIN is not supported;
 * If it's a LEFT OUTER JOIN, the unbounded table should on the left side; If it's a RIGHT OUTER JOIN, the unbounded table should on the right side;
+* The trigger is inherited from upstream, and the upstream triggers should be consistent;
 
 **5. built-in SQL functions**
 
@@ -136,8 +147,8 @@ A UDF can be any Java method that takes zero or more scalar fields, and return o
 /**
  * A example UDF for test.
  */
-public static class CubicInteger{
-  public static Integer cubic(Integer input){
+public static class CubicInteger implements BeamSqlUdf{
+  public static Integer eval(Integer input){
     return input * input * input;
   }
 }
@@ -146,7 +157,7 @@ public static class CubicInteger{
 String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
 PCollection<BeamSqlRow> result =
     input.apply("udfExample",
-        BeamSql.simpleQuery(sql).withUdf("cubic", CubicInteger.class, "cubic"));
+        BeamSql.simpleQuery(sql).withUdf("cubic", CubicInteger.class));
 ```
 
 **create and specify User Defined Aggregate Function (UDAF)**
@@ -168,17 +179,17 @@ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
   public SquareSum() {
   }
   
-  // @Override
+  @Override
   public Integer init() {
     return 0;
   }
   
-  // @Override
+  @Override
   public Integer add(Integer accumulator, Integer input) {
     return accumulator + input * input;
   }
   
-  // @Override
+  @Override
   public Integer merge(Iterable<Integer> accumulators) {
     int v = 0;
     Iterator<Integer> ite = accumulators.iterator();
@@ -188,7 +199,7 @@ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
     return v;
   }
 
-  // @Override
+  @Override
   public Integer result(Integer accumulator) {
     return accumulator;
   }
@@ -201,7 +212,27 @@ PCollection<BeamSqlRow> result =
         BeamSql.simpleQuery(sql).withUdaf("squaresum", SquareSum.class));
 ```
  
-## <a name="data-type"></a>4.2. Data Types
+## <a name="beamsqlrow"></a>4.2. Intro of BeamSqlRow
+`BeamSqlRow`, encoded/decoded by `BeamSqlRowCoder`, represents a single, implicitly structured data item in a Beam SQL compatible `PCollection`. Similar to a _row_ in a relational database, each `BeamSqlRow` consists of named columns with fixed types (see [4.3. Data Types](#data-type)).
+
+A Beam SQL `PCollection` can be created from an external source, from in-memory data, or derived from another SQL query. For `PCollection`s from an external source or in-memory data, the coder must be specified explicitly; a `PCollection` derived from a SQL query already has the coder set. Below is one example:
+
+```
+//define the input row format
+List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
+BeamSqlRow row = new BeamSqlRow(type);
+row.addField(0, 1);
+row.addField(1, "row");
+row.addField(2, 1.0);
+
+//create a source PCollection with Create.of();
+PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
+    .withCoder(new BeamSqlRowCoder(type)));
+```
+ 
+## <a name="data-type"></a>4.3. Data Types
 Each type in Beam SQL maps to a Java class to holds the value in `BeamSqlRow`. The following table lists the relation between SQL types and Java classes, which are supported in current repository:
 
 | SQL Type | Java class |
@@ -217,7 +248,7 @@ Each type in Beam SQL maps to a Java class to holds the value in `BeamSqlRow`. T
 | Types.TIMESTAMP | java.util.Date |
 {:.table}
 
-## <a name="built-in-functions"></a>4.3. built-in SQL functions
+## <a name="built-in-functions"></a>4.4. built-in SQL functions
 
 Beam SQL has implemented lots of build-in functions defined in [Apache Calcite](http://calcite.apache.org). The available functions are listed as below:
 

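Reviewer note on the `SquareSum` UDAF in this diff: the four overridden methods (`init`, `add`, `merge`, `result`) follow an accumulator pattern. The sketch below imitates that pattern in plain Java, with no Beam dependency, to show how partial accumulators might be combined. The `Udaf` interface and the `aggregate` helper are hypothetical stand-ins, not Beam's `BeamSqlUdaf` or its runner logic, and the summing loop in `merge` is an assumption because the diff elides that part of the method body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SquareSumSketch {
  // Hypothetical stand-in for the four-method contract shown in the diff;
  // this is NOT the real BeamSqlUdaf class.
  interface Udaf<I, A, O> {
    A init();
    A add(A accumulator, I input);
    A merge(Iterable<A> accumulators);
    O result(A accumulator);
  }

  // Same method names and types as the SquareSum example in the diff.
  static class SquareSum implements Udaf<Integer, Integer, Integer> {
    @Override
    public Integer init() {
      return 0;
    }

    @Override
    public Integer add(Integer accumulator, Integer input) {
      return accumulator + input * input;
    }

    @Override
    public Integer merge(Iterable<Integer> accumulators) {
      // Assumption: partial accumulators are combined by summing them.
      int v = 0;
      for (Integer a : accumulators) {
        v += a;
      }
      return v;
    }

    @Override
    public Integer result(Integer accumulator) {
      return accumulator;
    }
  }

  // Hypothetical driver: builds one accumulator per bundle of inputs,
  // merges the partial accumulators, then extracts the final result.
  static <I, A, O> O aggregate(Udaf<I, A, O> fn, List<List<I>> bundles) {
    List<A> partials = new ArrayList<>();
    for (List<I> bundle : bundles) {
      A acc = fn.init();
      for (I value : bundle) {
        acc = fn.add(acc, value);
      }
      partials.add(acc);
    }
    return fn.result(fn.merge(partials));
  }

  public static void main(String[] args) {
    List<List<Integer>> bundles =
        Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
    // 1*1 + 2*2 + 3*3
    System.out.println(aggregate(new SquareSum(), bundles)); // prints 14
  }
}
```

Splitting the input into bundles here only illustrates why `merge` exists: a distributed runner may accumulate elements of the same group on different workers before combining them.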