You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by me...@apache.org on 2018/03/21 05:40:26 UTC

[beam-site] branch asf-site updated (7e6ecc9 -> 789d0e2)

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


    from 7e6ecc9  Prepare repository for deployment.
     add 7bc10c7  Update SQL doc to match new APIs
     add 5e2de4d  This closes #397
     new 789d0e2  Prepare repository for deployment.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/documentation/dsls/sql/index.html | 462 +++++++++++++++++++-----------
 src/documentation/dsls/sql.md             | 337 +++++++++++++++-------
 2 files changed, 514 insertions(+), 285 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
mergebot-role@apache.org.

[beam-site] 01/01: Prepare repository for deployment.

Posted by me...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 789d0e25de0cbfeec1414b201fc5821ed90b94fb
Author: Mergebot <me...@apache.org>
AuthorDate: Tue Mar 20 22:40:24 2018 -0700

    Prepare repository for deployment.
---
 content/documentation/dsls/sql/index.html | 462 +++++++++++++++++++-----------
 1 file changed, 288 insertions(+), 174 deletions(-)

diff --git a/content/documentation/dsls/sql/index.html b/content/documentation/dsls/sql/index.html
index 8a002d5..cb2b147 100644
--- a/content/documentation/dsls/sql/index.html
+++ b/content/documentation/dsls/sql/index.html
@@ -132,7 +132,7 @@
   <li><a href="#overview">1. Overview</a></li>
   <li><a href="#usage">2. Usage of DSL APIs</a>
     <ul>
-      <li><a href="#beamrecord">BeamRecord</a></li>
+      <li><a href="#row">Row</a></li>
       <li><a href="#beamsql">BeamSql</a></li>
     </ul>
   </li>
@@ -152,128 +152,159 @@
       <div class="body__contained body__section-nav">
         <h1 id="beam-sql">Beam SQL</h1>
 
-<ul id="markdown-toc">
-  <li><a href="#overview" id="markdown-toc-overview">1. Overview</a></li>
-  <li><a href="#usage" id="markdown-toc-usage">2. Usage of DSL APIs</a>    <ul>
-      <li><a href="#beamrecord" id="markdown-toc-beamrecord">BeamRecord</a></li>
-      <li><a href="#beamsql" id="markdown-toc-beamsql">BeamSql</a></li>
-    </ul>
-  </li>
-  <li><a href="#functionality" id="markdown-toc-functionality">3. Functionality in Beam SQL</a>    <ul>
-      <li><a href="#features" id="markdown-toc-features">3.1. Supported Features</a></li>
-      <li><a href="#data-types" id="markdown-toc-data-types">3.2. Data Types</a></li>
-      <li><a href="#built-in-functions" id="markdown-toc-built-in-functions">3.3. Built-in SQL functions</a></li>
-    </ul>
-  </li>
-  <li><a href="#internals-of-sql" id="markdown-toc-internals-of-sql">4. Internals of Beam SQL</a></li>
-</ul>
-
 <p>This page describes the implementation of Beam SQL, and how to simplify a Beam pipeline with DSL APIs.</p>
 
 <h2 id="overview">1. Overview</h2>
 
-<p>SQL is a well-adopted standard to process data with concise syntax. With DSL APIs (currently available only in Java), now <code class="highlighter-rouge">PCollection</code>s can be queried with standard SQL statements, like a regular table. The DSL APIs leverage <a href="http://calcite.apache.org/">Apache Calcite</a> to parse and optimize SQL queries, then translate into a composite Beam <code class="highlighter-rouge">PTransform</code>. In this way, both SQL and normal Beam <code cla [...]
+<p>SQL is a well-adopted standard to process data with concise syntax. With DSL APIs (currently available only in Java), now <code class="highlighter-rouge">PCollections</code> can be queried with standard SQL statements, like a regular table. The DSL APIs leverage <a href="http://calcite.apache.org/">Apache Calcite</a> to parse and optimize SQL queries, then translate into a composite Beam <code class="highlighter-rouge">PTransform</code>. In this way, both SQL and normal Beam <code cla [...]
 
 <p>There are two main pieces to the SQL DSL API:</p>
 
 <ul>
-  <li><a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam/sdk/values/BeamRecord.html">BeamRecord</a>: a new data type used to define composite records (i.e., rows) that consist of multiple, named columns of primitive data types. All SQL DSL queries must be made against collections of type <code class="highlighter-rouge">PCollection&lt;BeamRecord&gt;</code>. Note that <code class="highlighter-rouge">BeamRecord</code> itself is not SQL-specific, however, and may also be u [...]
-  <li><a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam/sdk/extensions/sql/BeamSql.html">BeamSql</a>: the interface for creating <code class="highlighter-rouge">PTransforms</code> from SQL queries.</li>
+  <li><a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam/sdk/extensions/sql/BeamSql.html">BeamSql</a>: the interface for creating <code class="highlighter-rouge">PTransforms</code> from SQL queries;</li>
+  <li><a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam/sdk/values/Row.html">Row</a> contains named columns with corresponding data types. Beam SQL queries can be made only against collections of type <code class="highlighter-rouge">PCollection&lt;Row&gt;</code>;</li>
 </ul>
 
 <p>We’ll look at each of these below.</p>
 
 <h2 id="usage">2. Usage of DSL APIs</h2>
 
-<h3 id="beamrecord">BeamRecord</h3>
+<h3 id="row">Row</h3>
 
-<p>Before applying a SQL query to a <code class="highlighter-rouge">PCollection</code>, the data in the collection must be in <code class="highlighter-rouge">BeamRecord</code> format. A <code class="highlighter-rouge">BeamRecord</code> represents a single, immutable row in a Beam SQL <code class="highlighter-rouge">PCollection</code>. The names and types of the fields/columns in the record are defined by its associated <a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam [...]
+<p>Before applying a SQL query to a <code class="highlighter-rouge">PCollection</code>, the data in the collection must be in <code class="highlighter-rouge">Row</code> format. A <code class="highlighter-rouge">Row</code> represents a single, immutable record in a Beam SQL <code class="highlighter-rouge">PCollection</code>. The names and types of the fields/columns in the row are defined by its associated <a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam/sdk/values/Ro [...]
+For SQL queries, you should use the <a href="/documentation/sdks/javadoc/2.3.0/index.html?org/apache/beam/sdk/extensions/sql/RowSqlType.html">RowSqlType.builder()</a> to create <code class="highlighter-rouge">RowTypes</code>, it allows creating schemas with all supported SQL types (see <a href="#data-types">Data Types</a> for more details on supported primitive data types).</p>
 
-<p>A <code class="highlighter-rouge">PCollection&lt;BeamRecord&gt;</code> can be created explicitly or implicitly:</p>
+<p>A <code class="highlighter-rouge">PCollection&lt;Row&gt;</code> can be obtained multiple ways, for example:</p>
 
-<p>Explicitly:</p>
 <ul>
-  <li><strong>From in-memory data</strong> (typically for unit testing). In this case, the record type and coder must be specified explicitly:
-    <div class="highlighter-rouge"><pre class="highlight"><code>// Define the record type (i.e., schema).
-List&lt;String&gt; fieldNames = Arrays.asList("appId", "description", "rowtime");
-List&lt;Integer&gt; fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP);
-BeamRecordSqlType appType = BeamRecordSqlType.create(fieldNames, fieldTypes);
-
-// Create a concrete row with that type.
-BeamRecord row = new BeamRecord(nameType, 1, "Some cool app", new Date());
-
-//create a source PCollection containing only that row.
-PCollection&lt;BeamRecord&gt; testApps = PBegin
-    .in(p)
-    .apply(Create.of(row)
-                 .withCoder(nameType.getRecordCoder()));
+  <li>
+    <p><strong>From in-memory data</strong> (typically for unit testing).</p>
+
+    <p><strong>Note:</strong> you have to explicitly specify the <code class="highlighter-rouge">Row</code> coder. In this example we’re doing it by calling <code class="highlighter-rouge">Create.of(..).withCoder()</code>:</p>
+
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Define the record type (i.e., schema).</span>
+<span class="n">RowType</span> <span class="n">appType</span> <span class="o">=</span> 
+    <span class="n">RowSqlType</span>
+      <span class="o">.</span><span class="na">builder</span><span class="o">()</span>
+      <span class="o">.</span><span class="na">withIntegerField</span><span class="o">(</span><span class="s">"appId"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withVarcharField</span><span class="o">(</span><span class="s">"description"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withTimestampField</span><span class="o">(</span><span class="s">"rowtime"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">build</span><span class="o">();</span>
+
+<span class="c1">// Create a concrete row with that type.</span>
+<span class="n">Row</span> <span class="n">row</span> <span class="o">=</span> 
+    <span class="n">Row</span>
+      <span class="o">.</span><span class="na">withRowType</span><span class="o">(</span><span class="n">appType</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">addValues</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="s">"Some cool app"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Date</span><span class="o">())</span>
+      <span class="o">.</span><span class="na">build</span><span class="o">();</span>
+
+<span class="c1">// Create a source PCollection containing only that row</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">testApps</span> <span class="o">=</span> 
+    <span class="n">PBegin</span>
+      <span class="o">.</span><span class="na">in</span><span class="o">(</span><span class="n">p</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Create</span>
+                <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">row</span><span class="o">)</span>
+                <span class="o">.</span><span class="na">withCoder</span><span class="o">(</span><span class="n">appType</span><span class="o">.</span><span class="na">getRowCoder</span><span class="o">()));</span>
 </code></pre>
     </div>
   </li>
-  <li><strong>From a <code class="highlighter-rouge">PCollection&lt;T&gt;</code></strong> where <code class="highlighter-rouge">T</code> is not already a <code class="highlighter-rouge">BeamRecord</code>, by applying a <code class="highlighter-rouge">PTransform</code> that converts input records to <code class="highlighter-rouge">BeamRecord</code> format:
-    <div class="highlighter-rouge"><pre class="highlight"><code>// An example POJO class.
-class AppPojo {
-  ...
-  public final Integer appId;
-  public final String description;
-  public final Date timestamp;
-}
-
-// Acquire a collection of Pojos somehow.
-PCollection&lt;AppPojo&gt; pojos = ...
-
-// Convert them to BeamRecords with the same schema as defined above via a DoFn.
-PCollection&lt;BeamRecord&gt; apps = pojos.apply(
-    ParDo.of(new DoFn&lt;AppPojo, BeamRecord&gt;() {
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        c.output(new BeamRecord(appType, pojo.appId, pojo.description, pojo.timestamp));
-      }
-    }));
+  <li>
+    <p><strong>From a <code class="highlighter-rouge">PCollection&lt;T&gt;</code> of records of some other type</strong>  (i.e.  <code class="highlighter-rouge">T</code> is not already a <code class="highlighter-rouge">Row</code>), by applying a <code class="highlighter-rouge">ParDo</code> that converts input records to <code class="highlighter-rouge">Row</code> format.</p>
+
+    <p><strong>Note:</strong> you have to manually set the coder of the result by calling <code class="highlighter-rouge">setCoder(appType.getRowCoder())</code>:</p>
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// An example POJO class.</span>
+<span class="kd">class</span> <span class="nc">AppPojo</span> <span class="o">{</span>
+  <span class="n">Integer</span> <span class="n">appId</span><span class="o">;</span>
+  <span class="n">String</span> <span class="n">description</span><span class="o">;</span>
+  <span class="n">Date</span> <span class="n">timestamp</span><span class="o">;</span>
+<span class="o">}</span>
+
+<span class="c1">// Acquire a collection of POJOs somehow.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">AppPojo</span><span class="o">&gt;</span> <span class="n">pojos</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="c1">// Convert them to Rows with the same schema as defined above via a DoFn.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">apps</span> <span class="o">=</span> <span class="n">pojos</span>
+  <span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+      <span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">AppPojo</span><span class="o">,</span> <span class="n">Row</span><span class="o">&gt;()</span> <span class="o">{</span>
+        <span class="nd">@ProcessElement</span>
+        <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span>
+          <span class="c1">// Get the current POJO instance</span>
+          <span class="n">AppPojo</span> <span class="n">pojo</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">();</span>
+
+          <span class="c1">// Create a Row with the appType schema </span>
+          <span class="c1">// and values from the current POJO</span>
+          <span class="n">Row</span> <span class="n">appRow</span> <span class="o">=</span> 
+                <span class="n">Row</span>
+                  <span class="o">.</span><span class="na">withRowType</span><span class="o">(</span><span class="n">appType</span><span class="o">)</span>
+                  <span class="o">.</span><span class="na">addValues</span><span class="o">(</span>
+                    <span class="n">pojo</span><span class="o">.</span><span class="na">appId</span><span class="o">,</span> 
+                    <span class="n">pojo</span><span class="o">.</span><span class="na">description</span><span class="o">,</span> 
+                    <span class="n">pojo</span><span class="o">.</span><span class="na">timestamp</span><span class="o">)</span>
+                  <span class="o">.</span><span class="na">build</span><span class="o">();</span>
+
+          <span class="c1">// Output the Row representing the current POJO</span>
+          <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">appRow</span><span class="o">);</span>
+        <span class="o">}</span>
+      <span class="o">}))</span>
+  <span class="o">.</span><span class="na">setCoder</span><span class="o">(</span><span class="n">appType</span><span class="o">.</span><span class="na">getRowCoder</span><span class="o">());</span>
 </code></pre>
     </div>
   </li>
+  <li>
+    <p><strong>As an output of another <code class="highlighter-rouge">BeamSql</code> query</strong>. Details in the next section.</p>
+  </li>
 </ul>
 
-<p>Implicitly:</p>
-<ul>
-  <li><strong>As the result of a <code class="highlighter-rouge">BeamSql</code> <code class="highlighter-rouge">PTransform</code></strong> applied to a <code class="highlighter-rouge">PCollection&lt;BeamRecord&gt;</code> (details in the next section).</li>
-</ul>
-
-<p>Once you have a <code class="highlighter-rouge">PCollection&lt;BeamRecord&gt;</code> in hand, you may use the <code class="highlighter-rouge">BeamSql</code> APIs to apply SQL queries to it.</p>
+<p>Once you have a <code class="highlighter-rouge">PCollection&lt;Row&gt;</code> in hand, you may use the <code class="highlighter-rouge">BeamSql</code> APIs to apply SQL queries to it.</p>
 
 <h3 id="beamsql">BeamSql</h3>
 
-<p><code class="highlighter-rouge">BeamSql</code> provides two methods for generating a <code class="highlighter-rouge">PTransform</code> from a SQL query, both of which are equivalent except for the number of inputs they support:</p>
+<p><code class="highlighter-rouge">BeamSql.query(queryString)</code> method is the only API to create a <code class="highlighter-rouge">PTransform</code> from a string representation of the SQL query. You can apply this <code class="highlighter-rouge">PTransform</code> to either a single <code class="highlighter-rouge">PCollection</code> or a <code class="highlighter-rouge">PCollectionTuple</code> which holds multiple <code class="highlighter-rouge">PCollections</code>:</p>
 
 <ul>
-  <li><code class="highlighter-rouge">BeamSql.query()</code>, which may be applied to a single <code class="highlighter-rouge">PCollection</code>. The input collection must be referenced via the table name <code class="highlighter-rouge">PCOLLECTION</code> in the query:
-    <div class="highlighter-rouge"><pre class="highlight"><code>PCollection&lt;BeamRecord&gt; filteredNames = testApps.apply(
-    BeamSql.query("SELECT appId, description, rowtime FROM PCOLLECTION WHERE id=1"));
+  <li>when applying to a single <code class="highlighter-rouge">PCollection</code> it can be referenced via the table name <code class="highlighter-rouge">PCOLLECTION</code> in the query:
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">filteredNames</span> <span class="o">=</span> <span class="n">testApps</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+    <span class="n">BeamSql</span><span class="o">.</span><span class="na">query</span><span class="o">(</span>
+      <span class="s">"SELECT appId, description, rowtime "</span>
+        <span class="o">+</span> <span class="s">"FROM PCOLLECTION "</span>
+        <span class="o">+</span> <span class="s">"WHERE id=1"</span><span class="o">));</span>
 </code></pre>
     </div>
   </li>
-  <li><code class="highlighter-rouge">BeamSql.queryMulti()</code>, which may be applied to a <code class="highlighter-rouge">PCollectionTuple</code> containing one or more tagged <code class="highlighter-rouge">PCollection&lt;BeamRecord&gt;</code>s. The tuple tag for each <code class="highlighter-rouge">PCollection</code> in the tuple defines the table name that may used to query it. Note that table names are bound to the specific <code class="highlighter-rouge">PCollectionTuple</code>,  [...]
-    <div class="highlighter-rouge"><pre class="highlight"><code>// Create a reviews PCollection to join to our apps PCollection.
-BeamRecordSqlType reviewType = BeamRecordSqlType.create(
-  Arrays.asList("appId", "reviewerId", "rating", "rowtime"),
-  Arrays.asList(Types.INTEGER, Types.INTEGER, Types.FLOAT, Types.TIMESTAMP));
-PCollection&lt;BeamRecord&gt; reviews = ... [records w/ reviewType schema] ...
-
-// Compute the # of reviews and average rating per app via a JOIN.
-PCollectionTuple namesAndFoods = PCollectionTuple.of(
-    new TupleTag&lt;BeamRecord&gt;("Apps"), apps),
-    new TupleTag&lt;BeamRecord&gt;("Reviews"), reviews));
-PCollection&lt;BeamRecord&gt; output = namesAndFoods.apply(
-    BeamSql.queryMulti("SELECT Names.appId, COUNT(Reviews.rating), AVG(Reviews.rating)
-                        FROM Apps INNER JOIN Reviews ON Apps.appId == Reviews.appId"));
+  <li>
+    <p>when applying to a <code class="highlighter-rouge">PCollectionTuple</code>, the tuple tag for each <code class="highlighter-rouge">PCollection</code> in the tuple defines the table name that may be used to query it. Note that table names are bound to the specific <code class="highlighter-rouge">PCollectionTuple</code>, and thus are only valid in the context of queries applied to it.</p>
+
+    <p>For example, you can join two <code class="highlighter-rouge">PCollections</code>:</p>
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Create the schema for reviews</span>
+<span class="n">RowType</span> <span class="n">reviewType</span> <span class="o">=</span> 
+    <span class="n">RowSqlType</span><span class="o">.</span>
+      <span class="o">.</span><span class="na">withIntegerField</span><span class="o">(</span><span class="s">"appId"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withIntegerField</span><span class="o">(</span><span class="s">"reviewerId"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withFloatField</span><span class="o">(</span><span class="s">"rating"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withTimestampField</span><span class="o">(</span><span class="s">"rowtime"</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">build</span><span class="o">();</span>
+    
+<span class="c1">// Obtain the reviews records with this schema</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">reviewsRows</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="c1">// Create a PCollectionTuple containing both PCollections.</span>
+<span class="c1">// TupleTags IDs will be used as table names in the SQL query</span>
+<span class="n">PCollectionTuple</span> <span class="n">namesAndFoods</span> <span class="o">=</span> <span class="n">PCollectionTuple</span><span class="o">.</span><span class="na">of</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">TupleTag</span><span class="o">&lt;&gt;(</span><span class="s">"Apps"</span><span class="o">),</span> <span class="n">appsRows</span><span class="o">),</span> <span class="c1">// appsRows from the previous example</span>
+    <span class="k">new</span> <span class="n">TupleTag</span><span class="o">&lt;&gt;(</span><span class="s">"Reviews"</span><span class="o">),</span> <span class="n">reviewsRows</span><span class="o">));</span>
+
+<span class="c1">// Compute the total number of reviews </span>
+<span class="c1">// and average rating per app </span>
+<span class="c1">// by joining two PCollections</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">namesAndFoods</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+    <span class="n">BeamSql</span><span class="o">.</span><span class="na">query</span><span class="o">(</span>
+        <span class="s">"SELECT Names.appId, COUNT(Reviews.rating), AVG(Reviews.rating)"</span>
+            <span class="o">+</span> <span class="s">"FROM Apps INNER JOIN Reviews ON Apps.appId == Reviews.appId"</span><span class="o">));</span>
 </code></pre>
     </div>
   </li>
 </ul>
 
-<p>Both methods wrap the back-end details of parsing/validation/assembling, and deliver a Beam SDK style API that can express simple TABLE_FILTER queries up to complex queries containing JOIN/GROUP_BY etc.</p>
-
 <p><a href="https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java">BeamSqlExample</a> in the code repository shows basic usage of both APIs.</p>
 
 <h2 id="functionality">3. Functionality in Beam SQL</h2>
@@ -335,134 +366,217 @@ groupItem:
 
 <h3 id="features">3.1. Supported Features</h3>
 
-<p><strong>1. aggregations;</strong></p>
+<h4 id="features-aggregations">3.1.1 Aggregations</h4>
+
+<p>Major standard aggregation functions are supported:</p>
+<ul>
+  <li><code class="highlighter-rouge">COUNT</code></li>
+  <li><code class="highlighter-rouge">MAX</code></li>
+  <li><code class="highlighter-rouge">MIN</code></li>
+  <li><code class="highlighter-rouge">SUM</code></li>
+  <li><code class="highlighter-rouge">AVG</code></li>
+  <li><code class="highlighter-rouge">VAR_POP</code></li>
+  <li><code class="highlighter-rouge">VAR_SAMP</code></li>
+  <li><code class="highlighter-rouge">COVAR_POP</code></li>
+  <li><code class="highlighter-rouge">COVAR_SAMP</code></li>
+</ul>
 
-<p>Beam SQL supports aggregation functions with group_by in global_window, fixed_window, sliding_window and session_window. A field with type <code class="highlighter-rouge">TIMESTAMP</code> is required to specify fixed_window/sliding_window/session_window. The field is used as event timestamp for rows. See below for several examples:</p>
+<p><strong>Note:</strong> <code class="highlighter-rouge">DISTINCT</code> aggregation is not supported yet.</p>
 
-<div class="highlighter-rouge"><pre class="highlight"><code>//fixed window, one hour in duration
-SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, TUMBLE(f_timestamp, INTERVAL '1' HOUR)
+<h4 id="features-windowing">3.1.2 Windowing</h4>
 
-//sliding window, one hour in duration and 30 minutes period
-SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)
+<p>Beam SQL supports windowing functions specified in <code class="highlighter-rouge">GROUP BY</code> clause. <code class="highlighter-rouge">TIMESTAMP</code> field is required in this case. It is used as event timestamp for rows.</p>
 
-//session window, with 5 minutes gap duration
-SELECT f_int, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int, SESSION(f_timestamp, INTERVAL '5' MINUTE)
+<p>Supported windowing functions:</p>
+<ul>
+  <li><code class="highlighter-rouge">TUMBLE</code>, or fixed windows. Example of how define a fixed window with duration of 1 hour:
+    <div class="highlighter-rouge"><pre class="highlight"><code>  SELECT f_int, COUNT(*) 
+  FROM PCOLLECTION 
+  GROUP BY 
+    f_int,
+    TUMBLE(f_timestamp, INTERVAL '1' HOUR)
 </code></pre>
-</div>
+    </div>
+  </li>
+  <li><code class="highlighter-rouge">HOP</code>, or sliding windows. Example of how to define a sliding windows for every 30 minutes with 1 hour duration:
+    <div class="highlighter-rouge"><pre class="highlight"><code>  SELECT f_int, COUNT(*)
+  FROM PCOLLECTION 
+  GROUP BY 
+    f_int, 
+    HOP(f_timestamp, INTERVAL '30' MINUTE, INTERVAL '1' HOUR)
+</code></pre>
+    </div>
+  </li>
+  <li><code class="highlighter-rouge">SESSION</code>, session windows. Example of how to define a session window with 5 minutes gap duration:
+    <div class="highlighter-rouge"><pre class="highlight"><code>  SELECT f_int, COUNT(*) 
+  FROM PCOLLECTION 
+  GROUP BY 
+    f_int, 
+    SESSION(f_timestamp, INTERVAL '5' MINUTE)
+</code></pre>
+    </div>
+  </li>
+</ul>
 
-<p>Note:</p>
+<p><strong>Note:</strong> when no windowing function is specified in the query, then windowing strategy of the input <code class="highlighter-rouge">PCollections</code> is unchanged by the SQL query. If windowing function is specified in the query, then the windowing function of the <code class="highlighter-rouge">PCollection</code> is updated accordingly, but trigger stays unchanged.</p>
 
-<ol>
-  <li>distinct aggregation is not supported yet.</li>
-  <li>the default trigger is <code class="highlighter-rouge">Repeatedly.forever(AfterWatermark.pastEndOfWindow())</code>;</li>
-  <li>when <code class="highlighter-rouge">time</code> field in <code class="highlighter-rouge">HOP(dateTime, slide, size [, time ])</code>/<code class="highlighter-rouge">TUMBLE(dateTime, interval [, time ])</code>/<code class="highlighter-rouge">SESSION(dateTime, interval [, time ])</code> is specified, a lateFiring trigger is added as</li>
-</ol>
+<h4 id="features-joins">3.1.3 Joins</h4>
 
-<div class="highlighter-rouge"><pre class="highlight"><code>Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
-        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
-</code></pre>
-</div>
+<h4 id="3131-overview">3.1.3.1 Overview</h4>
+
+<p>Supported <code class="highlighter-rouge">JOIN</code> types in Beam SQL:</p>
+<ul>
+  <li><code class="highlighter-rouge">INNER</code>, <code class="highlighter-rouge">LEFT OUTER</code>, <code class="highlighter-rouge">RIGHT OUTER</code>;</li>
+  <li>Only equijoins (where join condition is an equality check) are supported.</li>
+</ul>
 
-<p><strong>2. Join (inner, left_outer, right_outer);</strong></p>
+<p>Unsupported <code class="highlighter-rouge">JOIN</code> types in Beam SQL:</p>
+<ul>
+  <li><code class="highlighter-rouge">CROSS JOIN</code> is not supported (full cartesian product with no <code class="highlighter-rouge">ON</code> clause);</li>
+  <li><code class="highlighter-rouge">FULL OUTER JOIN</code> is not supported (combination of <code class="highlighter-rouge">LEFT OUTER</code> and <code class="highlighter-rouge">RIGHT OUTER</code> joins);</li>
+</ul>
 
 <p>The scenarios of join can be categorized into 3 cases:</p>
 
 <ol>
-  <li>BoundedTable JOIN BoundedTable</li>
-  <li>UnboundedTable JOIN UnboundedTable</li>
-  <li>BoundedTable JOIN UnboundedTable</li>
+  <li>Bounded input <code class="highlighter-rouge">JOIN</code> bounded input;</li>
+  <li>Unbounded input <code class="highlighter-rouge">JOIN</code> unbounded input;</li>
+  <li>Unbounded input <code class="highlighter-rouge">JOIN</code> bounded input;</li>
 </ol>
 
-<p>For case 1 and case 2, a standard join is utilized as long as the windowFn of the both sides match. For case 3, sideInput is utilized to implement the join. So far there are some constraints:</p>
+<p>Each of these scenarios is described below:</p>
+
+<h4 id="join-bounded-bounded">3.1.3.1 Bounded JOIN Bounded</h4>
+
+<p>Standard join implementation is used. All elements from one input are matched with all elements from another input. Due to the fact that both inputs are bounded, no windowing or triggering is involved.</p>
+
+<h4 id="join-unbounded-unbounded">3.1.3.2 Unbounded JOIN Unbounded</h4>
+
+<p>Standard join implementation is used. All elements from one input are matched with all elements from another input.</p>
+
+<p><strong>Windowing and Triggering</strong></p>
+
+<p>Following properties must be satisfied when joining unbounded inputs:</p>
+<ul>
+  <li>inputs must have compatible windows, otherwise <code class="highlighter-rouge">IllegalArgumentException</code> will be thrown;</li>
+  <li>triggers on each input should only fire once per window. Currently this means that the only supported trigger in this case is <code class="highlighter-rouge">DefaultTrigger</code> with zero allowed lateness. Using any other trigger will result in <code class="highlighter-rouge">UnsupportedOperationException</code> thrown;</li>
+</ul>
+
+<p>This means that inputs are joined per-window. That is, when the trigger fires (only once), then join is performed on all elements in the current window in both inputs. This allows to reason about what kind of output is going to be produced.</p>
+
+<p><strong>Note:</strong> similarly to <code class="highlighter-rouge">GroupByKeys</code> <code class="highlighter-rouge">JOIN</code> will update triggers using <code class="highlighter-rouge">Trigger.continuationTrigger()</code>. Other aspects of the inputs’ windowing strategies remain unchanged.</p>
+
+<h4 id="join-unbounded-bounded">3.1.3.3 Unbounded JOIN Bounded</h4>
+
+<p>For this type of <code class="highlighter-rouge">JOIN</code> bounded input is treated as a side-input by the implementation.</p>
+
+<p>This means that</p>
 
 <ul>
-  <li>Only equal-join is supported, CROSS JOIN is not supported;</li>
-  <li>FULL OUTER JOIN is not supported;</li>
-  <li>If it’s a LEFT OUTER JOIN, the unbounded table should on the left side; If it’s a RIGHT OUTER JOIN, the unbounded table should on the right side;</li>
   <li>window/trigger is inherented from upstreams, which should be consistent;</li>
 </ul>
 
-<p><strong>3. User Defined Function (UDF) and User Defined Aggregate Function (UDAF);</strong></p>
+<h4 id="features-udfs-udafs">3.1.4 User Defined Function (UDF) and User Defined Aggregate Function (UDAF)</h4>
 
 <p>If the required function is not available, developers can register their own UDF(for scalar function) and UDAF(for aggregation function).</p>
 
-<p><strong>create and specify User Defined Function (UDF)</strong></p>
+<h5 id="3141-create-and-specify-user-defined-function-udf"><strong>3.1.4.1 Create and specify User Defined Function (UDF)</strong></h5>
 
 <p>A UDF can be 1) any Java method that takes zero or more scalar fields and return one scalar value, or 2) a <code class="highlighter-rouge">SerializableFunction</code>. Below is an example of UDF and how to use it in DSL:</p>
 
-<div class="highlighter-rouge"><pre class="highlight"><code>/**
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="cm">/**
  * A example UDF for test.
- */
-public static class CubicInteger implements BeamSqlUdf{
-  public static Integer eval(Integer input){
-    return input * input * input;
-  }
-}
-
-/**
+ */</span>
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CubicInteger</span> <span class="kd">implements</span> <span class="n">BeamSqlUdf</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="kd">static</span> <span class="n">Integer</span> <span class="nf">eval</span><span class="o">(</span><span class="n">Integer</span> <span class="n">input</span><span class="o">){</span>
+    <span class="k">return</span> <span class="n">input</span> <span class="o">*</span> <span class="n">input</span> <span class="o">*</span> <span class="n">input</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="cm">/**
  * Another example UDF with {@link SerializableFunction}.
- */
-public static class CubicIntegerFn implements SerializableFunction&lt;Integer, Integer&gt; {
-  @Override
-  public Integer apply(Integer input) {
-    return input * input * input;
-  }
-}
-
-// register and call in SQL
-String sql = "SELECT f_int, cubic1(f_int) as cubicvalue1, cubic2(f_int) as cubicvalue2 FROM PCOLLECTION WHERE f_int = 2";
-PCollection&lt;BeamSqlRow&gt; result =
-    input.apply("udfExample",
-        BeamSql.simpleQuery(sql).withUdf("cubic1", CubicInteger.class)
-		                        .withUdf("cubic2", new CubicIntegerFn()));
+ */</span>
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CubicIntegerFn</span> <span class="kd">implements</span> <span class="n">SerializableFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">apply</span><span class="o">(</span><span class="n">Integer</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">input</span> <span class="o">*</span> <span class="n">input</span> <span class="o">*</span> <span class="n">input</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// Define a SQL query which calls the above UDFs</span>
+<span class="n">String</span> <span class="n">sql</span> <span class="o">=</span> 
+    <span class="s">"SELECT f_int, cubic1(f_int), cubic2(f_int)"</span>
+      <span class="o">+</span> <span class="s">"FROM PCOLLECTION "</span>
+      <span class="o">+</span> <span class="s">"WHERE f_int = 2"</span><span class="o">;</span>
+
+<span class="c1">// Create and apply the PTransform representing the query.</span>
+<span class="c1">// Register the UDFs used in the query by calling '.registerUdf()' with </span>
+<span class="c1">// either a class which implements BeamSqlUdf or with </span>
+<span class="c1">// an instance of the SerializableFunction;</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">BeamSqlRow</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span>
+    <span class="n">input</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+        <span class="s">"udfExample"</span><span class="o">,</span>
+        <span class="n">BeamSql</span>
+            <span class="o">.</span><span class="na">query</span><span class="o">(</span><span class="n">sql</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">registerUdf</span><span class="o">(</span><span class="s">"cubic1"</span><span class="o">,</span> <span class="n">CubicInteger</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">registerUdf</span><span class="o">(</span><span class="s">"cubic2"</span><span class="o">,</span> <span class="k">new</span> <span class="n">CubicIntegerFn</span><span class="o">())</span>
 </code></pre>
 </div>
 
-<p><strong>create and specify User Defined Aggregate Function (UDAF)</strong></p>
+<h5 id="3142-create-and-specify-user-defined-aggregate-function-udaf"><strong>3.1.4.2 Create and specify User Defined Aggregate Function (UDAF)</strong></h5>
 
-<p>Beam SQL can accept a <code class="highlighter-rouge">CombineFn</code> as UDAF. Here’s an example of UDAF:</p>
+<p>Beam SQL can accept a <code class="highlighter-rouge">CombineFn</code> as UDAF. Registration is similar to the UDF example above:</p>
 
-<div class="highlighter-rouge"><pre class="highlight"><code>/**
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="cm">/**
  * UDAF(CombineFn) for test, which returns the sum of square.
- */
-public static class SquareSum extends CombineFn&lt;Integer, Integer, Integer&gt; {
-  @Override
-  public Integer createAccumulator() {
-    return 0;
-  }
-
-  @Override
-  public Integer addInput(Integer accumulator, Integer input) {
-    return accumulator + input * input;
-  }
-
-  @Override
-  public Integer mergeAccumulators(Iterable&lt;Integer&gt; accumulators) {
-    int v = 0;
-    Iterator&lt;Integer&gt; ite = accumulators.iterator();
-    while (ite.hasNext()) {
-      v += ite.next();
-    }
-    return v;
-  }
-
-  @Override
-  public Integer extractOutput(Integer accumulator) {
-    return accumulator;
-  }
-
-}
-
-//register and call in SQL
-String sql = "SELECT f_int1, squaresum(f_int2) AS `squaresum` FROM PCOLLECTION GROUP BY f_int2";
-PCollection&lt;BeamSqlRow&gt; result =
-    input.apply("udafExample",
-        BeamSql.simpleQuery(sql).withUdaf("squaresum", new SquareSum()));
+ */</span>
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">SquareSum</span> <span class="kd">extends</span> <span class="n">CombineFn</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">createAccumulator</span><span class="o">()</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="mi">0</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">addInput</span><span class="o">(</span><span class="n">Integer</span> <span class="n">accumulator</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">accumulator</span> <span class="o">+</span> <span class="n">input</span> <span class="o">*</span> <span class="n">input</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">mergeAccumulators</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">accumulators</span><span class="o">)</span> <span class="o">{</span>
+    <span class="kt">int</span> <span class="n">v</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">ite</span> <span class="o">=</span> <span class="n">accumulators</span><span class="o">.</span><span class="na">iterator</span><span class="o">();</span>
+    <span class="k">while</span> <span class="o">(</span><span class="n">ite</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+      <span class="n">v</span> <span class="o">+=</span> <span class="n">ite</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
+    <span class="o">}</span>
+    <span class="k">return</span> <span class="n">v</span><span class="o">;</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">extractOutput</span><span class="o">(</span><span class="n">Integer</span> <span class="n">accumulator</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">accumulator</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// Define a SQL query which calls the above UDAF</span>
+<span class="n">String</span> <span class="n">sql</span> <span class="o">=</span> 
+    <span class="s">"SELECT f_int1, squaresum(f_int2) "</span>
+      <span class="o">+</span> <span class="s">"FROM PCOLLECTION "</span>
+      <span class="o">+</span> <span class="s">"GROUP BY f_int2"</span><span class="o">;</span>
+      
+<span class="c1">// Create and apply the PTransform representing the query.</span>
+<span class="c1">// Register the UDAFs used in the query by calling '.registerUdaf()' by </span>
+<span class="c1">// providing it an instance of the CombineFn</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">BeamSqlRow</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span>
+    <span class="n">input</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
+        <span class="s">"udafExample"</span><span class="o">,</span>
+        <span class="n">BeamSql</span>
+            <span class="o">.</span><span class="na">query</span><span class="o">(</span><span class="n">sql</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">registerUdaf</span><span class="o">(</span><span class="s">"squaresum"</span><span class="o">,</span> <span class="k">new</span> <span class="n">SquareSum</span><span class="o">()));</span>
 </code></pre>
 </div>
 
 <h3 id="data-types">3.2. Data Types</h3>
-<p>Each type in Beam SQL maps to a Java class to holds the value in <code class="highlighter-rouge">BeamRecord</code>. The following table lists the relation between SQL types and Java classes, which are supported in current repository:</p>
+<p>Each type in Beam SQL maps to a Java class to holds the value in <code class="highlighter-rouge">Row</code>. The following table lists the relation between SQL types and Java classes, which are supported in current repository:</p>
 
 <table class="table">
   <thead>

-- 
To stop receiving notification emails like this one, please contact
mergebot-role@apache.org.