Posted to commits@beam.apache.org by me...@apache.org on 2017/08/21 18:49:37 UTC

[beam-site] 08/10: update to latest API/usages

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 2a52cb3af842addcb08e554d21671f9feb1d58a4
Author: mingmxu <mi...@ebay.com>
AuthorDate: Thu Aug 17 17:16:05 2017 -0700

    update to latest API/usages
---
 src/_posts/2017-07-21-sql-dsl.md |  33 +++++++------
 src/documentation/dsls/sql.md    | 102 ++++++++++++++++++++-------------------
 2 files changed, 70 insertions(+), 65 deletions(-)

diff --git a/src/_posts/2017-07-21-sql-dsl.md b/src/_posts/2017-07-21-sql-dsl.md
index 4aa97d1..95df3f8 100644
--- a/src/_posts/2017-07-21-sql-dsl.md
+++ b/src/_posts/2017-07-21-sql-dsl.md
@@ -13,42 +13,45 @@ Beam SQL DSL provides the capability to execute standard SQL queries using Beam
 <!--more-->
 
 # <a name="overview"></a>1. Overview
-SQL is a well-adopted standard to process data with concise syntax. With DSL APIs, now `PCollection`s can be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate into a composite Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in the same pipeline.
+SQL is a well-adopted standard for processing data with a concise syntax. With the DSL APIs, `PCollection`s can now be queried with standard SQL statements, like a regular table. The DSL APIs leverage [Apache Calcite](http://calcite.apache.org/) to parse and optimize SQL queries, then translate them into a _composite_ Beam `PTransform`. In this way, both SQL and normal Beam `PTransform`s can be mixed in one pipeline.
 
 # <a name="usage"></a>2. Usage of DSL APIs 
-The DSL interface (`BeamSql.query()` and `BeamSql.simpleQuery()`), is the only endpoint exposed to developers. It wraps the back-end details of parsing/validation/assembling, to deliver a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc. 
+`BeamSql` is the only interface exposed to developers, with two methods: `BeamSql.query()` and `BeamSql.simpleQuery()`. It wraps the back-end details of parsing/validation/assembling, and delivers a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc.
 
-*Note*, the two APIs are equivalent in functionality, `BeamSql.query()` applies on a `PCollectionTuple` with one or many input `PCollection`s, and `BeamSql.simpleQuery()` is a simplified API which applies on single `PCollection`.
+*Note*: the two methods are equivalent in functionality. `BeamSql.query()` applies to a `PCollectionTuple` with one or many input `PCollection`s; `BeamSql.simpleQuery()` is a simplified API which applies to a single `PCollection`.
 
 [BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java) in code repository shows the usage of both APIs:
 
 ```
 //Step 1. create a source PCollection with Create.of();
-PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(new BeamSqlRowCoder(type)));
+BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
+...
+
+PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row...)
+    .withCoder(type.getRecordCoder()));
 
 //Step 2. (Case 1) run a simple SQL query over input PCollection with BeamSql.simpleQuery;
-PCollection<BeamSqlRow> outputStream = inputTable.apply(
-    BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));
+PCollection<BeamRecord> outputStream = inputTable.apply(
+    BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
 
 
-//Step 2. (Case 2) run the query with BeamSql.query
-PCollection<BeamSqlRow> outputStream2 =
-    PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable)
-        .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1"));
+//Step 2. (Case 2) run the query with BeamSql.query over result PCollection of (case 1);
+PCollection<BeamRecord> outputStream2 =
+    PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
+        .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
 ```
 
-In Step 1, a `PCollection<BeamSqlRow>` is prepared as the source dataset. The work to generate a queriable `PCollection<BeamSqlRow>` is beyond the scope of Beam SQL DSL. 
+In Step 1, a `PCollection<BeamRecord>` is prepared as the source dataset. Generating a queryable `PCollection<BeamRecord>` is beyond the scope of Beam SQL DSL.
 
-Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be aware that the input `PCollection` is named with a fixed table name __PCOLLECTION__. Step 2(Case 2) is an example to run a query with `BeamSql.query()`. A Table name used in the query is specified when adding `PCollection` to `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in yo [...]
+Step 2 (Case 1) shows how to run a query with `BeamSql.simpleQuery()`; be aware that the input `PCollection` is referenced by the fixed table name __PCOLLECTION__. Step 2 (Case 2) is another example, running a query with `BeamSql.query()`. A table name is specified when adding a `PCollection` to the `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in the query.
 
 # <a name="functionality"></a>4. Functionality in Beam SQL
 Just as the unified model for both bounded and unbounded data in Beam, SQL DSL provides the same functionalities for bounded and unbounded `PCollection` as well. 
 
-Note that, SQL support is not fully completed. Queries that include unsupported features would cause a UnsupportedOperationException.
+Note that SQL support is not yet complete. Queries that include unsupported features cause an `UnsupportedOperationException`.
 
 ## <a name="features"></a>4.1. Supported Features
-The following features are supported in current repository (this chapter will be updated continually).
+The following features are supported in the current repository:
 
 1. filter clauses;
 2. data field projections;
diff --git a/src/documentation/dsls/sql.md b/src/documentation/dsls/sql.md
index 34f26ae..bfcbd1c 100644
--- a/src/documentation/dsls/sql.md
+++ b/src/documentation/dsls/sql.md
@@ -54,39 +54,42 @@ pCollection.apply(BeamSqlFilter...)
 ```
 
 # <a name="usage"></a>3. Usage of DSL APIs 
-The DSL interface (`BeamSql.query()` and `BeamSql.simpleQuery()`), is the only endpoint exposed to developers. It wraps the back-end details of parsing/validation/assembling, to deliver a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc. 
+`BeamSql` is the only interface exposed to developers, with two methods: `BeamSql.query()` and `BeamSql.simpleQuery()`. It wraps the back-end details of parsing/validation/assembling, and delivers a Beam SDK style API that can take either simple TABLE_FILTER queries or complex queries containing JOIN/GROUP_BY etc.
 
-*Note*, the two APIs are equivalent in functionality, `BeamSql.query()` applies on a `PCollectionTuple` with one or many input `PCollection`s, and `BeamSql.simpleQuery()` is a simplified API which applies on single `PCollection`.
+*Note*: the two methods are equivalent in functionality. `BeamSql.query()` applies to a `PCollectionTuple` with one or many input `PCollection`s; `BeamSql.simpleQuery()` is a simplified API which applies to a single `PCollection`.
 
 [BeamSqlExample](https://github.com/apache/beam/blob/DSL_SQL/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java) in code repository shows the usage of both APIs:
 
 ```
 //Step 1. create a source PCollection with Create.of();
-PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(new BeamSqlRowCoder(type)));
+BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
+...
+
+PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row...)
+    .withCoder(type.getRecordCoder()));
 
 //Step 2. (Case 1) run a simple SQL query over input PCollection with BeamSql.simpleQuery;
-PCollection<BeamSqlRow> outputStream = inputTable.apply(
-    BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));
+PCollection<BeamRecord> outputStream = inputTable.apply(
+    BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
 
 
-//Step 2. (Case 2) run the query with BeamSql.query
-PCollection<BeamSqlRow> outputStream2 =
-    PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_B"), inputTable)
-        .apply(BeamSql.query("select c2, c3 from TABLE_B where c1=1"));
+//Step 2. (Case 2) run the query with BeamSql.query over result PCollection of (case 1);
+PCollection<BeamRecord> outputStream2 =
+    PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
+        .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2"));
 ```
 
-In Step 1, a `PCollection<BeamSqlRow>` is prepared as the source dataset. The work to generate a queriable `PCollection<BeamSqlRow>` is beyond the scope of Beam SQL DSL. 
+In Step 1, a `PCollection<BeamRecord>` is prepared as the source dataset. Generating a queryable `PCollection<BeamRecord>` is beyond the scope of Beam SQL DSL.
 
-Step 2(Case 1) shows the way to run a query with `BeamSql.simpleQuery()`, be aware that the input `PCollection` is named with a fixed table name __PCOLLECTION__. Step 2(Case 2) is an example to run a query with `BeamSql.query()`. A Table name used in the query is specified when adding `PCollection` to `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in yo [...]
+Step 2 (Case 1) shows how to run a query with `BeamSql.simpleQuery()`; be aware that the input `PCollection` is referenced by the fixed table name __PCOLLECTION__. Step 2 (Case 2) is another example, running a query with `BeamSql.query()`. A table name is specified when adding a `PCollection` to the `PCollectionTuple`. As each call of either `BeamSql.query()` or `BeamSql.simpleQuery()` has its own schema repository, developers need to include all `PCollection`s that would be used in the query.
 
 # <a name="functionality"></a>4. Functionality in Beam SQL
 Just as the unified model for both bounded and unbounded data in Beam, SQL DSL provides the same functionalities for bounded and unbounded `PCollection` as well. 
 
-Note that, SQL support is not fully completed. Queries that include unsupported features would cause a UnsupportedOperationException.
+Note that SQL support is not yet complete. Queries that include unsupported features cause an `UnsupportedOperationException`.
 
 ## <a name="features"></a>4.1. Supported Features
-The following features are supported in current repository (this chapter will be updated continually).
+The following features are supported in the current repository:
 
 **1. filter clauses;**
 
@@ -94,7 +97,7 @@ The following features are supported in current repository (this chapter will be
 
 **3. aggregations;**
 
-Beam SQL supports aggregation functions COUNT/SUM/MAX/MIN/AVG with group_by in global_window, fixed_window, sliding_window and session_window. A field with type `TIMESTAMP` is required to specify fixed_window/sliding_window/session window, which is used as event timestamp for rows. See below for several examples:
+Beam SQL supports aggregation functions with group_by in global_window, fixed_window, sliding_window and session_window. A field of type `TIMESTAMP` is required to specify fixed_window/sliding_window/session_window; this field is used as the event timestamp for rows. See below for several examples:
 
 ```
 //fixed window, one hour in duration
@@ -126,12 +129,12 @@ The scenarios of join can be categorized into 3 cases:
 2. UnboundedTable JOIN UnboundedTable
 3. BoundedTable JOIN UnboundedTable
 
-For case 1 and case 2, a standard join is utilized as long as the windowFn of the both sides match. For case 3, sideInput is utilized to implement the join. So there are some constraints:
+For case 1 and case 2, a standard join is utilized as long as the windowFn of both sides matches. For case 3, sideInput is utilized to implement the join. So far there are some constraints:
 
 * Only equal-join is supported, CROSS JOIN is not supported;
 * FULL OUTER JOIN is not supported;
 * If it's a LEFT OUTER JOIN, the unbounded table should be on the left side; if it's a RIGHT OUTER JOIN, the unbounded table should be on the right side;
-* The trigger is inherented from upstream, which should be consistent;
+* window/trigger is inherited from upstream, and should be consistent;
 
 **5. built-in SQL functions**
 
@@ -141,7 +144,7 @@ If the required function is not available, developers can register their own UDF
 
 **create and specify User Defined Function (UDF)**
 
-A UDF can be any Java method that takes zero or more scalar fields, and return one scalar value. Below is an example of UDF and how to use it in DSL:
+A UDF can be 1) any Java method that takes zero or more scalar fields and returns one scalar value, or 2) a `SerializableFunction`. Below is an example of a UDF and how to use it in the DSL:
 
 ```
 /**
@@ -153,44 +156,45 @@ public static class CubicInteger implements BeamSqlUdf{
   }
 }
 
+/**
+ * Another example UDF with {@link SerializableFunction}.
+ */
+public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> {
+  @Override
+  public Integer apply(Integer input) {
+    return input * input * input;
+  }
+}
+
 // register and call in SQL
-String sql = "SELECT f_int, cubic(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
+String sql = "SELECT f_int, cubic1(f_int) as cubicvalue1, cubic2(f_int) as cubicvalue2 FROM PCOLLECTION WHERE f_int = 2";
 PCollection<BeamSqlRow> result =
     input.apply("udfExample",
-        BeamSql.simpleQuery(sql).withUdf("cubic", CubicInteger.class));
+        BeamSql.simpleQuery(sql).withUdf("cubic1", CubicInteger.class)
+            .withUdf("cubic2", new CubicIntegerFn()));
 ```
 
 **create and specify User Defined Aggregate Function (UDAF)**
 
-A UDAF aggregates a set of grouped scalar values, and output a single scalar value. To create a UDAF function, it's required to extend `org.apache.beam.dsls.sql.schema.BeamSqlUdaf<InputT, AccumT, OutputT>`, which defines 4 methods to process an aggregation:
-
-1. init(), to create an initial accumulate value;
-2. add(), to apply a new value to existing accumulate value;
-3. merge(), to merge accumulate values from parallel operators;
-4. result(), to generate the final result from accumulate value;
-
-Here's an example of UDAF:
+Beam SQL can accept a `CombineFn` as a UDAF. Here's an example of a UDAF:
 
 ```
 /**
- * UDAF for test, which returns the sum of square.
+ * UDAF (CombineFn) for test, which returns the sum of squares.
  */
-public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
-  public SquareSum() {
-  }
-  
+public static class SquareSum extends CombineFn<Integer, Integer, Integer> {
   @Override
-  public Integer init() {
+  public Integer createAccumulator() {
     return 0;
   }
-  
+
   @Override
-  public Integer add(Integer accumulator, Integer input) {
+  public Integer addInput(Integer accumulator, Integer input) {
     return accumulator + input * input;
   }
-  
+
   @Override
-  public Integer merge(Iterable<Integer> accumulators) {
+  public Integer mergeAccumulators(Iterable<Integer> accumulators) {
     int v = 0;
     Iterator<Integer> ite = accumulators.iterator();
     while (ite.hasNext()) {
@@ -200,20 +204,21 @@ public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> {
   }
 
   @Override
-  public Integer result(Integer accumulator) {
+  public Integer extractOutput(Integer accumulator) {
     return accumulator;
   }
+
 }
 
 //register and call in SQL
 String sql = "SELECT f_int1, squaresum(f_int2) AS `squaresum` FROM PCOLLECTION GROUP BY f_int1";
 PCollection<BeamSqlRow> result =
     input.apply("udafExample",
-        BeamSql.simpleQuery(sql).withUdaf("squaresum", SquareSum.class));
+        BeamSql.simpleQuery(sql).withUdaf("squaresum", new SquareSum()));
 ```
  
-## <a name="beamsqlrow"></a>4.2. Intro of BeamSqlRow
-`BeamSqlRow`, encoded/decoded by `BeamSqlRowCoder`, represents a single, implicitly structured data item in a Beam SQL compatible `PCollection`. Similar as _row_ in the context of relational database, each `BeamSqlRow` consists of named columns with fixed types(see [4.3. Data Types](#data-type)).
+## <a name="beamsqlrow"></a>4.2. Intro of BeamRecord
+`BeamRecord`, described by `BeamRecordType` (extended by `BeamRecordSqlType` in Beam SQL) and encoded/decoded by `BeamRecordCoder`, represents a single, immutable row in a Beam SQL `PCollection`. Similar to a _row_ in a relational database, each `BeamRecord` consists of named columns with fixed types (see [4.3. Data Types](#data-type)).
 
 A Beam SQL `PCollection` can be created from an external source, in-memory data, or derived from another SQL query. For `PCollection`s from an external source or in-memory data, the coder must be specified explicitly; a `PCollection` derived from a SQL query already has its coder set. Below is one example:
 
@@ -221,19 +226,16 @@ A Beam SQL `PCollection` can be created from an external source, in-memory data
 //define the input row format
 List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
 List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
-BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
-BeamSqlRow row = new BeamSqlRow(type);
-row.addField(0, 1);
-row.addField(1, "row");
-row.addField(2, 1.0);
+BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
+BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
 
 //create a source PCollection with Create.of();
-PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
-    .withCoder(new BeamSqlRowCoder(type)));
+PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
+    .withCoder(type.getRecordCoder()));
 ```
  
 ## <a name="data-type"></a>4.3. Data Types
-Each type in Beam SQL maps to a Java class to holds the value in `BeamSqlRow`. The following table lists the relation between SQL types and Java classes, which are supported in current repository:
+Each type in Beam SQL maps to a Java class that holds the value in `BeamRecord`. The following table lists the SQL types and corresponding Java classes supported in the current repository:
 
 | SQL Type | Java class |
 | ---- | ---- |

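Reviewer note on the UDAF change above: the commit replaces the custom `init()/add()/merge()/result()` methods with the `CombineFn` names `createAccumulator()/addInput()/mergeAccumulators()/extractOutput()`. The following is a minimal plain-Java sketch of that accumulator lifecycle, written without any Beam dependency so it can be run on its own. The class name `SquareSumSketch` and the `simulate()` helper are hypothetical, and the summing body of `mergeAccumulators()` is an assumption, since the diff elides that part of the method.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java sketch (no Beam classes) of the accumulator lifecycle followed by
 * the SquareSum example in the diff. Names are illustrative only.
 */
public class SquareSumSketch {

  // Each partial aggregation starts from zero.
  static Integer createAccumulator() {
    return 0;
  }

  // Fold one input value into a partial result by adding its square.
  static Integer addInput(Integer accumulator, Integer input) {
    return accumulator + input * input;
  }

  // Combine partial results computed independently (assumed to be a plain sum).
  static Integer mergeAccumulators(Iterable<Integer> accumulators) {
    int merged = 0;
    for (Integer accumulator : accumulators) {
      merged += accumulator;
    }
    return merged;
  }

  // The accumulator already holds the final value.
  static Integer extractOutput(Integer accumulator) {
    return accumulator;
  }

  // Simulates two workers, each aggregating one bundle, followed by a merge.
  static Integer simulate(List<Integer> bundleA, List<Integer> bundleB) {
    Integer accA = createAccumulator();
    for (Integer value : bundleA) {
      accA = addInput(accA, value);
    }
    Integer accB = createAccumulator();
    for (Integer value : bundleB) {
      accB = addInput(accB, value);
    }
    return extractOutput(mergeAccumulators(Arrays.asList(accA, accB)));
  }

  public static void main(String[] args) {
    // 1*1 + 2*2 in one bundle, 3*3 in the other: prints 14
    System.out.println(simulate(Arrays.asList(1, 2), Arrays.asList(3)));
  }
}
```

Splitting the input into two bundles is meant to show why the merge step exists: partial accumulators can be built in parallel and combined afterwards, and the result should not depend on how the input was split.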