You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/02/21 19:02:30 UTC

[1/2] calcite git commit: Add Orinoco schema (streaming retail data), accessible from Quidem scripts

Repository: calcite
Updated Branches:
  refs/heads/master f4b4ee115 -> 884f01066


Add Orinoco schema (streaming retail data), accessible from Quidem scripts


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/cbbaddf5
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/cbbaddf5
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/cbbaddf5

Branch: refs/heads/master
Commit: cbbaddf5449d7547e7550d08caaadf36537cd7fd
Parents: f4b4ee1
Author: Julian Hyde <jh...@apache.org>
Authored: Fri Feb 19 13:38:00 2016 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sun Feb 21 01:42:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/calcite/test/CalciteAssert.java  |  9 +++-
 .../java/org/apache/calcite/test/JdbcTest.java  |  6 +++
 .../org/apache/calcite/test/StreamTest.java     | 56 ++++++++++++++++++--
 core/src/test/resources/sql/agg.iq              | 31 +++++++++++
 4 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/cbbaddf5/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
index 03db1b9..e15664f 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
@@ -678,6 +678,12 @@ public class CalciteAssert {
     case LINGUAL:
       return rootSchema.add("SALES",
           new ReflectiveSchema(new JdbcTest.LingualSchema()));
+    case ORINOCO:
+      final SchemaPlus orinoco = rootSchema.add("ORINOCO", new AbstractSchema());
+      orinoco.add("ORDERS",
+          new StreamTest.OrdersHistoryTable(
+              StreamTest.OrdersStreamTableFactory.getRowList()));
+      return orinoco;
     case POST:
       final SchemaPlus post = rootSchema.add("POST", new AbstractSchema());
       post.add("EMP",
@@ -1581,7 +1587,8 @@ public class CalciteAssert {
     JDBC_SCOTT,
     SCOTT,
     LINGUAL,
-    POST
+    POST,
+    ORINOCO
   }
 
   /** Converts a {@link ResultSet} to string. */

http://git-wip-us.apache.org/repos/asf/calcite/blob/cbbaddf5/core/src/test/java/org/apache/calcite/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcTest.java b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
index fc4ca15..d7b635c 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
@@ -4846,6 +4846,12 @@ public class JdbcTest {
                           new ReflectiveSchemaTest.CatchallSchema()))
                   .connect();
             }
+            if (name.equals("orinoco")) {
+              return CalciteAssert.that()
+                  .with(CalciteAssert.SchemaSpec.ORINOCO)
+                  .withDefaultSchema("ORINOCO")
+                  .connect();
+            }
             if (name.equals("seq")) {
               final Connection connection = CalciteAssert.that()
                   .withSchema("s", new AbstractSchema())

http://git-wip-us.apache.org/repos/asf/calcite/blob/cbbaddf5/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 649db3d..99403bd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -261,6 +261,33 @@ public class StreamTest {
                 "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
   }
 
+  @Ignore
+  @Test public void testTumbleViaOver() {
+    String sql = "WITH HourlyOrderTotals (rowtime, productId, c, su) AS (\n"
+        + "  SELECT FLOOR(rowtime TO HOUR),\n"
+        + "    productId,\n"
+        + "    COUNT(*),\n"
+        + "    SUM(units)\n"
+        + "  FROM Orders\n"
+        + "  GROUP BY FLOOR(rowtime TO HOUR), productId)\n"
+        + "SELECT STREAM rowtime,\n"
+        + "  productId,\n"
+        + "  SUM(su) OVER w AS su,\n"
+        + "  SUM(c) OVER w AS c\n"
+        + "FROM HourlyTotals\n"
+        + "WINDOW w AS (\n"
+        + "  ORDER BY rowtime\n"
+        + "  PARTITION BY productId\n"
+        + "  RANGE INTERVAL '2' HOUR PRECEDING)\n";
+    String sql2 = ""
+        + "SELECT STREAM rowtime, productId, SUM(units) AS su, COUNT(*) AS c\n"
+        + "FROM Orders\n"
+        + "GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)";
+    // sql and sql2 should give same result
+    CalciteAssert.model(STREAM_JOINS_MODEL)
+        .query(sql);
+  }
+
   private Function<ResultSet, Void> startsWith(String... rows) {
     final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
     return new Function<ResultSet, Void>() {
@@ -289,7 +316,7 @@ public class StreamTest {
    * Base table for the Orders table. Manages the base schema used for the test tables and common
    * functions.
    */
-  private abstract static class BaseOrderStreamTable implements ScannableTable, StreamableTable {
+  private abstract static class BaseOrderStreamTable implements ScannableTable {
     protected final RelProtoDataType protoRowType = new RelProtoDataType() {
       public RelDataType apply(RelDataTypeFactory a0) {
         return a0.builder()
@@ -325,6 +352,10 @@ public class StreamTest {
 
     public Table create(SchemaPlus schema, String name,
         Map<String, Object> operand, RelDataType rowType) {
+      return new OrdersTable(getRowList());
+    }
+
+    public static ImmutableList<Object[]> getRowList() {
       final Object[][] rows = {
         {ts(10, 15, 0), 1, "paint", 10},
         {ts(10, 24, 15), 2, "paper", 5},
@@ -332,16 +363,17 @@ public class StreamTest {
         {ts(10, 58, 0), 4, "paint", 3},
         {ts(11, 10, 0), 5, "paint", 3}
       };
-      return new OrdersTable(ImmutableList.copyOf(rows));
+      return ImmutableList.copyOf(rows);
     }
 
-    private Object ts(int h, int m, int s) {
+    private static Object ts(int h, int m, int s) {
       return DateTimeUtils.unixTimestamp(2015, 2, 15, h, m, s);
     }
   }
 
   /** Table representing the ORDERS stream. */
-  public static class OrdersTable extends BaseOrderStreamTable {
+  public static class OrdersTable extends BaseOrderStreamTable
+      implements StreamableTable {
     private final ImmutableList<Object[]> rows;
 
     public OrdersTable(ImmutableList<Object[]> rows) {
@@ -386,7 +418,8 @@ public class StreamTest {
   /**
    * Table representing an infinitely larger ORDERS stream.
    */
-  public static class InfiniteOrdersTable extends BaseOrderStreamTable {
+  public static class InfiniteOrdersTable extends BaseOrderStreamTable
+      implements StreamableTable {
     public Enumerable<Object[]> scan(DataContext root) {
       return Linq4j.asEnumerable(new Iterable<Object[]>() {
         @Override public Iterator<Object[]> iterator() {
@@ -412,6 +445,19 @@ public class StreamTest {
     }
   }
 
+  /** Table representing the history of the ORDERS stream. */
+  public static class OrdersHistoryTable extends BaseOrderStreamTable {
+    private final ImmutableList<Object[]> rows;
+
+    public OrdersHistoryTable(ImmutableList<Object[]> rows) {
+      this.rows = rows;
+    }
+
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(rows);
+    }
+  }
+
   /**
    * Mocks a simple relation to use for stream joining test.
    */

http://git-wip-us.apache.org/repos/asf/calcite/blob/cbbaddf5/core/src/test/resources/sql/agg.iq
----------------------------------------------------------------------
diff --git a/core/src/test/resources/sql/agg.iq b/core/src/test/resources/sql/agg.iq
index 3b1f58e..9390b02 100644
--- a/core/src/test/resources/sql/agg.iq
+++ b/core/src/test/resources/sql/agg.iq
@@ -1493,6 +1493,37 @@ EnumerableCalc(expr#0..2=[{inputs}], JOB=[$t0], SUM_SAL=[$t2], DEPTNO=[$t1])
 !plan
 !}
 
+!use orinoco
+
+# FLOOR to achieve a 1-hour window
+select floor(rowtime to hour) as rowtime, count(*) as c
+from Orders
+group by floor(rowtime to hour);
++---------------------+---+
+| ROWTIME             | C |
++---------------------+---+
+| 2015-02-15 10:00:00 | 4 |
+| 2015-02-15 11:00:00 | 1 |
++---------------------+---+
+(2 rows)
+
+!ok
+
+# FLOOR applied to intervals, to achieve a 2-hour window
+select rowtime, count(*) as c
+from (
+  select timestamp '1970-1-1 0:0:0' + (floor(timestamp '1970-1-1 0:0:0' + ((rowtime - timestamp '1970-1-1 0:0:0') second) / 2 to hour) - timestamp '1970-1-1 0:0:0') second * 2 as rowtime
+  from Orders)
+group by rowtime;
++---------------------+---+
+| ROWTIME             | C |
++---------------------+---+
+| 2015-02-15 10:00:00 | 5 |
++---------------------+---+
+(1 row)
+
+!ok
+
 # [CALCITE-729] IndexOutOfBoundsException in ROLLUP query on JDBC data source
 !use jdbc_scott
 select deptno, job, count(*) as c


[2/2] calcite git commit: [CALCITE-1090] Revise Streaming SQL specification

Posted by jh...@apache.org.
[CALCITE-1090] Revise Streaming SQL specification


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/884f0106
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/884f0106
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/884f0106

Branch: refs/heads/master
Commit: 884f01066fcc31c0fc10b80e3d891101e4f315a3
Parents: cbbaddf
Author: Julian Hyde <jh...@apache.org>
Authored: Fri Feb 19 13:38:55 2016 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sun Feb 21 01:47:40 2016 -0800

----------------------------------------------------------------------
 site/README.md            |   2 +-
 site/_docs/stream.md      | 586 +++++++++++++++++++++++++++++++++--------
 site/img/pie-chart.png    | Bin 0 -> 13257 bytes
 site/img/window-types.png | Bin 0 -> 19177 bytes
 4 files changed, 478 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/884f0106/site/README.md
----------------------------------------------------------------------
diff --git a/site/README.md b/site/README.md
index 8f78140..ea3f212 100644
--- a/site/README.md
+++ b/site/README.md
@@ -27,7 +27,7 @@ This directory contains the code for the Apache Calcite web site,
 1. `cd site`
 2. `svn co https://svn.apache.org/repos/asf/calcite/site target`
 3. `sudo apt-get install rubygems ruby2.1-dev zlib1g-dev` (linux)
-4. `sudo gem install bundler github-pages jekyll`
+4. `sudo gem install bundler github-pages jekyll jekyll-oembed`
 5. `bundle install`
 
 ## Add javadoc

http://git-wip-us.apache.org/repos/asf/calcite/blob/884f0106/site/_docs/stream.md
----------------------------------------------------------------------
diff --git a/site/_docs/stream.md b/site/_docs/stream.md
index c43d89f..f66fbce 100644
--- a/site/_docs/stream.md
+++ b/site/_docs/stream.md
@@ -25,6 +25,11 @@ limitations under the License.
 Calcite has extended SQL and relational algebra in order to support
 streaming queries.
 
+* TOC
+{:toc}
+
+## Introduction
+
 Streams are collections of records that flow continuously, and forever.
 Unlike tables, they are not typically stored on disk, but flow over the
 network and are held for short periods of time in memory.
@@ -110,10 +115,12 @@ a streaming query on a table, or a relational query on a stream, Calcite gives
 an error:
 
 {% highlight sql %}
-> SELECT * FROM Shipments;
+SELECT * FROM Shipments;
+
 ERROR: Cannot convert stream 'SHIPMENTS' to a table
 
-> SELECT STREAM * FROM Products;
+SELECT STREAM * FROM Products;
+
 ERROR: Cannot convert table 'PRODUCTS' to a stream
 {% endhighlight %}
 
@@ -173,34 +180,46 @@ differences are:
 * What defines the "window", the set of rows that contribute to a given output row?
 * Is the result a stream or a relation?
 
+There are various window types:
+
+* tumbling window (GROUP BY)
+* hopping window (multi GROUP BY)
+* sliding window (window functions)
+* cascading window (window functions)
+
+and the following diagram shows the kinds of query in which to use them:
+
+![Window types]({{ site.baseurl }}/img/window-types.png)
+
 First we'll look at a *tumbling window*, which is defined by a streaming
 `GROUP BY`. Here is an example:
 
 {% highlight sql %}
-SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
+SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
   productId,
   COUNT(*) AS c,
   SUM(units) AS units
 FROM Orders
-GROUP BY FLOOR(rowtime TO HOUR), productId;
+GROUP BY CEIL(rowtime TO HOUR), productId;
 
   rowtime | productId |       c | units
 ----------+-----------+---------+-------
- 10:00:00 |        30 |       2 |    24
- 10:00:00 |        10 |       1 |     1
- 10:00:00 |        20 |       1 |     7
- 11:00:00 |        10 |       3 |    11
- 11:00:00 |        40 |       1 |    12
+ 11:00:00 |        30 |       2 |    24
+ 11:00:00 |        10 |       1 |     1
+ 11:00:00 |        20 |       1 |     7
+ 12:00:00 |        10 |       3 |    11
+ 12:00:00 |        40 |       1 |    12
 {% endhighlight %}
 
 The result is a stream. At 11 o'clock, Calcite emits a sub-total for every
-`productId` that had an order since 10 o'clock. At 12 o'clock, it will emit
+`productId` that had an order since 10 o'clock, timestamped 11 o'clock.
+At 12 o'clock, it will emit
 the orders that occurred between 11:00 and 12:00. Each input row contributes to
 only one output row.
 
 How did Calcite know that the 10:00:00 sub-totals were complete at 11:00:00,
 so that it could emit them? It knows that `rowtime` is increasing, and it knows
-that `FLOOR(rowtime TO HOUR)` is also increasing. So, once it has seen a row
+that `CEIL(rowtime TO HOUR)` is also increasing. So, once it has seen a row
 at or after 11:00:00, it will never see a row that will contribute to a 10:00:00
 total.
 
@@ -217,11 +236,12 @@ Calcite is
 not able to make progress, and it will not allow the query:
 
 {% highlight sql %}
-> SELECT STREAM productId,
->   COUNT(*) AS c,
->   SUM(units) AS units
-> FROM Orders
-> GROUP BY productId;
+SELECT STREAM productId,
+  COUNT(*) AS c,
+  SUM(units) AS units
+FROM Orders
+GROUP BY productId;
+
 ERROR: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
 {% endhighlight %}
 
@@ -229,7 +249,150 @@ Monotonic and quasi-monotonic columns need to be declared in the schema.
 The monotonicity is
 enforced when records enter the stream and assumed by queries that read from
 that stream. We recommend that you give each stream a timestamp column called
-`rowtime`, but you can declare others, `orderId`, for example.
+`rowtime`, but you can declare others to be monotonic, `orderId`, for example.
+
+We discuss punctuation, watermarks, and other ways of making progress
+<a href="#punctuation">below</a>.
+
+# Tumbling windows, improved
+
+The previous example of tumbling windows was easy to write because the window
+was one hour. For intervals that are not a whole time unit, say 2 hours or
+2 hours and 17 minutes, you cannot use `CEIL`, and the expression gets more
+complicated.
+
+Calcite supports an alternative syntax for tumbling windows:
+
+{% highlight sql %}
+SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
+  productId,
+  COUNT(*) AS c,
+  SUM(units) AS units
+FROM Orders
+GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;
+
+  rowtime | productId |       c | units
+----------+-----------+---------+-------
+ 11:00:00 |        30 |       2 |    24
+ 11:00:00 |        10 |       1 |     1
+ 11:00:00 |        20 |       1 |     7
+ 12:00:00 |        10 |       3 |    11
+ 12:00:00 |        40 |       1 |    12
+{% endhighlight %}
+
+As you can see, it returns the same results as the previous query. The `TUMBLE`
+function returns a grouping key that is the same for all the rows that will end
+up in a given summary row; the `TUMBLE_END` function takes the same arguments
+and returns the time at which that window ends;
+there is also a `TUMBLE_START` function.
+
+`TUMBLE` has an optional parameter to align the window.
+In the following example,
+we use a 30 minute interval and 0:12 as the alignment time,
+so the query emits summaries at 12 and 42 minutes past each hour:
+
+{% highlight sql %}
+SELECT STREAM
+  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
+  productId,
+  COUNT(*) AS c,
+  SUM(units) AS units
+FROM Orders
+GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
+  productId;
+
+  rowtime | productId |       c | units
+----------+-----------+---------+-------
+ 10:42:00 |        30 |       2 |    24
+ 10:42:00 |        10 |       1 |     1
+ 10:42:00 |        20 |       1 |     7
+ 11:12:00 |        10 |       2 |     7
+ 11:12:00 |        40 |       1 |    12
+ 11:42:00 |        10 |       1 |     4
+{% endhighlight %}
+
+# Hopping windows
+
+Hopping windows are a generalization of tumbling windows that allow data to
+be kept in a window for longer than the emit interval.
+
+For example, the following query emits a row timestamped 11:00 containing data
+from 08:00 to 11:00 (or 10:59.9 if we're being pedantic),
+and a row timestamped 12:00 containing data from 09:00
+to 12:00.
+
+{% highlight sql %}
+SELECT STREAM
+  HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) AS rowtime,
+  COUNT(*) AS c,
+  SUM(units) AS units
+FROM Orders
+GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR);
+
+  rowtime |        c | units
+----------+----------+-------
+ 11:00:00 |        4 |    27
+ 12:00:00 |        8 |    50
+{% endhighlight %}
+
+In this query, because the retain period is 3 times the emit period, every input
+row contributes to exactly 3 output rows. Imagine that the `HOP` function
+generates a collection of group keys for each incoming row, and places its values
+in the accumulators of each of those group keys. For example,
+`HOP(10:18:00, INTERVAL '1' HOUR, INTERVAL '3' HOUR)` generates 3 periods:
+
+```
+[08:00, 09:00)
+[09:00, 10:00)
+[10:00, 11:00)
+```
+
+This raises the possibility of allowing user-defined partitioning functions
+for users who are not happy with the built-in functions `HOP` and `TUMBLE`.
+
+We can build complex expressions such as an exponentially decaying
+moving average:
+
+{% highlight sql %}
+SELECT STREAM HOP_END(rowtime),
+  productId,
+  SUM(unitPrice * EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
+   / SUM(EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
+FROM Orders
+GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' HOUR),
+  productId
+{% endhighlight %}
+
+Emits:
+
+* a row at `11:00:00` containing rows in `[10:00:00, 11:00:00)`;
+* a row at `11:00:01` containing rows in `[10:00:01, 11:00:01)`.
+
+The expression weighs recent orders more heavily than older orders.
+Extending the window from 1 hour to 2 hours or 1 year would have
+virtually no effect on the accuracy of the result (but use more memory
+and compute).
+
+Note that we use `HOP_START` inside an aggregate function (`SUM`) because it
+is a value that is constant for all rows within a sub-total. This
+would not be allowed for typical aggregate functions (`SUM`, `COUNT`
+etc.).
+
+If you are familiar with `GROUPING SETS`, you may notice that partitioning
+functions can be seen as a generalization of `GROUPING SETS`, in that they
+allow an input row to contribute to multiple sub-totals.
+The auxiliary functions for `GROUPING SETS`,
+such as `GROUPING()` and `GROUP_ID`,
+can be used inside aggregate functions, so it is not surprising that
+`HOP_START` and `HOP_END` can be used in the same way.
+
+# GROUPING SETS
+
+`GROUPING SETS` is valid for a streaming query provided that every
+grouping set contains a monotonic or quasi-monotonic expression.
+
+`CUBE` and `ROLLUP` are not valid for a streaming query, because they will
+produce at least one grouping set that aggregates everything (like
+`GROUP BY ()`).
 
 # Filtering after aggregation
 
@@ -237,10 +400,10 @@ As in standard SQL, you can apply a `HAVING` clause to filter rows emitted by
 a streaming `GROUP BY`:
 
 {% highlight sql %}
-SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
+SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
   productId
 FROM Orders
-GROUP BY FLOOR(rowtime TO HOUR), productId
+GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId
 HAVING COUNT(*) > 2 OR SUM(units) > 10;
 
   rowtime | productId
@@ -258,12 +421,12 @@ sub-query:
 {% highlight sql %}
 SELECT STREAM rowtime, productId
 FROM (
-  SELECT FLOOR(rowtime TO HOUR) AS rowtime,
+  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
     productId,
     COUNT(*) AS c,
     SUM(units) AS su
   FROM Orders
-  GROUP BY FLOOR(rowtime TO HOUR), productId)
+  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
 WHERE c > 2 OR su > 10;
 
   rowtime | productId
@@ -286,12 +449,12 @@ possible:
 
 {% highlight sql %}
 CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
-  SELECT FLOOR(rowtime TO HOUR),
+  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
     productId,
     COUNT(*),
     SUM(units)
   FROM Orders
-  GROUP BY FLOOR(rowtime TO HOUR), productId;
+  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;
 
 SELECT STREAM rowtime, productId
 FROM HourlyOrderTotals
@@ -319,12 +482,12 @@ a view:
 
 {% highlight sql %}
 WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
-  SELECT FLOOR(rowtime TO HOUR),
+  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
     productId,
     COUNT(*),
     SUM(units)
   FROM Orders
-  GROUP BY FLOOR(rowtime TO HOUR), productId)
+  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
 SELECT STREAM rowtime, productId
 FROM HourlyOrderTotals
 WHERE c > 2 OR su > 10;
@@ -336,7 +499,7 @@ WHERE c > 2 OR su > 10;
  11:00:00 |        40
 {% endhighlight %}
 
-## Converting between streams and relations
+# Converting between streams and relations
 
 Look back at the definition of the `HourlyOrderTotals` view.
 Is the view a stream or a relation?
@@ -376,63 +539,32 @@ data in an Apache Kafka [<a href="#ref2">2</a>] topic)
 but not all. At run time, Calcite figures out whether there is sufficient
 history to run the query, and if not, gives an error.
 
-## Hopping windows
+# The "pie chart" problem: Relational queries on streams
+
+One particular case where you need to convert a stream to a relation
+occurs in what I call the "pie chart problem". Imagine that you need to
+write a web page with a chart, like the following, that summarizes the
+number of orders for each product over the last hour.
 
-Previously we saw how to define a tumbling window using a `GROUP BY` clause.
-Each record contributed to a single sub-total record, the one containing its
-hour and product id.
+![Pie chart]({{ site.baseurl }}/img/pie-chart.png)
 
-But suppose we want to emit, every hour, the number of each product ordered over
-the past three hours. To do this, we use `SELECT ... OVER` and a sliding window
-to combine multiple tumbling windows.
+But the `Orders` stream only contains a few records, not an hour's summary.
+We need to run a relational query on the history of the stream:
 
 {% highlight sql %}
-SELECT STREAM rowtime,
-  productId,
-  SUM(su) OVER w AS su,
-  SUM(c) OVER w AS c
-FROM HourlyTotals
-WINDOW w AS (
-  ORDER BY rowtime
-  PARTITION BY productId
-  RANGE INTERVAL '2' HOUR PRECEDING)
+SELECT productId, count(*)
+FROM Orders
+WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOUR
+              AND current_timestamp;
 {% endhighlight %}
 
-This query uses the `HourlyOrderTotals` view defined previously.
-The 2 hour interval combines the totals timestamped 09:00:00, 10:00:00 and
-11:00:00 for a particular product into a single total timestamped 11:00:00 and
-summarizing orders for that product between 09:00:00 and 12:00:00.
-
-## Limitations of tumbling and hopping windows
-
-In the present syntax, we acknowledge that it is not easy to create certain
-kinds of windows.
-
-First, let's consider tumbling windows over complex periods.
-
-The `FLOOR` and `CEIL` functions make is easy to create a tumbling window that
-emits on a whole time unit (say every hour, or every minute) but less easy to
-emit, say, every 15 minutes. One could imagine an extension to the `FLOOR`
-function that emits unique values on just about any periodic basis (say in 11
-minute intervals starting from midnight of the current day).
+If the history of the `Orders` stream is being spooled to the `Orders` table,
+we can answer the query, albeit at a high cost. It is better if we can tell the
+system to materialize a one-hour summary into a table,
+maintain it continuously as the stream flows,
+and automatically rewrite queries to use the table.
 
-Next, let's consider hopping windows whose retention period is not a multiple
-of its emission period. Say we want to output, at the top of each hour, the
-orders for the previous 7,007 seconds. If we were to simulate this hopping
-window using a sliding window over a tumbling window, as before, we would have
-to sum lots of 1-second windows (because 3,600 and 7,007 are co-prime).
-This is a lot of effort for both the system and the person writing the query.
-
-Calcite could perhaps solve this generalizing `GROUP BY` syntax, but we would
-be destroying the principle that an input row into a `GROUP BY` appears in
-precisely one output row.
-
-Calcite's SQL extensions for streaming queries are evolving. As we learn more
-about how people wish to query streams, we plan to make the language more
-expressive while remaining compatible with standard SQL and consistent with
-its principles, look and feel.
-
-## Sorting
+# Sorting
 
 The story for `ORDER BY` is similar to `GROUP BY`.
 The syntax looks like regular SQL, but Calcite must be sure that it can deliver
@@ -440,9 +572,9 @@ timely results. It therefore requires a monotonic expression on the leading edge
 of your `ORDER BY` key.
 
 {% highlight sql %}
-SELECT STREAM FLOOR(rowtime TO hour) AS rowtime, productId, orderId, units
+SELECT STREAM CEIL(rowtime TO hour) AS rowtime, productId, orderId, units
 FROM Orders
-ORDER BY FLOOR(rowtime TO hour) ASC, units DESC;
+ORDER BY CEIL(rowtime TO hour) ASC, units DESC;
 
   rowtime | productId | orderId | units
 ----------+-----------+---------+-------
@@ -512,7 +644,7 @@ You only need to add an `ORDER BY` to the outermost query. If you need to,
 say, perform `GROUP BY` after a `UNION ALL`, Calcite will add an `ORDER BY`
 implicitly, in order to make the GROUP BY algorithm possible.
 
-## Table constructor
+# Table constructor
 
 The `VALUES` clause creates an inline table with a given set of rows.
 
@@ -525,7 +657,7 @@ would never return any rows.
 ERROR: Cannot stream VALUES
 {% endhighlight %}
 
-## Sliding windows
+# Sliding windows
 
 Standard SQL features so-called "analytic functions" that can be used in the
 `SELECT` clause. Unlike `GROUP BY`, these do not collapse records. For each
@@ -580,7 +712,7 @@ Some other features of the windowed aggregation syntax:
   (The stream will wait until they have arrived).
 * You can compute order-dependent functions such as `RANK` and median.
 
-## Cascading windows
+# Cascading windows
 
 What if we want a query that returns a result for every record, like a
 sliding window, but resets totals on a fixed time period, like a
@@ -597,9 +729,10 @@ FROM Orders;
 
 It looks similar to a sliding window query, but the monotonic
 expression occurs within the `PARTITION BY` clause of the window. As
-the rowtime moves from from 10:59:59 to 11:00:00, `FLOOR(rowtime TO
-HOUR)` changes from 10:00:00 to 11:00:00, and therefore a new
-partition starts. The first row to arrive in the new hour will start a
+the rowtime moves from 10:59:59 to 11:00:00,
+`FLOOR(rowtime TO HOUR)` changes from 10:00:00 to 11:00:00,
+and therefore a new partition starts.
+The first row to arrive in the new hour will start a
 new total; the second row will have a total that consists of two rows,
 and so on.
 
@@ -609,47 +742,282 @@ removes all sub-totals for that partition from its internal storage.
 Analytic functions that use cascading and sliding windows can be
 combined in the same query.
 
-## State of the stream
+# Joining streams to tables
+
+There are two kinds of join where streams are concerned: stream-to-table
+join and stream-to-stream join.
+
+A stream-to-table join is straightforward if the contents of the table
+are not changing. This query enriches a stream of orders with
+each product's list price:
+
+{% highlight sql %}
+SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
+  p.name, p.unitPrice
+FROM Orders AS o
+JOIN Products AS p
+  ON o.productId = p.productId;
+
+  rowtime | productId | orderId | units | name   | unitPrice
+----------+-----------+---------+-------+--------+-----------
+ 10:17:00 |        30 |       5 |     4 | Cheese |        17
+ 10:17:05 |        10 |       6 |     1 | Beer   |      0.25
+ 10:18:05 |        20 |       7 |     2 | Wine   |         6
+ 10:18:07 |        30 |       8 |    20 | Cheese |        17
+ 11:02:00 |        10 |       9 |     6 | Beer   |      0.25
+ 11:04:00 |        10 |      10 |     1 | Beer   |      0.25
+ 11:09:30 |        40 |      11 |    12 | Bread  |       100
+ 11:24:11 |        10 |      12 |     4 | Beer   |      0.25
+{% endhighlight %}
+
+What should happen if the table is changing? For example,
+suppose the unit price of product 10 is increased to 0.35 at 11:00.
+Orders placed before 11:00 should have the old price, and orders
+placed after 11:00 should reflect the new price.
+
+One way to implement this is to have a table that keeps every version
+with a start and end effective date, `ProductVersions` in the following
+example:
+
+{% highlight sql %}
+SELECT STREAM *
+FROM Orders AS o
+JOIN ProductVersions AS p
+  ON o.productId = p.productId
+  AND o.rowtime BETWEEN p.startDate AND p.endDate
+
+  rowtime | productId | orderId | units | productId1 |   name | unitPrice
+----------+-----------+---------+-------+------------+--------+-----------
+ 10:17:00 |        30 |       5 |     4 |         30 | Cheese |        17
+ 10:17:05 |        10 |       6 |     1 |         10 | Beer   |      0.25
+ 10:18:05 |        20 |       7 |     2 |         20 | Wine   |         6
+ 10:18:07 |        30 |       8 |    20 |         30 | Cheese |        17
+ 11:02:00 |        10 |       9 |     6 |         10 | Beer   |      0.35
+ 11:04:00 |        10 |      10 |     1 |         10 | Beer   |      0.35
+ 11:09:30 |        40 |      11 |    12 |         40 | Bread  |       100
+ 11:24:11 |        10 |      12 |     4 |         10 | Beer   |      0.35
+{% endhighlight %}
+
+The other way to implement this is to use a database with temporal support
+(the ability to find the contents of the database as it was at any moment
+in the past), and the system needs to know that the `rowtime` column of
+the `Orders` stream corresponds to the transaction timestamp of the
+`Products` table.
+
+For many applications, it is not worth the cost and effort of temporal
+support or a versioned table. It is acceptable to the application that
+the query gives different results when replayed: in this example, on replay,
+all orders of product 10 are assigned the later unit price, 0.35.
+
+# Joining streams to streams
+
+It makes sense to join two streams if the join condition somehow forces
+them to remain a finite distance from one another. In the following query,
+the ship date is within one hour of the order date:
+
+{% highlight sql %}
+SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
+FROM Orders AS o
+JOIN Shipments AS s
+  ON o.orderId = s.orderId
+  AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
+
+  rowtime | productId | orderId | shipTime
+----------+-----------+---------+----------
+ 10:17:00 |        30 |       5 | 10:55:00
+ 10:17:05 |        10 |       6 | 10:20:00
+ 11:02:00 |        10 |       9 | 11:58:00
+ 11:24:11 |        10 |      12 | 11:44:00
+{% endhighlight %}
+
+Note that quite a few orders do not appear, because they did not ship
+within an hour. By the time the system receives order 12, timestamped 11:24:11,
+it has already removed orders up to and including order 8, timestamped 10:18:07,
+from its hash table.
+
+As you can see, the "lock step", tying together monotonic or quasi-monotonic
+columns of the two streams, is necessary for the system to make progress.
+It will refuse to execute a query if it cannot deduce a lock step.
+
+# DML
+
+It's not only queries that make sense against streams;
+it also makes sense to run DML statements (`INSERT`, `UPDATE`, `DELETE`,
+and also their rarer cousins `UPSERT` and `REPLACE`) against streams.
+
+DML is useful because it allows you to materialize streams
+or tables based on streams,
+and therefore save effort when values are used often.
+
+Consider how streaming applications often consist of pipelines of queries,
+each query transforming input stream(s) to output stream(s).
+The component of a pipeline can be a view:
+
+{% highlight sql %}
+CREATE VIEW LargeOrders AS
+SELECT STREAM * FROM Orders WHERE units > 1000;
+{% endhighlight %}
+
+or a standing `INSERT` statement:
+
+{% highlight sql %}
+INSERT INTO LargeOrders
+SELECT STREAM * FROM Orders WHERE units > 1000;
+{% endhighlight %}
+
+These look similar, and in both cases the next step(s) in the pipeline
+can read from `LargeOrders` without worrying how it was populated.
+There is a difference in efficiency: the `INSERT` statement does the 
+same work no matter how many consumers there are; the view does work
+proportional to the number of consumers, and in particular, does no 
+work if there are no consumers.
+
+Other forms of DML make sense for streams. For example, the following
+standing `UPSERT` statement maintains a table that materializes a summary
+of the last hour of orders:
+
+{% highlight sql %}
+UPSERT INTO OrdersSummary
+SELECT STREAM productId,
+  COUNT(*) OVER lastHour AS c
+FROM Orders
+WINDOW lastHour AS (
+  PARTITION BY productId
+  ORDER BY rowtime
+  RANGE INTERVAL '1' HOUR PRECEDING)
+{% endhighlight %}
+
+# Punctuation
+
+Punctuation[<a href="#ref5">5</a>] allows a stream query to make progress
+even if there are not enough values in a monotonic key to push the results out.
+
+(I prefer the term "rowtime bounds",
+and watermarks[<a href="#ref6">6</a>] are a related concept,
+but for these purposes, punctuation will suffice.)
+
+If a stream has punctuation enabled then it may not be sorted but is
+nevertheless sortable. So, for the purposes of semantics, it is sufficient
+to work in terms of sorted streams.
+
+By the way, an out-of-order stream is also sortable if it is *t-sorted*
+(i.e. every record is guaranteed to arrive within *t* seconds of its
+timestamp) or *k-sorted* (i.e. every record is guaranteed to be no more
+than *k* positions out of order). So queries on these streams can be
+planned similarly to queries on streams with punctuation.
+
+And, we often want to aggregate over attributes that are not
+time-based but are nevertheless monotonic. "The number of times a team
+has shifted between winning-state and losing-state" is one such
+monotonic attribute. The system needs to figure out for itself that it
+is safe to aggregate over such an attribute; punctuation does not add
+any extra information.
+
+I have in mind some metadata (cost metrics) for the planner:
+
+1. Is this stream sorted on a given attribute (or attributes)?
+2. Is it possible to sort the stream on a given attribute? (For finite
+   relations, the answer is always "yes"; for streams it depends on the
+   existence of punctuation, or linkage between the attributes and the
+   sort key.)
+3. What latency do we need to introduce in order to perform that sort?
+4. What is the cost (in CPU, memory etc.) of performing that sort?
+
+We already have (1), in [BuiltInMetadata.Collation]({{ site.apiRoot }}/org/apache/calcite/rel/metadata/BuiltInMetadata.Collation.html).
+For (2), the answer is always "true" for finite relations.
+But we'll need to implement (2), (3) and (4) for streams.
+
+# State of the stream
 
 Not all concepts in this article have been implemented in Calcite.
 And others may be implemented in Calcite but not in a particular adapter
-such as Samza SQL [<a href="#ref3">3</a>].
+such as SamzaSQL [<a href="#ref3">3</a>] [<a href="#ref4">4</a>].
+
+## Implemented
 
-### Implemented
-* Streaming SELECT, WHERE, GROUP BY, HAVING, UNION ALL, ORDER BY
-* FLOOR and CEILING functions
+* Streaming `SELECT`, `WHERE`, `GROUP BY`, `HAVING`, `UNION ALL`, `ORDER BY`
+* `FLOOR` and `CEIL` functions
 * Monotonicity
-* Streaming VALUES is disallowed
+* Streaming `VALUES` is disallowed
 
-### Not implemented
-* Stream-to-stream JOIN
-* Stream-to-table JOIN
+## Not implemented
+
+The following features are presented in this document as if Calcite
+supports them, but in fact it does not (yet). Full support means
+that the reference implementation supports the feature (including
+negative cases) and the TCK tests it.
+
+* Stream-to-stream `JOIN`
+* Stream-to-table `JOIN`
 * Stream on view
-* Streaming UNION ALL with ORDER BY (merge)
+* Streaming `UNION ALL` with `ORDER BY` (merge)
 * Relational query on stream
 * Streaming windowed aggregation (sliding and cascading windows)
-* Check that STREAM in sub-queries and views is ignored
-* Check that streaming ORDER BY cannot have OFFSET or LIMIT
+* Check that `STREAM` in sub-queries and views is ignored
+* Check that streaming `ORDER BY` cannot have `OFFSET` or `LIMIT`
 * Limited history; at run time, check that there is sufficient history
   to run the query.
+* [Quasi-monotonicity](https://issues.apache.org/jira/browse/CALCITE-1096)
+* `HOP` and `TUMBLE` (and auxiliary `HOP_START`, `HOP_END`,
+  `TUMBLE_START`, `TUMBLE_END`) functions
+
+## To do in this document
+
+* Re-visit whether you can stream `VALUES`
+* `OVER` clause to define window on stream
+* Consider whether to allow `CUBE` and `ROLLUP` in streaming queries,
+  with an understanding that some levels of aggregation will never complete
+  (because they have no monotonic expressions) and thus will never be emitted.
+* Fix the `UPSERT` example to remove records for products that have not
+  occurred in the last hour.
+* DML that outputs to multiple streams; perhaps an extension to the standard
+  `REPLACE` statement. 
+
+# Functions
+
+The following functions are not present in standard SQL
+but are defined in streaming SQL.
+
+Scalar functions:
+
+* `FLOOR(dateTime TO intervalType)` rounds a date, time or timestamp value
+  down to a given interval type
+* `CEIL(dateTime TO intervalType)` rounds a date, time or timestamp value
+  up to a given interval type
+
+Partitioning functions:
+
+* `HOP(t, emit, retain)` returns a collection of group keys for a row
+  to be part of a hopping window
+* `HOP(t, emit, retain, align)` returns a collection of group keys for a row
+  to be part of a hopping window with a given alignment
+* `TUMBLE(t, emit)` returns a group key for a row
+  to be part of a tumbling window
+* `TUMBLE(t, emit, align)` returns a group key for a row
+  to be part of a tumbling window with a given alignment
+
+`TUMBLE(t, e)` is equivalent to `TUMBLE(t, e, TIME '00:00:00')`.
+
+`TUMBLE(t, e, a)` is equivalent to `HOP(t, e, e, a)`.
 
-### To do in this document
-* Re-visit whether you can stream VALUES
-* OVER clause to define window on stream
-* Windowed aggregation
-* Punctuation
-* Stream-to-table join
-  * Stream-to-table join where table is changing
-* Stream-to-stream join
-* Relational queries on streams (e.g. "pie chart" query)
-* Diagrams for various window types
+`HOP(t, e, r)` is equivalent to `HOP(t, e, r, TIME '00:00:00')`.
 
-## References
+# References
 
 * [<a name="ref1">1</a>]
-  <a href="http://ilpubs.stanford.edu:8090/758/">Arasu, Arvind and Babu,
-  Shivnath and Widom, Jennifer (2003) The CQL Continuous Query
+  <a href="http://ilpubs.stanford.edu:8090/758/">Arvind Arasu, Shivnath Babu,
+  and Jennifer Widom (2003) The CQL Continuous Query
   Language: Semantic Foundations and Query Execution</a>.
 * [<a name="ref2">2</a>]
   <a href="http://kafka.apache.org/documentation.html">Apache Kafka</a>.
 * [<a name="ref3">3</a>] <a href="http://samza.apache.org">Apache Samza</a>.
+* [<a name="ref4">4</a>] <a href="https://github.com/milinda/samza-sql">SamzaSQL</a>.
+* [<a name="ref5">5</a>]
+  <a href="http://www.whitworth.edu/academic/department/mathcomputerscience/faculty/tuckerpeter/pdf/117896_final.pdf">Peter
+  A. Tucker, David Maier, Tim Sheard, and Leonidas Fegaras (2003) Exploiting
+  Punctuation Semantics in Continuous Data Streams</a>.
+* [<a name="ref6">6</a>]
+  <a href="http://research.google.com/pubs/pub41378.html">Tyler Akidau,
+  Alex Balikov, Kaya Bekiroglu, Slava Chernyak, Josh Haberman, Reuven Lax,
+  Sam McVeety, Daniel Mills, Paul Nordstrom, and Sam Whittle (2013)
+  MillWheel: Fault-Tolerant Stream Processing at Internet Scale</a>.

http://git-wip-us.apache.org/repos/asf/calcite/blob/884f0106/site/img/pie-chart.png
----------------------------------------------------------------------
diff --git a/site/img/pie-chart.png b/site/img/pie-chart.png
new file mode 100644
index 0000000..59e2ddb
Binary files /dev/null and b/site/img/pie-chart.png differ

http://git-wip-us.apache.org/repos/asf/calcite/blob/884f0106/site/img/window-types.png
----------------------------------------------------------------------
diff --git a/site/img/window-types.png b/site/img/window-types.png
new file mode 100644
index 0000000..366c00a
Binary files /dev/null and b/site/img/window-types.png differ