Posted to commits@calcite.apache.org by jh...@apache.org on 2015/03/08 00:50:56 UTC

[1/4] incubator-calcite git commit: [CALCITE-611] Method setAggChildKeys should take into account indicator columns of Aggregate operator (Jesus Camacho Rodriguez)

Repository: incubator-calcite
Updated Branches:
  refs/heads/master 49ec28018 -> 0ecd8702a


[CALCITE-611] Method setAggChildKeys should take into account indicator columns of Aggregate operator (Jesus Camacho Rodriguez)

Close apache/incubator-calcite#57


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/c50a6e0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/c50a6e0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/c50a6e0d

Branch: refs/heads/master
Commit: c50a6e0d4fd6c8c28ce1c4715695a582c4818fb2
Parents: 49ec280
Author: Jesus Camacho Rodriguez <jc...@hortonworks.com>
Authored: Fri Mar 6 21:03:21 2015 +0000
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Mar 7 14:35:04 2015 -0800

----------------------------------------------------------------------
 core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java | 3 ++-
 .../java/org/apache/calcite/rel/rules/AggregateStarTableRule.java | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c50a6e0d/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
index 24f5c48..721df1b 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
@@ -513,7 +513,8 @@ public class RelMdUtil {
       } else {
         // aggregate column -- set a bit for each argument being
         // aggregated
-        AggregateCall agg = aggCalls.get(bit - aggRel.getGroupCount());
+        AggregateCall agg = aggCalls.get(bit
+            - (aggRel.getGroupCount() + aggRel.getIndicatorCount()));
         for (Integer arg : agg.getArgList()) {
           childKey.set(arg);
         }
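
For context: an Aggregate's output columns are laid out as the group columns, then the GROUPING SETS indicator columns, then one column per aggregate call, so an output ordinal that refers to an aggregate call has to skip both leading blocks. A minimal sketch of that index arithmetic (the method name is illustrative; the accessors are Calcite's):

  // Hedged sketch, not from the commit: maps an Aggregate output ordinal to
  // the aggregate call it refers to, skipping group and indicator columns.
  static AggregateCall aggCallForOutputColumn(Aggregate aggRel, int bit) {
    final int offset = aggRel.getGroupCount() + aggRel.getIndicatorCount();
    assert bit >= offset : "column " + bit + " is a group or indicator column";
    return aggRel.getAggCallList().get(bit - offset);
  }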

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/c50a6e0d/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java b/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
index 10dfafc..de7f67f 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/AggregateStarTableRule.java
@@ -161,6 +161,7 @@ public class AggregateStarTableRule extends RelOptRule {
               tileKey.dimensions.cardinality() + tileKey.measures.size(),
               aggregate.getRowType().getFieldCount()) {
             public int getSourceOpt(int source) {
+              assert aggregate.getIndicatorCount() == 0;
               if (source < aggregate.getGroupCount()) {
                 int in = tileKey.dimensions.nth(source);
                 return aggregate.getGroupSet().indexOf(in);


[4/4] incubator-calcite git commit: [CALCITE-602] Streaming queries

Posted by jh...@apache.org.
[CALCITE-602] Streaming queries

Validate and implement streaming queries: streaming scan, project, filter, aggregate, sort.

Implement CEIL and FLOOR functions for date-time and numeric values.

Add CompositeSingleOperandTypeChecker, and make CompositeOperandTypeChecker work for multiple operands.
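
As an informal sketch of the streaming-scan piece (not part of the commit message): a table opts into SELECT STREAM by implementing the new StreamableTable interface. The class name, row type and use of AbstractTable below are assumptions for exposition; only the stream() method comes from this change.

  import org.apache.calcite.rel.type.RelDataType;
  import org.apache.calcite.rel.type.RelDataTypeFactory;
  import org.apache.calcite.schema.StreamableTable;
  import org.apache.calcite.schema.Table;
  import org.apache.calcite.schema.impl.AbstractTable;
  import org.apache.calcite.sql.type.SqlTypeName;

  /** Hypothetical streaming Orders table. */
  public class OrdersStreamTable extends AbstractTable
      implements StreamableTable {
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
      return typeFactory.builder()
          .add("ROWTIME", SqlTypeName.TIMESTAMP)
          .add("PRODUCT", SqlTypeName.VARCHAR, 20)
          .add("UNITS", SqlTypeName.INTEGER)
          .build();
    }

    /** Returns the table that represents the stream of changes. */
    public Table stream() {
      return this; // here the stream form is the table itself
    }
  }

In the JSON model the same capability is declared with the new stream attribute on a table (see JsonStream and JsonTable below).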


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0ecd8702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0ecd8702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0ecd8702

Branch: refs/heads/master
Commit: 0ecd8702ab95fd59b7ef2182720d12b2167ae968
Parents: c50a6e0
Author: Julian Hyde <jh...@apache.org>
Authored: Sat Feb 21 18:10:32 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Mar 7 15:33:21 2015 -0800

----------------------------------------------------------------------
 README.md                                       |   1 +
 .../calcite/avatica/util/DateTimeUtils.java     |  69 +++
 core/src/main/codegen/templates/Parser.jj       |  79 ++-
 .../adapter/enumerable/EnumerableTableScan.java |   4 +-
 .../calcite/adapter/enumerable/RexImpTable.java |  77 ++-
 .../org/apache/calcite/jdbc/CalciteSchema.java  |   7 +-
 .../java/org/apache/calcite/model/JsonRoot.java |   1 +
 .../org/apache/calcite/model/JsonStream.java    |  33 ++
 .../org/apache/calcite/model/JsonTable.java     |   8 +-
 .../calcite/prepare/CalcitePrepareImpl.java     |  10 +
 .../apache/calcite/prepare/RelOptTableImpl.java |  45 ++
 .../java/org/apache/calcite/rel/core/Sort.java  |   2 +-
 .../java/org/apache/calcite/rel/stream/Chi.java |  38 ++
 .../org/apache/calcite/rel/stream/Delta.java    |  49 ++
 .../apache/calcite/rel/stream/LogicalChi.java   |  33 ++
 .../apache/calcite/rel/stream/LogicalDelta.java |  61 +++
 .../apache/calcite/rel/stream/StreamRules.java  | 198 +++++++
 .../apache/calcite/rel/stream/package-info.java |  35 ++
 .../apache/calcite/runtime/CalciteResource.java |  18 +
 .../apache/calcite/runtime/SqlFunctions.java    | 110 ++++
 .../java/org/apache/calcite/schema/Schema.java  |   3 +
 .../apache/calcite/schema/StreamableTable.java  |  32 ++
 .../java/org/apache/calcite/sql/SqlKind.java    |  12 +-
 .../apache/calcite/sql/SqlSelectKeyword.java    |   3 +-
 .../calcite/sql/advise/SqlAdvisorValidator.java |   6 +
 .../apache/calcite/sql/fun/SqlCeilFunction.java |  52 --
 .../sql/fun/SqlCollectionTableOperator.java     |  23 +-
 .../calcite/sql/fun/SqlExtractFunction.java     |   4 +-
 .../calcite/sql/fun/SqlFloorFunction.java       |  38 +-
 .../calcite/sql/fun/SqlStdOperatorTable.java    |  23 +-
 .../sql/type/CompositeOperandTypeChecker.java   | 173 ++----
 .../type/CompositeSingleOperandTypeChecker.java | 119 +++++
 .../apache/calcite/sql/type/OperandTypes.java   |  43 +-
 .../calcite/sql/validate/AbstractNamespace.java |   4 +
 .../sql/validate/IdentifierNamespace.java       |  10 +-
 .../calcite/sql/validate/SelectNamespace.java   |   4 +
 .../calcite/sql/validate/SetopNamespace.java    |  34 ++
 .../calcite/sql/validate/SqlModality.java       |  25 +
 .../calcite/sql/validate/SqlValidator.java      |  13 +
 .../calcite/sql/validate/SqlValidatorImpl.java  | 181 ++++++-
 .../sql/validate/SqlValidatorNamespace.java     |   7 +
 .../calcite/sql/validate/SqlValidatorTable.java |   2 +
 .../sql/validate/TableConstructorNamespace.java |   4 +
 .../calcite/sql/validate/TableNamespace.java    |   5 +
 .../sql2rel/RelStructuredTypeFlattener.java     |  10 +
 .../calcite/sql2rel/SqlToRelConverter.java      |   9 +
 .../org/apache/calcite/util/BuiltInMethod.java  |  10 +
 .../calcite/runtime/CalciteResource.properties  |   6 +
 .../calcite/sql/parser/SqlParserTest.java       |  27 +
 .../apache/calcite/sql/test/SqlAdvisorTest.java |   5 +-
 .../calcite/sql/test/SqlOperatorBaseTest.java   | 262 ++++------
 .../org/apache/calcite/test/CalciteSuite.java   |   1 +
 .../apache/calcite/test/MockCatalogReader.java  |  62 ++-
 .../apache/calcite/test/RelOptRulesTest.java    |   2 +-
 .../apache/calcite/test/SqlFunctionsTest.java   |  46 +-
 .../calcite/test/SqlToRelConverterTest.java     |  21 +
 .../apache/calcite/test/SqlValidatorTest.java   | 207 ++++++--
 .../org/apache/calcite/test/StreamTest.java     | 296 +++++++++++
 .../calcite/test/SqlToRelConverterTest.xml      |  46 +-
 doc/REFERENCE.md                                |   6 +-
 doc/STREAM.md                                   | 521 +++++++++++++++++++
 .../java/org/apache/calcite/linq4j/Linq4j.java  |   7 +-
 .../calcite/linq4j/function/Functions.java      |   7 +-
 .../org/apache/calcite/linq4j/tree/Visitor.java |  11 +-
 64 files changed, 2745 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 48d485e..40458ec 100644
--- a/README.md
+++ b/README.md
@@ -198,6 +198,7 @@ For more details, see the <a href="doc/REFERENCE.md">Reference guide</a>.
 * <a href="doc/HOWTO.md">HOWTO</a>
 * <a href="doc/MODEL.md">JSON model</a>
 * <a href="doc/REFERENCE.md">Reference guide</a>
+* <a href="doc/STREAM.md">Streaming SQL</a>
 * <a href="doc/HISTORY.md">Release notes and history</a>
 
 ### Pre-Apache resources

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
index 084c027..8acdf93 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -700,6 +700,66 @@ public class DateTimeUtils {
     }
   }
 
+  public static long unixTimestampFloor(TimeUnitRange range, long timestamp) {
+    int date = (int) (timestamp / MILLIS_PER_DAY);
+    final int f = julianDateFloor(range, date + EPOCH_JULIAN, true);
+    return (long) f * MILLIS_PER_DAY;
+  }
+
+  public static long unixDateFloor(TimeUnitRange range, long date) {
+    return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+  }
+
+  public static long unixTimestampCeil(TimeUnitRange range, long timestamp) {
+    int date = (int) (timestamp / MILLIS_PER_DAY);
+    final int f = julianDateFloor(range, date + EPOCH_JULIAN, false);
+    return (long) f * MILLIS_PER_DAY;
+  }
+
+  public static long unixDateCeil(TimeUnitRange range, long date) {
+    return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+  }
+
+  private static int julianDateFloor(TimeUnitRange range, int julian,
+      boolean floor) {
+    // this shifts the epoch back to astronomical year -4800 instead of the
+    // start of the Christian era in year AD 1 of the proleptic Gregorian
+    // calendar.
+    int j = julian + 32044;
+    int g = j / 146097;
+    int dg = j % 146097;
+    int c = (dg / 36524 + 1) * 3 / 4;
+    int dc = dg - c * 36524;
+    int b = dc / 1461;
+    int db = dc % 1461;
+    int a = (db / 365 + 1) * 3 / 4;
+    int da = db - a * 365;
+
+    // integer number of full years elapsed since March 1, 4801 BC
+    int y = g * 400 + c * 100 + b * 4 + a;
+    // integer number of full months elapsed since the last March 1
+    int m = (da * 5 + 308) / 153 - 2;
+    // number of days elapsed since day 1 of the month
+    int d = da - (m + 4) * 153 / 5 + 122;
+    int year = y - 4800 + (m + 2) / 12;
+    int month = (m + 2) % 12 + 1;
+    int day = d + 1;
+    switch (range) {
+    case YEAR:
+      if (!floor && (month > 1 || day > 1)) {
+        ++year;
+      }
+      return ymdToUnixDate(year, 1, 1);
+    case MONTH:
+      if (!floor && day > 1) {
+        ++month;
+      }
+      return ymdToUnixDate(year, month, 1);
+    default:
+      throw new AssertionError(range);
+    }
+  }
+
   public static int ymdToUnixDate(int year, int month, int day) {
     final int julian = ymdToJulian(year, month, day);
     return julian - EPOCH_JULIAN;
@@ -721,6 +781,15 @@ public class DateTimeUtils {
     return j;
   }
 
+  public static long unixTimestamp(int year, int month, int day, int hour,
+      int minute, int second) {
+    final int date = ymdToUnixDate(year, month, day);
+    return (long) date * MILLIS_PER_DAY
+        + (long) hour * MILLIS_PER_HOUR
+        + (long) minute * MILLIS_PER_MINUTE
+        + (long) second * MILLIS_PER_SECOND;
+  }
+
   //~ Inner Classes ----------------------------------------------------------
 
   /**
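
A hedged usage sketch of the helpers added above; the timestamp is illustrative, and the commented values follow from the floor/ceil definitions:

  import org.apache.calcite.avatica.util.DateTimeUtils;
  import org.apache.calcite.avatica.util.TimeUnitRange;

  public class DateTimeFloorExample {
    public static void main(String[] args) {
      // 2015-03-07 14:35:04 as milliseconds since the Unix epoch
      final long ts = DateTimeUtils.unixTimestamp(2015, 3, 7, 14, 35, 4);
      // floor to the first day of the month: 2015-03-01 00:00:00
      final long monthFloor =
          DateTimeUtils.unixTimestampFloor(TimeUnitRange.MONTH, ts);
      // ceil to the first day of the next year: 2016-01-01 00:00:00
      final long yearCeil =
          DateTimeUtils.unixTimestampCeil(TimeUnitRange.YEAR, ts);
      System.out.println(monthFloor + " " + yearCeil);
    }
  }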

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/codegen/templates/Parser.jj
----------------------------------------------------------------------
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index cc9a555..6b62ce3 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -91,6 +91,7 @@ import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.parser.SqlParserUtil;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -304,12 +305,12 @@ SqlNode ExtendedBuiltinFunctionCall() :
 /*
 * Parse Floor/Ceil function parameters
 */
-SqlNode FloorCeilOptions( SqlParserPos pos, boolean floorFlag) :
+SqlNode FloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
 {
     SqlNode node;
 }
 {
-    node = StandardFloorCeilOptions( pos, floorFlag)
+    node = StandardFloorCeilOptions(pos, floorFlag)
     {
         return node;
     }
@@ -943,14 +944,18 @@ SqlSelect SqlSelect() :
     }
     SqlSelectKeywords(keywords)
     (
+        <STREAM> {
+            keywords.add(SqlSelectKeyword.STREAM.symbol(getPos()));
+        }
+    )?
+    (
         <DISTINCT> {
             keywords.add(SqlSelectKeyword.DISTINCT.symbol(getPos()));
         }
         |   <ALL> {
             keywords.add(SqlSelectKeyword.ALL.symbol(getPos()));
         }
-        |   E()
-    )
+    )?
     selectList = SelectList()
     <FROM>
     fromClause = FromClause()
@@ -1958,7 +1963,7 @@ SqlNodeList GroupByOpt() :
 {
     <GROUP> { pos = getPos(); }
     <BY> list = GroupingElementList() {
-        return new SqlNodeList(list, pos);
+        return new SqlNodeList(list, pos.plusAll(list));
     }
 |
     {
@@ -2222,19 +2227,16 @@ SqlNodeList OrderBy(boolean accept) :
     SqlParserPos pos;
 }
 {
-    <ORDER>
-    {
+    <ORDER> {
+        pos = getPos();
         if (!accept) {
             // Someone told us ORDER BY wasn't allowed here.  So why
             // did they bother calling us?  To get the correct
             // parser position for error reporting.
-            throw SqlUtil.newContextException(getPos(),
-                RESOURCE.illegalOrderBy());
+            throw SqlUtil.newContextException(pos, RESOURCE.illegalOrderBy());
         }
     }
-    <BY> e = OrderItem()
-    {
-        pos = getPos();
+    <BY> e = OrderItem() {
         list = startList(e);
     }
     (
@@ -2243,7 +2245,7 @@ SqlNodeList OrderBy(boolean accept) :
         LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
     ) *
     {
-        return new SqlNodeList(list, pos);
+        return new SqlNodeList(list, pos.plusAll(list));
     }
 }
 
@@ -2253,29 +2255,22 @@ SqlNodeList OrderBy(boolean accept) :
 SqlNode OrderItem() :
 {
     SqlNode e;
-    SqlParserPos pos;
 }
 {
     e = Expression(ExprContext.ACCEPT_SUBQUERY)
     (
         <ASC>
-        | <DESC>
-        {
-            pos = getPos();
-            e = SqlStdOperatorTable.DESC.createCall(pos, e);
+    |   <DESC> {
+            e = SqlStdOperatorTable.DESC.createCall(getPos(), e);
         }
     )?
     (
-        <NULLS> <FIRST>
-        {
-            pos = getPos();
-            e = SqlStdOperatorTable.NULLS_FIRST.createCall(pos, e);
+        <NULLS> <FIRST> {
+            e = SqlStdOperatorTable.NULLS_FIRST.createCall(getPos(), e);
         }
     |
-        <NULLS> <LAST>
-        {
-            pos = getPos();
-            e = SqlStdOperatorTable.NULLS_LAST.createCall(pos, e);
+        <NULLS> <LAST> {
+            e = SqlStdOperatorTable.NULLS_LAST.createCall(getPos(), e);
         }
     )?
     {
@@ -3864,7 +3859,7 @@ SqlNode BuiltinFunctionCall() :
         }
         <LPAREN>
         unit = TimeUnit()
-        { args = startList(new SqlIntervalQualifier(unit, null,getPos())); }
+        { args = startList(new SqlIntervalQualifier(unit, null, getPos())); }
         <FROM>
         e = Expression(ExprContext.ACCEPT_SUBQUERY) { args.add(e); }
         <RPAREN>
@@ -4183,13 +4178,22 @@ SqlNode StandardFloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
     SqlIdentifier name;
     SqlParserPos overPos = null;
     SqlIdentifier id = null;
-    SqlNode e = null;
-    SqlCall function = null;
-    SqlNodeList args;
+    SqlNode e;
+    List<SqlNode> args;
+    TimeUnit unit;
     boolean over = false;
 }
 {
-    args = ParenthesizedQueryOrCommaList(ExprContext.ACCEPT_SUBQUERY)
+    <LPAREN> e = Expression(ExprContext.ACCEPT_SUBQUERY) {
+        args = startList(e);
+    }
+    (
+        <TO>
+        unit = TimeUnit() {
+            args.add(new SqlIntervalQualifier(unit, null, getPos()));
+        }
+    )?
+    <RPAREN>
     [
         <OVER>
         {
@@ -4205,16 +4209,10 @@ SqlNode StandardFloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
         SqlOperator op = floorFlag
             ? SqlStdOperatorTable.FLOOR
             : SqlStdOperatorTable.CEIL;
-        function =  op.createCall(
-            pos, args.toArray());
+        final SqlCall function =  op.createCall(pos.plus(getPos()), args);
         if (over) {
-            if (id != null) {
-                return SqlStdOperatorTable.OVER.createCall(
-                    overPos, new SqlNode[] {function, id});
-            } else {
-                return SqlStdOperatorTable.OVER.createCall(
-                    overPos, new SqlNode[] { function, e });
-            }
+            return SqlStdOperatorTable.OVER.createCall(overPos, function,
+                Util.first(id, e));
         } else {
             return function;
         }
@@ -4981,6 +4979,7 @@ SqlPostfixOperator PostfixRowOperator() :
     | < STATIC: "STATIC" >
     | < STDDEV_POP: "STDDEV_POP" >
     | < STDDEV_SAMP: "STDDEV_SAMP" >
+    | < STREAM: "STREAM" >
     | < STRUCTURE: "STRUCTURE" >
     | < STYLE: "STYLE" >
     | < SUBCLASS_ORIGIN: "SUBCLASS_ORIGIN" >
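
To illustrate the grammar changes above (the STREAM keyword, optional DISTINCT/ALL, and FLOOR/CEIL taking a TO <time unit> argument), a hedged parse sketch; the query text and table name are assumptions:

  import org.apache.calcite.sql.SqlNode;
  import org.apache.calcite.sql.parser.SqlParseException;
  import org.apache.calcite.sql.parser.SqlParser;

  public class StreamSyntaxExample {
    public static void main(String[] args) throws SqlParseException {
      final String sql =
          "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime, product\n"
          + "FROM Orders";
      // Parse with the default (core) parser, which includes this grammar.
      final SqlNode node = SqlParser.create(sql).parseQuery();
      System.out.println(node);
    }
  }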

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
index ddd3212..ad67052 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
@@ -37,6 +37,7 @@ import org.apache.calcite.schema.FilterableTable;
 import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.QueryableTable;
 import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.BuiltInMethod;
 
@@ -104,7 +105,8 @@ public class EnumerableTableScan
       }
     } else if (table instanceof ScannableTable
         || table instanceof FilterableTable
-        || table instanceof ProjectableFilterableTable) {
+        || table instanceof ProjectableFilterableTable
+        || table instanceof StreamableTable) {
       return Object[].class;
     } else {
       return Object.class;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index e1fd9b0..29c107f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -17,6 +17,8 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.BlockStatement;
@@ -236,14 +238,20 @@ public class RexImpTable {
     defineMethod(LN, "ln", NullPolicy.STRICT);
     defineMethod(LOG10, "log10", NullPolicy.STRICT);
     defineMethod(ABS, "abs", NullPolicy.STRICT);
-    defineMethod(CEIL, "ceil", NullPolicy.STRICT);
-    defineMethod(FLOOR, "floor", NullPolicy.STRICT);
 
     // datetime
     defineImplementor(DATETIME_PLUS, NullPolicy.STRICT,
         new DatetimeArithmeticImplementor(), false);
     defineMethod(EXTRACT_DATE, BuiltInMethod.UNIX_DATE_EXTRACT.method,
         NullPolicy.STRICT);
+    defineImplementor(FLOOR, NullPolicy.STRICT,
+        new FloorImplementor(BuiltInMethod.FLOOR.method.getName(),
+            BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method,
+            BuiltInMethod.UNIX_DATE_FLOOR.method), false);
+    defineImplementor(CEIL, NullPolicy.STRICT,
+        new FloorImplementor(BuiltInMethod.CEIL.method.getName(),
+            BuiltInMethod.UNIX_TIMESTAMP_CEIL.method,
+            BuiltInMethod.UNIX_DATE_CEIL.method), false);
 
     map.put(IS_NULL, new IsXxxImplementor(null, false));
     map.put(IS_NOT_NULL, new IsXxxImplementor(null, true));
@@ -1402,9 +1410,70 @@ public class RexImpTable {
     }
   }
 
+  /** Implementor for the {@code FLOOR} and {@code CEIL} functions. */
+  private static class FloorImplementor extends MethodNameImplementor {
+    final Method timestampMethod;
+    final Method dateMethod;
+
+    FloorImplementor(String methodName, Method timestampMethod,
+        Method dateMethod) {
+      super(methodName);
+      this.timestampMethod = timestampMethod;
+      this.dateMethod = dateMethod;
+    }
+
+    public Expression implement(RexToLixTranslator translator, RexCall call,
+        List<Expression> translatedOperands) {
+      switch (call.getOperands().size()) {
+      case 1:
+        switch (call.getType().getSqlTypeName()) {
+        case BIGINT:
+        case INTEGER:
+        case SMALLINT:
+        case TINYINT:
+          return translatedOperands.get(0);
+        }
+        return super.implement(translator, call, translatedOperands);
+      case 2:
+        final Type type;
+        final Method floorMethod;
+        switch (call.getType().getSqlTypeName()) {
+        case TIMESTAMP:
+          type = long.class;
+          floorMethod = timestampMethod;
+          break;
+        default:
+          type = int.class;
+          floorMethod = dateMethod;
+        }
+        final ConstantExpression tur =
+            (ConstantExpression) translatedOperands.get(1);
+        final TimeUnitRange timeUnitRange = (TimeUnitRange) tur.value;
+        switch (timeUnitRange) {
+        case YEAR:
+        case MONTH:
+          return Expressions.call(floorMethod, tur,
+              call(translatedOperands, type, TimeUnit.DAY));
+        default:
+          return call(translatedOperands, type, timeUnitRange.startUnit);
+        }
+      default:
+        throw new AssertionError();
+      }
+    }
+
+    private Expression call(List<Expression> translatedOperands, Type type,
+        TimeUnit timeUnit) {
+      return Expressions.call(SqlFunctions.class, methodName,
+          Types.castIfNecessary(type, translatedOperands.get(0)),
+          Types.castIfNecessary(type,
+              Expressions.constant(timeUnit.multiplier)));
+    }
+  }
+
   /** Implementor for a function that generates calls to a given method. */
   private static class MethodImplementor implements NotNullImplementor {
-    private final Method method;
+    protected final Method method;
 
     MethodImplementor(Method method) {
       this.method = method;
@@ -1428,7 +1497,7 @@ public class RexImpTable {
    * <p>Use this, as opposed to {@link MethodImplementor}, if the SQL function
    * is overloaded; then you can use one implementor for several overloads. */
   private static class MethodNameImplementor implements NotNullImplementor {
-    private final String methodName;
+    protected final String methodName;
 
     MethodNameImplementor(String methodName) {
       this.methodName = methodName;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
index 4b34c3e..346b721 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
@@ -16,7 +16,6 @@
  */
 package org.apache.calcite.jdbc;
 
-import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.materialize.Lattice;
 import org.apache.calcite.schema.Function;
@@ -522,10 +521,8 @@ public class CalciteSchema {
     public final String name;
 
     public Entry(CalciteSchema schema, String name) {
-      Linq4j.requireNonNull(schema);
-      Linq4j.requireNonNull(name);
-      this.schema = schema;
-      this.name = name;
+      this.schema = Preconditions.checkNotNull(schema);
+      this.name = Preconditions.checkNotNull(name);
     }
 
     /** Returns this object's path. For example ["hr", "emps"]. */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonRoot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonRoot.java b/core/src/main/java/org/apache/calcite/model/JsonRoot.java
index 25e1fa3..ba73e5b 100644
--- a/core/src/main/java/org/apache/calcite/model/JsonRoot.java
+++ b/core/src/main/java/org/apache/calcite/model/JsonRoot.java
@@ -32,6 +32,7 @@ import java.util.List;
  *   {@link JsonSchema} (in collection {@link JsonRoot#schemas schemas})
  *     {@link JsonTable} (in collection {@link JsonMapSchema#tables tables})
  *       {@link JsonColumn} (in collection {@link JsonTable#columns columns}
+ *       {@link JsonStream} (in field {@link JsonTable#stream stream}
  *     {@link JsonView}
  *     {@link JsonFunction} (in collection {@link JsonMapSchema#functions functions})
  *     {@link JsonLattice} (in collection {@link JsonSchema#lattices lattices})

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonStream.java b/core/src/main/java/org/apache/calcite/model/JsonStream.java
new file mode 100644
index 0000000..f8d728a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/model/JsonStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.model;
+
+/**
+ * Information about whether a table allows streaming.
+ *
+ * @see org.apache.calcite.model.JsonRoot Description of schema elements
+ * @see org.apache.calcite.model.JsonTable#stream
+ */
+public class JsonStream {
+  /** Whether the table allows streaming. */
+  public boolean stream = true;
+
+  /** Whether the history of the table is available. */
+  public boolean history = false;
+}
+
+// End JsonStream.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonTable.java b/core/src/main/java/org/apache/calcite/model/JsonTable.java
index c0e7d8d..806405b 100644
--- a/core/src/main/java/org/apache/calcite/model/JsonTable.java
+++ b/core/src/main/java/org/apache/calcite/model/JsonTable.java
@@ -18,8 +18,8 @@ package org.apache.calcite.model;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.collect.Lists;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -36,7 +36,11 @@ import java.util.List;
     @JsonSubTypes.Type(value = JsonView.class, name = "view") })
 public abstract class JsonTable {
   public String name;
-  public final List<JsonColumn> columns = new ArrayList<JsonColumn>();
+  public final List<JsonColumn> columns = Lists.newArrayList();
+
+  /** Information about whether the table can be streamed, and if so, whether
+   * the history of the table is also available. */
+  public JsonStream stream;
 
   public abstract void accept(ModelHandler handler);
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
index 2f884c1..e655e67 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.rel.rules.ReduceExpressionsRule;
 import org.apache.calcite.rel.rules.SortProjectTransposeRule;
 import org.apache.calcite.rel.rules.TableScanRule;
 import org.apache.calcite.rel.rules.ValuesReduceRule;
+import org.apache.calcite.rel.stream.StreamRules;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
@@ -154,6 +155,9 @@ public class CalcitePrepareImpl implements CalcitePrepare {
   /** Whether the enumerable convention is enabled. */
   public static final boolean ENABLE_ENUMERABLE = true;
 
+  /** Whether the streaming is enabled. */
+  public static final boolean ENABLE_STREAM = true;
+
   private static final Set<String> SIMPLE_SQLS =
       ImmutableSet.of(
           "SELECT 1",
@@ -338,6 +342,12 @@ public class CalcitePrepareImpl implements CalcitePrepare {
           EnumerableBindable.EnumerableToBindableConverterRule.INSTANCE);
     }
 
+    if (ENABLE_STREAM) {
+      for (RelOptRule rule : StreamRules.RULES) {
+        planner.addRule(rule);
+      }
+    }
+
     // Change the below to enable constant-reduction.
     if (false) {
       for (RelOptRule rule : CONSTANT_REDUCTION_RULES) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
index e47ea5b..8925ede 100644
--- a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
@@ -35,9 +36,11 @@ import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.QueryableTable;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.validate.SqlModality;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
@@ -125,6 +128,9 @@ public class RelOptTableImpl implements Prepare.PreparingTable {
               table.getClass());
         }
       };
+    } else if (table instanceof StreamableTable) {
+      return getClassExpressionFunction(tableEntry,
+          ((StreamableTable) table).stream());
     } else {
       return new Function<Class, Expression>() {
         public Expression apply(Class input) {
@@ -243,14 +249,53 @@ public class RelOptTableImpl implements Prepare.PreparingTable {
     return rowType;
   }
 
+  public boolean supportsModality(SqlModality modality) {
+    switch (modality) {
+    case STREAM:
+      return table instanceof StreamableTable;
+    default:
+      return !(table instanceof StreamableTable);
+    }
+  }
+
   public List<String> getQualifiedName() {
     return names;
   }
 
   public SqlMonotonicity getMonotonicity(String columnName) {
+    final int i = rowType.getFieldNames().indexOf(columnName);
+    if (i >= 0) {
+      for (RelCollation collation : table.getStatistic().getCollations()) {
+        final RelFieldCollation fieldCollation =
+            collation.getFieldCollations().get(0);
+        if (fieldCollation.getFieldIndex() == i) {
+          return monotonicity(fieldCollation.direction);
+        }
+      }
+    }
     return SqlMonotonicity.NOT_MONOTONIC;
   }
 
+  /** Converts a {@link org.apache.calcite.rel.RelFieldCollation.Direction}
+   * value to a {@link org.apache.calcite.sql.validate.SqlMonotonicity}. */
+  private static SqlMonotonicity
+  monotonicity(RelFieldCollation.Direction direction) {
+    switch (direction) {
+    case ASCENDING:
+      return SqlMonotonicity.INCREASING;
+    case STRICTLY_ASCENDING:
+      return SqlMonotonicity.STRICTLY_INCREASING;
+    case DESCENDING:
+      return SqlMonotonicity.DECREASING;
+    case STRICTLY_DESCENDING:
+      return SqlMonotonicity.STRICTLY_DECREASING;
+    case CLUSTERED:
+      return SqlMonotonicity.MONOTONIC;
+    default:
+      throw new AssertionError("unknown: " + direction);
+    }
+  }
+
   public SqlAccessType getAllowedAccess() {
     return SqlAccessType.ALL;
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/core/Sort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Sort.java b/core/src/main/java/org/apache/calcite/rel/core/Sort.java
index 06f9299..022479b 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Sort.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Sort.java
@@ -45,7 +45,7 @@ import java.util.List;
 public abstract class Sort extends SingleRel {
   //~ Instance fields --------------------------------------------------------
 
-  protected final RelCollation collation;
+  public final RelCollation collation;
   protected final ImmutableList<RexNode> fieldExps;
   public final RexNode offset;
   public final RexNode fetch;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/Chi.java b/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
new file mode 100644
index 0000000..36e03c0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+
+/**
+ * Relational operator that converts a stream to a relation.
+ *
+ * <p>Chi is named for the Greek letter &chi; and pronounced 'kai'.
+ *
+ * <p>Chi is the inverse of {@link Delta}. For any relation {@code R},
+ * Chi(Delta(R)) == R.
+ */
+public class Chi extends SingleRel {
+  protected Chi(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    super(cluster, traits, input);
+  }
+}
+
+// End Chi.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/Delta.java b/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
new file mode 100644
index 0000000..ce1417c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.TableScan;
+
+/**
+ * Relational operator that converts a relation to a stream.
+ *
+ * <p>For example, if {@code Orders} is a table, and {@link TableScan}(Orders)
+ * is a relational operator that returns the current contents of the table,
+ * then {@link Delta}(TableScan(Orders)) is a relational operator that returns
+ * all inserts into the table.
+ *
+ * <p>If unrestricted, Delta returns all previous inserts into the table (from
+ * time -&infin; to now) and all future inserts into the table (from now
+ * to +&infin;) and never terminates.
+ */
+public abstract class Delta extends SingleRel {
+  protected Delta(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    super(cluster, traits, input);
+  }
+
+  /** Creates a Delta by parsing serialized output. */
+  protected Delta(RelInput input) {
+    this(input.getCluster(), input.getTraitSet(), input.getInput());
+  }
+}
+
+// End Delta.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java b/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
new file mode 100644
index 0000000..fad29e7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Sub-class of {@link Chi}
+ * not targeted at any particular engine or calling convention.
+ */
+public final class LogicalChi extends Chi {
+  public LogicalChi(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    super(cluster, traits, input);
+  }
+}
+
+// End LogicalChi.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java b/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
new file mode 100644
index 0000000..05de63b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.List;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.stream.Delta}
+ * not targeted at any particular engine or calling convention.
+ */
+public final class LogicalDelta extends Delta {
+  /**
+   * Creates a LogicalDelta.
+   *
+   * <p>Use {@link #create} unless you know what you're doing.
+   *
+   * @param cluster   Cluster that this relational expression belongs to
+   * @param input     Input relational expression
+   */
+  public LogicalDelta(RelOptCluster cluster, RelTraitSet traits,
+      RelNode input) {
+    super(cluster, traits, input);
+  }
+
+  /** Creates a LogicalDelta by parsing serialized output. */
+  public LogicalDelta(RelInput input) {
+    super(input);
+  }
+
+  /** Creates a LogicalDelta. */
+  public static LogicalDelta create(RelNode input) {
+    final RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE);
+    return new LogicalDelta(input.getCluster(), traitSet, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new LogicalDelta(getCluster(), traitSet, sole(inputs));
+  }
+}
+
+// End LogicalDelta.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
new file mode 100644
index 0000000..28b9972
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Rules and relational operators for streaming relational expressions.
+ */
+public class StreamRules {
+  private StreamRules() {}
+
+  public static final ImmutableList<RelOptRule> RULES =
+      ImmutableList.of(
+          new DeltaProjectTransposeRule(),
+          new DeltaFilterTransposeRule(),
+          new DeltaAggregateTransposeRule(),
+          new DeltaSortTransposeRule(),
+          new DeltaUnionTransposeRule(),
+          new DeltaTableScanRule());
+
+  /** Planner rule that pushes a {@link Delta} through a {@link Project}. */
+  public static class DeltaProjectTransposeRule extends RelOptRule {
+    private DeltaProjectTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Project.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Project project = call.rel(1);
+      final LogicalDelta newDelta = LogicalDelta.create(project.getInput());
+      final LogicalProject newProject =
+          LogicalProject.create(newDelta, project.getProjects(),
+              project.getRowType().getFieldNames());
+      call.transformTo(newProject);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through a {@link Filter}. */
+  public static class DeltaFilterTransposeRule extends RelOptRule {
+    private DeltaFilterTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Filter.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Filter filter = call.rel(1);
+      final LogicalDelta newDelta = LogicalDelta.create(filter.getInput());
+      final LogicalFilter newFilter =
+          LogicalFilter.create(newDelta, filter.getCondition());
+      call.transformTo(newFilter);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through an {@link Aggregate}. */
+  public static class DeltaAggregateTransposeRule extends RelOptRule {
+    private DeltaAggregateTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Aggregate.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Aggregate aggregate = call.rel(1);
+      final LogicalDelta newDelta =
+          LogicalDelta.create(aggregate.getInput());
+      final LogicalAggregate newAggregate =
+          LogicalAggregate.create(newDelta, aggregate.indicator,
+              aggregate.getGroupSet(), aggregate.groupSets,
+              aggregate.getAggCallList());
+      call.transformTo(newAggregate);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through an {@link Sort}. */
+  public static class DeltaSortTransposeRule extends RelOptRule {
+    private DeltaSortTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Sort.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Sort sort = call.rel(1);
+      final LogicalDelta newDelta =
+          LogicalDelta.create(sort.getInput());
+      final LogicalSort newSort =
+          LogicalSort.create(newDelta, sort.collation, sort.offset, sort.fetch);
+      call.transformTo(newSort);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through an {@link Union}. */
+  public static class DeltaUnionTransposeRule extends RelOptRule {
+    private DeltaUnionTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Union.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Union union = call.rel(1);
+      final List<RelNode> newInputs = Lists.newArrayList();
+      for (RelNode input : union.getInputs()) {
+        final LogicalDelta newDelta =
+            LogicalDelta.create(input);
+        newInputs.add(newDelta);
+      }
+      final LogicalUnion newUnion = LogicalUnion.create(newInputs, union.all);
+      call.transformTo(newUnion);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} into a {@link TableScan} of a
+   * {@link org.apache.calcite.schema.StreamableTable}.
+   *
+   * <p>Very likely, the stream was only represented as a table for uniformity
+   * with the other relations in the system. The Delta disappears and the stream
+   * can be implemented directly. */
+  public static class DeltaTableScanRule extends RelOptRule {
+    private DeltaTableScanRule() {
+      super(
+          operand(Delta.class,
+              operand(TableScan.class, none())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final TableScan scan = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      final RelOptTable relOptTable = scan.getTable();
+      final StreamableTable streamableTable =
+          relOptTable.unwrap(StreamableTable.class);
+      if (streamableTable != null) {
+        final Table table1 = streamableTable.stream();
+        final RelOptTable relOptTable2 =
+            RelOptTableImpl.create(relOptTable.getRelOptSchema(),
+                relOptTable.getRowType(), table1);
+        final LogicalTableScan newScan =
+            LogicalTableScan.create(cluster, relOptTable2);
+        call.transformTo(newScan);
+      }
+    }
+  }
+}
+
+// End StreamRules.java
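
Informally (not part of the commit), the rules push the Delta toward the leaves until DeltaTableScanRule can replace Delta(TableScan) with a scan of streamableTable.stream(). The plans below are an illustrative rendering, not actual planner output:

  -- SELECT STREAM * FROM Orders WHERE units > 25
  LogicalDelta
    LogicalFilter(condition=[>($2, 25)])
      LogicalTableScan(table=[[ORDERS]])

  -- after DeltaFilterTransposeRule + DeltaTableScanRule
  LogicalFilter(condition=[>($2, 25)])
    LogicalTableScan(table=[[ORDERS]])   -- now scans streamableTable.stream()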

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/package-info.java b/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
new file mode 100644
index 0000000..12c680a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines relational expressions for streaming.
+ *
+ * <h2>Related packages and classes</h2>
+ * <ul>
+ *
+ * <li>Package <code>
+ * <a href="../core/package-summary.html">org.apache.calcite.rel.core</a></code>
+ * contains core relational expressions
+ *
+ * </ul>
+ */
+@PackageMarker
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 842ed7d..245d00a 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -549,6 +549,24 @@ public interface CalciteResource {
 
   @BaseMessage("FilterableTable.scan must not return null")
   ExInst<CalciteException> filterableTableScanReturnedNull();
+
+  @BaseMessage("Cannot convert table ''{0}'' to stream")
+  ExInst<SqlValidatorException> cannotConvertToStream(String tableName);
+
+  @BaseMessage("Cannot convert stream ''{0}'' to relation")
+  ExInst<SqlValidatorException> cannotConvertToRelation(String tableName);
+
+  @BaseMessage("Streaming aggregation requires at least one monotonic expression in GROUP BY clause")
+  ExInst<SqlValidatorException> streamMustGroupByMonotonic();
+
+  @BaseMessage("Streaming ORDER BY must start with monotonic expression")
+  ExInst<SqlValidatorException> streamMustOrderByMonotonic();
+
+  @BaseMessage("Set operator cannot combine streaming and non-streaming inputs")
+  ExInst<SqlValidatorException> streamSetOpInconsistentInputs();
+
+  @BaseMessage("Cannot stream VALUES")
+  ExInst<SqlValidatorException> cannotStreamValues();
 }
 
 // End CalciteResource.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
index fabdf09..77e6827 100644
--- a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
+++ b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
@@ -687,6 +687,116 @@ public class SqlFunctions {
     return bigDecimals[1];
   }
 
+  // FLOOR
+
+  public static double floor(double b0) {
+    return Math.floor(b0);
+  }
+
+  public static BigDecimal floor(BigDecimal b0) {
+    return b0.setScale(0, BigDecimal.ROUND_FLOOR);
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to byte values. */
+  public static byte floor(byte b0, byte b1) {
+    return (byte) floor((int) b0, (int) b1);
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to short values. */
+  public static short floor(short b0, short b1) {
+    return (short) floor((int) b0, (int) b1);
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to int values. */
+  public static int floor(int b0, int b1) {
+    int r = b0 % b1;
+    if (r < 0) {
+      r += b1;
+    }
+    return b0 - r;
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to long values. */
+  public static long floor(long b0, long b1) {
+    long r = b0 % b1;
+    if (r < 0) {
+      r += b1;
+    }
+    return b0 - r;
+  }
+
+  // temporary
+  public static BigDecimal floor(BigDecimal b0, int b1) {
+    return floor(b0, BigDecimal.valueOf(b1));
+  }
+
+  // temporary
+  public static int floor(int b0, BigDecimal b1) {
+    return floor(b0, b1.intValue());
+  }
+
+  public static BigDecimal floor(BigDecimal b0, BigDecimal b1) {
+    final BigDecimal[] bigDecimals = b0.divideAndRemainder(b1);
+    BigDecimal r = bigDecimals[1];
+    if (r.signum() < 0) {
+      r = r.add(b1);
+    }
+    return b0.subtract(r);
+  }
+
+  // CEIL
+
+  public static double ceil(double b0) {
+    return Math.ceil(b0);
+  }
+
+  public static BigDecimal ceil(BigDecimal b0) {
+    return b0.setScale(0, BigDecimal.ROUND_CEILING);
+  }
+
+  /** SQL <code>CEIL</code> operator applied to byte values. */
+  public static byte ceil(byte b0, byte b1) {
+    return floor((byte) (b0 + b1 - 1), b1);
+  }
+
+  /** SQL <code>CEIL</code> operator applied to short values. */
+  public static short ceil(short b0, short b1) {
+    return floor((short) (b0 + b1 - 1), b1);
+  }
+
+  /** SQL <code>CEIL</code> operator applied to int values. */
+  public static int ceil(int b0, int b1) {
+    int r = b0 % b1;
+    if (r > 0) {
+      r -= b1;
+    }
+    return b0 - r;
+  }
+
+  /** SQL <code>CEIL</code> operator applied to long values. */
+  public static long ceil(long b0, long b1) {
+    return floor(b0 + b1 - 1, b1);
+  }
+
+  // temporary
+  public static BigDecimal ceil(BigDecimal b0, int b1) {
+    return ceil(b0, BigDecimal.valueOf(b1));
+  }
+
+  // temporary
+  public static int ceil(int b0, BigDecimal b1) {
+    return ceil(b0, b1.intValue());
+  }
+
+  public static BigDecimal ceil(BigDecimal b0, BigDecimal b1) {
+    final BigDecimal[] bigDecimals = b0.divideAndRemainder(b1);
+    BigDecimal r = bigDecimals[1];
+    if (r.signum() > 0) {
+      r = r.subtract(b1);
+    }
+    return b0.subtract(r);
+  }
+
   // ABS
 
   /** SQL <code>ABS</code> operator applied to byte values. */
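
The two-argument forms above round to a multiple of the second argument:
toward negative infinity for FLOOR and toward positive infinity for CEIL,
which differs from plain remainder arithmetic when the first operand is
negative. A minimal standalone sketch using only the static methods added
here (the class name FloorCeilExample is just for illustration):

  import org.apache.calcite.runtime.SqlFunctions;

  public class FloorCeilExample {
    public static void main(String[] args) {
      // FLOOR rounds toward negative infinity, not toward zero.
      System.out.println(SqlFunctions.floor(7, 5));   // 5
      System.out.println(SqlFunctions.floor(-3, 5));  // -5
      // CEIL rounds toward positive infinity.
      System.out.println(SqlFunctions.ceil(7, 5));    // 10
      System.out.println(SqlFunctions.ceil(-3, 5));   // 0
    }
  }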

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/schema/Schema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/Schema.java b/core/src/main/java/org/apache/calcite/schema/Schema.java
index ecda690..065ec5d 100644
--- a/core/src/main/java/org/apache/calcite/schema/Schema.java
+++ b/core/src/main/java/org/apache/calcite/schema/Schema.java
@@ -181,6 +181,9 @@ public interface Schema {
      * <p>Used by Apache Phoenix, and others. Must have a single BIGINT column
      * called "$seq". */
     SEQUENCE,
+
+    /** Stream. */
+    STREAM,
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/StreamableTable.java b/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
new file mode 100644
index 0000000..a9dcdf8
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schema;
+
+import org.apache.calcite.rel.stream.Delta;
+
+/**
+ * Table that can be converted to a stream.
+ *
+ * @see Delta
+ */
+public interface StreamableTable extends Table {
+  /** Returns a table that represents the contents of this table converted to
+   * a stream. */
+  Table stream();
+}
+
+// End StreamableTable.java
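
A minimal sketch of how a consumer such as the Delta-over-scan rule in
StreamRules might use this contract (the helper class and method names here
are illustrative only):

  import org.apache.calcite.schema.StreamableTable;
  import org.apache.calcite.schema.Table;

  /** Illustrative helper around the StreamableTable contract. */
  class StreamableTables {
    /** Returns the streaming counterpart of {@code table}, or null if the
     * table cannot be streamed. */
    static Table streamOf(Table table) {
      return table instanceof StreamableTable
          ? ((StreamableTable) table).stream()
          : null;
    }
  }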

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/SqlKind.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlKind.java b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
index 2f3cf26..5fa715e 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlKind.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
@@ -493,6 +493,16 @@ public enum SqlKind {
   CURRENT_VALUE,
 
   /**
+   * The "FLOOR" function.
+   */
+  FLOOR,
+
+  /**
+   * The "CEIL" function.
+   */
+  CEIL,
+
+  /**
    * The "TRIM" function.
    */
   TRIM,
@@ -639,7 +649,7 @@ public enum SqlKind {
       EnumSet.complementOf(
           EnumSet.of(
               AS, DESCENDING, CUBE, ROLLUP, GROUPING_SETS, EXTEND,
-              SELECT, JOIN, OTHER_FUNCTION, CAST, TRIM,
+              SELECT, JOIN, OTHER_FUNCTION, CAST, TRIM, FLOOR, CEIL,
               LITERAL_CHAIN, JDBC_FN, PRECEDING, FOLLOWING, ORDER_BY,
               NULLS_FIRST, NULLS_LAST, COLLECTION_TABLE, TABLESAMPLE,
               WITH, WITH_ITEM));

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java b/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
index c016289..1ca4158 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
@@ -23,7 +23,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
  */
 public enum SqlSelectKeyword implements SqlLiteral.SqlSymbol {
   DISTINCT,
-  ALL;
+  ALL,
+  STREAM;
 
   /**
    * Creates a parse-tree node representing an occurrence of this keyword

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java b/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
index b8ddaf4..a7228af 100644
--- a/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
+++ b/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.OverScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlModality;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -194,6 +195,11 @@ public class SqlAdvisorValidator extends SqlValidatorImpl {
     }
   }
 
+  @Override public boolean validateModality(SqlSelect select,
+      SqlModality modality, boolean fail) {
+    return true;
+  }
+
   protected boolean shouldAllowOverRelation() {
     return true; // no reason not to be lenient
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
deleted file mode 100644
index a984c94..0000000
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-import org.apache.calcite.sql.validate.SqlValidatorScope;
-
-/**
- * Support for the CEIL/CEILING builtin function.
- */
-public class SqlCeilFunction extends SqlFunction {
-  //~ Constructors -----------------------------------------------------------
-
-  public SqlCeilFunction() {
-    super(
-        "CEIL",
-        SqlKind.OTHER_FUNCTION,
-        ReturnTypes.ARG0,
-        null,
-        OperandTypes.NUMERIC,
-        SqlFunctionCategory.NUMERIC);
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  public SqlMonotonicity getMonotonicity(SqlCall call,
-      SqlValidatorScope scope) {
-    return scope.getMonotonicity(call.operand(0));
-  }
-}
-
-// End SqlCeilFunction.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
index a8b8535..385cfb5 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
@@ -20,6 +20,7 @@ import org.apache.calcite.sql.SqlFunctionalOperator;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.validate.SqlModality;
 
 /**
  * SqlCollectionTableOperator is the "table function derived table" operator. It
@@ -30,33 +31,19 @@ import org.apache.calcite.sql.type.ReturnTypes;
  * {@link SqlStdOperatorTable#EXPLICIT_TABLE} is a prefix operator.
  */
 public class SqlCollectionTableOperator extends SqlFunctionalOperator {
-  //~ Static fields/initializers ---------------------------------------------
-
-  public static final int MODALITY_RELATIONAL = 1;
-  public static final int MODALITY_STREAM = 2;
-
-  //~ Instance fields --------------------------------------------------------
-
-  private final int modality;
+  private final SqlModality modality;
 
   //~ Constructors -----------------------------------------------------------
 
-  public SqlCollectionTableOperator(String name, int modality) {
-    super(
-        name,
-        SqlKind.COLLECTION_TABLE,
-        200,
-        true,
-        ReturnTypes.ARG0,
-        null,
+  public SqlCollectionTableOperator(String name, SqlModality modality) {
+    super(name, SqlKind.COLLECTION_TABLE, 200, true, ReturnTypes.ARG0, null,
         OperandTypes.ANY);
-
     this.modality = modality;
   }
 
   //~ Methods ----------------------------------------------------------------
 
-  public int getModality() {
+  public SqlModality getModality() {
     return modality;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
index 7e43262..1e870c3 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
@@ -59,9 +59,9 @@ public class SqlExtractFunction extends SqlFunction {
       int leftPrec,
       int rightPrec) {
     final SqlWriter.Frame frame = writer.startFunCall(getName());
-    call.operand(0).unparse(writer, leftPrec, rightPrec);
+    call.operand(0).unparse(writer, 0, 0);
     writer.sep("FROM");
-    call.operand(1).unparse(writer, leftPrec, rightPrec);
+    call.operand(1).unparse(writer, 0, 0);
     writer.endFunCall(frame);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
index bec351c..e9d147f 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
@@ -17,28 +17,33 @@
 package org.apache.calcite.sql.fun;
 
 import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 
+import com.google.common.base.Preconditions;
+
 /**
- * Definition of the "FLOOR" builtin SQL function.
+ * Definition of the "FLOOR" and "CEIL" built-in SQL functions.
  */
-public class SqlFloorFunction extends SqlFunction {
+public class SqlFloorFunction extends SqlMonotonicUnaryFunction {
   //~ Constructors -----------------------------------------------------------
 
-  public SqlFloorFunction() {
-    super(
-        "FLOOR",
-        SqlKind.OTHER_FUNCTION,
-        ReturnTypes.ARG0,
-        null,
-        OperandTypes.NUMERIC,
+  public SqlFloorFunction(SqlKind kind) {
+    super(kind.name(), kind, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null,
+        OperandTypes.or(OperandTypes.NUMERIC_OR_INTERVAL,
+            OperandTypes.sequence(
+                "'" + kind + "(<DATE> TO <TIME_UNIT>)'\n"
+                + "'" + kind + "(<TIME> TO <TIME_UNIT>)'\n"
+                + "'" + kind + "(<TIMESTAMP> TO <TIME_UNIT>)'",
+                OperandTypes.DATETIME,
+                OperandTypes.ANY)),
         SqlFunctionCategory.NUMERIC);
+    Preconditions.checkArgument(kind == SqlKind.FLOOR || kind == SqlKind.CEIL);
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -49,6 +54,19 @@ public class SqlFloorFunction extends SqlFunction {
     // Monotonic iff its first argument is, but not strict.
     return scope.getMonotonicity(call.operand(0)).unstrict();
   }
+
+  @Override public void unparse(SqlWriter writer, SqlCall call, int leftPrec,
+      int rightPrec) {
+    final SqlWriter.Frame frame = writer.startFunCall(getName());
+    if (call.operandCount() == 2) {
+      call.operand(0).unparse(writer, 0, 100);
+      writer.sep("TO");
+      call.operand(1).unparse(writer, 100, 0);
+    } else {
+      call.operand(0).unparse(writer, 0, 0);
+    }
+    writer.endFunCall(frame);
+  }
 }
 
 // End SqlFloorFunction.java
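
For illustration (the column name "ts" and unit HOUR are assumed): the
two-operand datetime form now unparses with the TO separator, so a call
parsed from "FLOOR(ts TO HOUR)" is written back as FLOOR(ts TO HOUR), while
the plain numeric form FLOOR(x) is unchanged; CEIL behaves the same way now
that it shares this class.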

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 9e5644d..ef7133b 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlModality;
 
 import com.google.common.collect.ImmutableList;
 
@@ -958,9 +959,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
    * {@link #EXPLICIT_TABLE} is a prefix operator.
    */
   public static final SqlSpecialOperator COLLECTION_TABLE =
-      new SqlCollectionTableOperator(
-          "TABLE",
-          SqlCollectionTableOperator.MODALITY_RELATIONAL);
+      new SqlCollectionTableOperator("TABLE", SqlModality.RELATION);
 
   public static final SqlOverlapsOperator OVERLAPS =
       new SqlOverlapsOperator();
@@ -1208,26 +1207,12 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   /**
    * The <code>FLOOR</code> function.
    */
-  public static final SqlFunction FLOOR =
-      new SqlMonotonicUnaryFunction(
-          "FLOOR",
-          SqlKind.OTHER_FUNCTION,
-          ReturnTypes.ARG0_OR_EXACT_NO_SCALE,
-          null,
-          OperandTypes.NUMERIC_OR_INTERVAL,
-          SqlFunctionCategory.NUMERIC);
+  public static final SqlFunction FLOOR = new SqlFloorFunction(SqlKind.FLOOR);
 
   /**
    * The <code>CEIL</code> function.
    */
-  public static final SqlFunction CEIL =
-      new SqlMonotonicUnaryFunction(
-          "CEIL",
-          SqlKind.OTHER_FUNCTION,
-          ReturnTypes.ARG0_OR_EXACT_NO_SCALE,
-          null,
-          OperandTypes.NUMERIC_OR_INTERVAL,
-          SqlFunctionCategory.NUMERIC);
+  public static final SqlFunction CEIL = new SqlFloorFunction(SqlKind.CEIL);
 
   /**
    * The <code>USER</code> function.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java b/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
index e5bb0ce..712c3c4 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
@@ -18,15 +18,15 @@ package org.apache.calcite.sql.type;
 
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperandCountRange;
 import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.util.Util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.util.AbstractList;
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  * This class allows multiple existing {@link SqlOperandTypeChecker} rules to be
@@ -75,8 +75,7 @@ import java.util.List;
  * SqlSingleOperandTypeChecker, and signature generation is not supported. For
  * AND composition, only the first rule is used for signature generation.
  */
-public class CompositeOperandTypeChecker
-    implements SqlSingleOperandTypeChecker {
+public class CompositeOperandTypeChecker implements SqlOperandTypeChecker {
   //~ Enums ------------------------------------------------------------------
 
   /** How operands are composed. */
@@ -86,8 +85,9 @@ public class CompositeOperandTypeChecker
 
   //~ Instance fields --------------------------------------------------------
 
-  private final ImmutableList<SqlSingleOperandTypeChecker> allowedRules;
-  private final Composition composition;
+  protected final ImmutableList<? extends SqlOperandTypeChecker> allowedRules;
+  protected final Composition composition;
+  private final String allowedSignatures;
 
   //~ Constructors -----------------------------------------------------------
 
@@ -97,25 +97,30 @@ public class CompositeOperandTypeChecker
    */
   CompositeOperandTypeChecker(
       Composition composition,
-      ImmutableList<SqlSingleOperandTypeChecker> allowedRules) {
-    assert null != allowedRules;
+      ImmutableList<? extends SqlOperandTypeChecker> allowedRules,
+      @Nullable String allowedSignatures) {
+    this.allowedRules = Preconditions.checkNotNull(allowedRules);
+    this.composition = Preconditions.checkNotNull(composition);
+    this.allowedSignatures = allowedSignatures;
     assert allowedRules.size() > 1;
-    this.allowedRules = allowedRules;
-    this.composition = composition;
   }
 
   //~ Methods ----------------------------------------------------------------
 
-  public ImmutableList<SqlSingleOperandTypeChecker> getRules() {
+  public ImmutableList<? extends SqlOperandTypeChecker> getRules() {
     return allowedRules;
   }
 
   public String getAllowedSignatures(SqlOperator op, String opName) {
+    if (allowedSignatures != null) {
+      return allowedSignatures;
+    }
     if (composition == Composition.SEQUENCE) {
-      throw Util.needToImplement("must override getAllowedSignatures");
+      throw new AssertionError(
+          "specify allowedSignatures or override getAllowedSignatures");
     }
     StringBuilder ret = new StringBuilder();
-    for (Ord<SqlSingleOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+    for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
       if (ord.i > 0) {
         ret.append(SqlOperator.NL);
       }
@@ -213,134 +218,66 @@ public class CompositeOperandTypeChecker
     return max;
   }
 
-  public boolean checkSingleOperandType(
+  public boolean checkOperandTypes(
       SqlCallBinding callBinding,
-      SqlNode node,
-      int iFormalOperand,
       boolean throwOnFailure) {
-    assert allowedRules.size() >= 1;
-
-    if (composition == Composition.SEQUENCE) {
-      return allowedRules.get(iFormalOperand).checkSingleOperandType(
-          callBinding, node, 0, throwOnFailure);
-    }
-
-    int typeErrorCount = 0;
-
-    boolean throwOnAndFailure =
-        (composition == Composition.AND)
-            && throwOnFailure;
-
-    for (SqlSingleOperandTypeChecker rule : allowedRules) {
-      if (!rule.checkSingleOperandType(
-          callBinding,
-          node,
-          iFormalOperand,
-          throwOnAndFailure)) {
-        typeErrorCount++;
-      }
+    if (check(callBinding)) {
+      return true;
     }
-
-    boolean ret;
-    switch (composition) {
-    case AND:
-      ret = typeErrorCount == 0;
-      break;
-    case OR:
-      ret = typeErrorCount < allowedRules.size();
-      break;
-    default:
-      // should never come here
-      throw Util.unexpected(composition);
+    if (!throwOnFailure) {
+      return false;
     }
-
-    if (!ret && throwOnFailure) {
-      // In the case of a composite OR, we want to throw an error
-      // describing in more detail what the problem was, hence doing the
-      // loop again.
-      for (SqlSingleOperandTypeChecker rule : allowedRules) {
-        rule.checkSingleOperandType(
-            callBinding,
-            node,
-            iFormalOperand,
-            true);
+    if (composition == Composition.OR) {
+      for (SqlOperandTypeChecker allowedRule : allowedRules) {
+        allowedRule.checkOperandTypes(callBinding, true);
       }
-
-      // If no exception thrown, just throw a generic validation signature
-      // error.
-      throw callBinding.newValidationSignatureError();
     }
 
-    return ret;
+    // If no exception thrown, just throw a generic validation
+    // signature error.
+    throw callBinding.newValidationSignatureError();
   }
 
-  public boolean checkOperandTypes(
-      SqlCallBinding callBinding,
-      boolean throwOnFailure) {
-    int typeErrorCount = 0;
-
-  label:
-    for (Ord<SqlSingleOperandTypeChecker> ord : Ord.zip(allowedRules)) {
-      SqlSingleOperandTypeChecker rule = ord.e;
-
-      switch (composition) {
-      case SEQUENCE:
-        if (ord.i >= callBinding.getOperandCount()) {
-          break label;
-        }
-        if (!rule.checkSingleOperandType(
+  private boolean check(SqlCallBinding callBinding) {
+    switch (composition) {
+    case SEQUENCE:
+      if (callBinding.getOperandCount() != allowedRules.size()) {
+        return false;
+      }
+      for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+        SqlOperandTypeChecker rule = ord.e;
+        if (!((SqlSingleOperandTypeChecker) rule).checkSingleOperandType(
             callBinding,
             callBinding.getCall().operand(ord.i),
             0,
             false)) {
-          typeErrorCount++;
+          return false;
         }
-        break;
-      default:
+      }
+      return true;
+
+    case AND:
+      for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+        SqlOperandTypeChecker rule = ord.e;
         if (!rule.checkOperandTypes(callBinding, false)) {
-          typeErrorCount++;
-          if (composition == Composition.AND) {
-            // Avoid trying other rules in AND if the first one fails.
-            break label;
-          }
-        } else if (composition == Composition.OR) {
-          break label; // true OR any == true, just break
+          // Avoid trying other rules in AND if the first one fails.
+          return false;
         }
-        break;
       }
-    }
+      return true;
 
-    boolean failed;
-    switch (composition) {
-    case AND:
-    case SEQUENCE:
-      failed = typeErrorCount > 0;
-      break;
     case OR:
-      failed = typeErrorCount == allowedRules.size();
-      break;
-    default:
-      throw new AssertionError();
-    }
-
-    if (failed) {
-      if (throwOnFailure) {
-        // In the case of a composite OR, we want to throw an error
-        // describing in more detail what the problem was, hence doing
-        // the loop again.
-        if (composition == Composition.OR) {
-          for (SqlOperandTypeChecker allowedRule : allowedRules) {
-            allowedRule.checkOperandTypes(callBinding, true);
-          }
+      for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+        SqlOperandTypeChecker rule = ord.e;
+        if (rule.checkOperandTypes(callBinding, false)) {
+          return true;
         }
-
-        // If no exception thrown, just throw a generic validation
-        // signature error.
-        throw callBinding.newValidationSignatureError();
       }
       return false;
+
+    default:
+      throw new AssertionError();
     }
-    return true;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java b/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
new file mode 100644
index 0000000..de8c303
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Allows multiple
+ * {@link org.apache.calcite.sql.type.SqlSingleOperandTypeChecker} rules to be
+ * combined into one rule.
+ */
+public class CompositeSingleOperandTypeChecker
+    extends CompositeOperandTypeChecker
+    implements SqlSingleOperandTypeChecker {
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Package private. Use {@link org.apache.calcite.sql.type.OperandTypes#and},
+   * {@link org.apache.calcite.sql.type.OperandTypes#or}.
+   */
+  CompositeSingleOperandTypeChecker(
+      CompositeOperandTypeChecker.Composition composition,
+      ImmutableList<? extends SqlSingleOperandTypeChecker> allowedRules,
+      String allowedSignatures) {
+    super(composition, allowedRules, allowedSignatures);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @SuppressWarnings("unchecked")
+  @Override public ImmutableList<? extends SqlSingleOperandTypeChecker>
+  getRules() {
+    return (ImmutableList<? extends SqlSingleOperandTypeChecker>) allowedRules;
+  }
+
+  public boolean checkSingleOperandType(
+      SqlCallBinding callBinding,
+      SqlNode node,
+      int iFormalOperand,
+      boolean throwOnFailure) {
+    assert allowedRules.size() >= 1;
+
+    final ImmutableList<? extends SqlSingleOperandTypeChecker> rules =
+        getRules();
+    if (composition == Composition.SEQUENCE) {
+      return rules.get(iFormalOperand).checkSingleOperandType(
+          callBinding, node, 0, throwOnFailure);
+    }
+
+    int typeErrorCount = 0;
+
+    boolean throwOnAndFailure =
+        (composition == Composition.AND)
+            && throwOnFailure;
+
+    for (SqlSingleOperandTypeChecker rule : rules) {
+      if (!rule.checkSingleOperandType(
+          callBinding,
+          node,
+          iFormalOperand,
+          throwOnAndFailure)) {
+        typeErrorCount++;
+      }
+    }
+
+    boolean ret;
+    switch (composition) {
+    case AND:
+      ret = typeErrorCount == 0;
+      break;
+    case OR:
+      ret = typeErrorCount < allowedRules.size();
+      break;
+    default:
+      // should never come here
+      throw Util.unexpected(composition);
+    }
+
+    if (!ret && throwOnFailure) {
+      // In the case of a composite OR, we want to throw an error
+      // describing in more detail what the problem was, hence doing the
+      // loop again.
+      for (SqlSingleOperandTypeChecker rule : rules) {
+        rule.checkSingleOperandType(
+            callBinding,
+            node,
+            iFormalOperand,
+            true);
+      }
+
+      // If no exception thrown, just throw a generic validation signature
+      // error.
+      throw callBinding.newValidationSignatureError();
+    }
+
+    return ret;
+  }
+}
+
+// End CompositeSingleOperandTypeChecker.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
index c70d403..10bccbd 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
@@ -72,21 +72,52 @@ public abstract class OperandTypes {
   /**
    * Creates a checker that passes if any one of the rules passes.
    */
+  public static SqlOperandTypeChecker or(SqlOperandTypeChecker... rules) {
+    return new CompositeOperandTypeChecker(
+        CompositeOperandTypeChecker.Composition.OR,
+        ImmutableList.copyOf(rules), null);
+  }
+
+  /**
+   * Creates a checker that passes only if all of the rules
+   * pass.
+   */
+  public static SqlOperandTypeChecker and(SqlOperandTypeChecker... rules) {
+    return new CompositeOperandTypeChecker(
+        CompositeOperandTypeChecker.Composition.AND,
+        ImmutableList.copyOf(rules), null);
+  }
+
+  /**
+   * Creates a single-operand checker that passes if any one of the rules
+   * passes.
+   */
   public static SqlSingleOperandTypeChecker or(
       SqlSingleOperandTypeChecker... rules) {
-    return new CompositeOperandTypeChecker(
+    return new CompositeSingleOperandTypeChecker(
         CompositeOperandTypeChecker.Composition.OR,
-        ImmutableList.copyOf(rules));
+        ImmutableList.copyOf(rules), null);
   }
 
   /**
-   * Creates a checker that passes if any one of the rules passes.
+   * Creates a single-operand checker that passes only if all of the rules
+   * pass.
    */
   public static SqlSingleOperandTypeChecker and(
       SqlSingleOperandTypeChecker... rules) {
-    return new CompositeOperandTypeChecker(
+    return new CompositeSingleOperandTypeChecker(
         CompositeOperandTypeChecker.Composition.AND,
-        ImmutableList.copyOf(rules));
+        ImmutableList.copyOf(rules), null);
+  }
+
+  /**
+   * Creates an operand checker from a sequence of single-operand checkers.
+   */
+  public static SqlOperandTypeChecker sequence(String allowedSignatures,
+      SqlSingleOperandTypeChecker... rules) {
+    return new CompositeOperandTypeChecker(
+        CompositeOperandTypeChecker.Composition.SEQUENCE,
+        ImmutableList.copyOf(rules), allowedSignatures);
   }
 
   // ----------------------------------------------------------------------
@@ -111,7 +142,7 @@ public abstract class OperandTypes {
   public static final SqlOperandTypeChecker ONE_OR_MORE =
       variadic(SqlOperandCountRanges.from(1));
 
-  private static SqlOperandTypeChecker variadic(
+  public static SqlOperandTypeChecker variadic(
       final SqlOperandCountRange range) {
     return new SqlOperandTypeChecker() {
       public boolean checkOperandTypes(
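
A condensed sketch of the composition style these factories enable, mirroring
the FLOOR/CEIL operand checker defined earlier in this patch (the class and
field names are illustrative only):

  import org.apache.calcite.sql.type.OperandTypes;
  import org.apache.calcite.sql.type.SqlOperandTypeChecker;

  class OperandTypesExample {
    // Accept either a single numeric-or-interval operand, or the
    // (<DATETIME>, <ANY>) pair used by FLOOR(datetime TO timeUnit).
    static final SqlOperandTypeChecker FLOOR_LIKE =
        OperandTypes.or(
            OperandTypes.NUMERIC_OR_INTERVAL,
            OperandTypes.sequence(
                "'FLOOR(<DATE> TO <TIME_UNIT>)'",
                OperandTypes.DATETIME,
                OperandTypes.ANY));
  }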


[3/4] incubator-calcite git commit: [CALCITE-602] Streaming queries

Posted by jh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
index 9dc0178..9bbb11f 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
@@ -180,6 +180,10 @@ abstract class AbstractNamespace implements SqlValidatorNamespace {
     return this;
   }
 
+  public boolean supportsModality(SqlModality modality) {
+    return true;
+  }
+
   public <T> T unwrap(Class<T> clazz) {
     return clazz.cast(this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
index 89d4132..7b46690 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
@@ -199,10 +199,18 @@ public class IdentifierNamespace extends AbstractNamespace {
     return monotonicExprs;
   }
 
-  public SqlMonotonicity getMonotonicity(String columnName) {
+  @Override public SqlMonotonicity getMonotonicity(String columnName) {
     final SqlValidatorTable table = getTable();
     return table.getMonotonicity(columnName);
   }
+
+  @Override public boolean supportsModality(SqlModality modality) {
+    final SqlValidatorTable table = getTable();
+    if (table == null) {
+      return modality == SqlModality.RELATION;
+    }
+    return table.supportsModality(modality);
+  }
 }
 
 // End IdentifierNamespace.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SelectNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SelectNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/SelectNamespace.java
index 1ba7347..a0e0ad2 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SelectNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SelectNamespace.java
@@ -61,6 +61,10 @@ public class SelectNamespace extends AbstractNamespace {
     return rowType;
   }
 
+  @Override public boolean supportsModality(SqlModality modality) {
+    return validator.validateModality(select, modality, false);
+  }
+
   public SqlMonotonicity getMonotonicity(String columnName) {
     final RelDataType rowType = this.getRowTypeSansSystemColumns();
     final int field = SqlTypeUtil.findField(rowType, columnName);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SetopNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SetopNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/SetopNamespace.java
index ac71d01..21a6ec8 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SetopNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SetopNamespace.java
@@ -55,6 +55,40 @@ public class SetopNamespace extends AbstractNamespace {
     return call;
   }
 
+  @Override public SqlMonotonicity getMonotonicity(String columnName) {
+    SqlMonotonicity monotonicity = null;
+    int index = getRowType().getFieldNames().indexOf(columnName);
+    if (index < 0) {
+      return SqlMonotonicity.NOT_MONOTONIC;
+    }
+    for (SqlNode operand : call.getOperandList()) {
+      final SqlValidatorNamespace namespace = validator.getNamespace(operand);
+      monotonicity = combine(monotonicity,
+          namespace.getMonotonicity(
+              namespace.getRowType().getFieldNames().get(index)));
+    }
+    return monotonicity;
+  }
+
+  private SqlMonotonicity combine(SqlMonotonicity m0, SqlMonotonicity m1) {
+    if (m0 == null) {
+      return m1;
+    }
+    if (m1 == null) {
+      return m0;
+    }
+    if (m0 == m1) {
+      return m0;
+    }
+    if (m0.unstrict() == m1) {
+      return m1;
+    }
+    if (m1.unstrict() == m0) {
+      return m0;
+    }
+    return SqlMonotonicity.NOT_MONOTONIC;
+  }
+
   public RelDataType validateImpl() {
     switch (call.getKind()) {
     case UNION:
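
The combine step is deliberately conservative: INCREASING combined with
STRICTLY_INCREASING yields INCREASING (the unstrict form), while INCREASING
combined with DECREASING or NOT_MONOTONIC yields NOT_MONOTONIC, so a set
operation is only treated as monotonic on a column when every input is
monotonic in a compatible direction.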

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SqlModality.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlModality.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlModality.java
new file mode 100644
index 0000000..471ac70
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlModality.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.validate;
+
+/** Relational or streaming. */
+public enum SqlModality {
+  RELATION,
+  STREAM
+}
+
+// End SqlModality.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SqlValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidator.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidator.java
index 10f869d..a02b43f 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidator.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidator.java
@@ -711,6 +711,19 @@ public interface SqlValidator {
    */
   SqlValidatorScope getOverScope(SqlNode node);
 
+  /**
+   * Validates that a query is capable of producing results of the given modality
+   * (relational or streaming).
+   *
+   * @param select Query
+   * @param modality Modality (streaming or relational)
+   * @param fail Whether to throw a user error if the query does not support
+   *             the required modality
+   * @return whether query supports the given modality
+   */
+  boolean validateModality(SqlSelect select, SqlModality modality,
+      boolean fail);
+
   void validateWith(SqlWith with, SqlValidatorScope scope);
 
   void validateWithItem(SqlWithItem withItem);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index adc459c..44147ba 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -16,7 +16,6 @@
  */
 package org.apache.calcite.sql.validate;
 
-import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -75,6 +74,7 @@ import org.apache.calcite.sql.util.SqlShuttle;
 import org.apache.calcite.sql.util.SqlVisitor;
 import org.apache.calcite.util.BitString;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Static;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
@@ -132,6 +132,8 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
    */
   public static final String UPDATE_ANON_PREFIX = "SYS$ANON";
 
+  private SqlNode top;
+
   @VisibleForTesting
   public SqlValidatorScope getEmptyScope() {
     return new EmptyScope(this);
@@ -275,14 +277,10 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
       SqlValidatorCatalogReader catalogReader,
       RelDataTypeFactory typeFactory,
       SqlConformance conformance) {
-    Linq4j.requireNonNull(opTab);
-    Linq4j.requireNonNull(catalogReader);
-    Linq4j.requireNonNull(typeFactory);
-    Linq4j.requireNonNull(conformance);
-    this.opTab = opTab;
-    this.catalogReader = catalogReader;
-    this.typeFactory = typeFactory;
-    this.conformance = conformance;
+    this.opTab = Preconditions.checkNotNull(opTab);
+    this.catalogReader = Preconditions.checkNotNull(catalogReader);
+    this.typeFactory = Preconditions.checkNotNull(typeFactory);
+    this.conformance = Preconditions.checkNotNull(conformance);
 
     // NOTE jvs 23-Dec-2003:  This is used as the type for dynamic
     // parameters and null literals until a real type is imposed for them.
@@ -795,6 +793,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
       SqlValidatorScope scope) {
     SqlNode outermostNode = performUnconditionalRewrites(topNode, false);
     cursorSet.add(outermostNode);
+    top = outermostNode;
     if (TRACER.isLoggable(Level.FINER)) {
       TRACER.finer("After unconditional rewrite: " + outermostNode.toString());
     }
@@ -828,6 +827,9 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
     }
 
     validateNamespace(ns);
+    if (node == top) {
+      validateModality(node);
+    }
     validateAccess(
         node,
         ns.getTable(),
@@ -2452,9 +2454,24 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
   }
 
   public boolean isAggregate(SqlSelect select) {
-    return select.getGroup() != null
-        || select.getHaving() != null
-        || getAgg(select) != null;
+    return getAggregate(select) != null;
+  }
+
+  /** Returns the parse tree node (GROUP BY, HAVING, or an aggregate function
+   * call) that causes {@code select} to be an aggregate query, or null if it is
+   * not an aggregate query.
+   *
+   * <p>The node is useful context for error messages. */
+  protected SqlNode getAggregate(SqlSelect select) {
+    SqlNode node = select.getGroup();
+    if (node != null) {
+      return node;
+    }
+    node = select.getHaving();
+    if (node != null) {
+      return node;
+    }
+    return getAgg(select);
   }
 
   private SqlNode getAgg(SqlSelect select) {
@@ -2930,6 +2947,146 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
     validateOrderList(select);
   }
 
+  /** Validates that a query can deliver the modality it promises. Only called
+   * on the top-most SELECT or set operator in the tree. */
+  private void validateModality(SqlNode query) {
+    final SqlModality modality = deduceModality(query);
+    if (query instanceof SqlSelect) {
+      final SqlSelect select = (SqlSelect) query;
+      validateModality(select, modality, true);
+    } else if (query.getKind() == SqlKind.VALUES) {
+      switch (modality) {
+      case STREAM:
+        throw newValidationError(query, Static.RESOURCE.cannotStreamValues());
+      }
+    } else {
+      assert query.isA(SqlKind.SET_QUERY);
+      final SqlCall call = (SqlCall) query;
+      for (SqlNode operand : call.getOperandList()) {
+        if (deduceModality(operand) != modality) {
+          throw newValidationError(operand,
+              Static.RESOURCE.streamSetOpInconsistentInputs());
+        }
+        validateModality(operand);
+      }
+    }
+  }
+
+  /** Returns the intended modality of a SELECT or set-op. */
+  private SqlModality deduceModality(SqlNode query) {
+    if (query instanceof SqlSelect) {
+      SqlSelect select = (SqlSelect) query;
+      return select.getModifierNode(SqlSelectKeyword.STREAM) != null
+          ? SqlModality.STREAM
+          : SqlModality.RELATION;
+    } else if (query.getKind() == SqlKind.VALUES) {
+      return SqlModality.RELATION;
+    } else {
+      assert query.isA(SqlKind.SET_QUERY);
+      final SqlCall call = (SqlCall) query;
+      return deduceModality(call.getOperandList().get(0));
+    }
+  }
+
+  public boolean validateModality(SqlSelect select, SqlModality modality,
+      boolean fail) {
+    final SelectScope scope = getRawSelectScope(select);
+    for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+      if (!namespace.right.supportsModality(modality)) {
+        switch (modality) {
+        case STREAM:
+          if (fail) {
+            throw newValidationError(namespace.right.getNode(),
+                Static.RESOURCE.cannotConvertToStream(namespace.left));
+          } else {
+            return false;
+          }
+        default:
+          if (fail) {
+            throw newValidationError(namespace.right.getNode(),
+                Static.RESOURCE.cannotConvertToRelation(namespace.left));
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+
+    // Make sure that aggregation is possible.
+    final SqlNode aggregateNode = getAggregate(select);
+    if (aggregateNode != null) {
+      switch (modality) {
+      case STREAM:
+        SqlNodeList groupList = select.getGroup();
+        if (groupList == null || !containsMonotonic(scope, groupList)) {
+          if (fail) {
+            throw newValidationError(aggregateNode,
+                Static.RESOURCE.streamMustGroupByMonotonic());
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+
+    // Make sure that ORDER BY is possible.
+    final SqlNodeList orderList = select.getOrderList();
+    if (orderList != null && orderList.size() > 0) {
+      switch (modality) {
+      case STREAM:
+        if (!hasSortedPrefix(scope, orderList)) {
+          if (fail) {
+            throw newValidationError(orderList.get(0),
+                Static.RESOURCE.streamMustOrderByMonotonic());
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  /** Returns whether the leading ORDER BY expression is monotonic. */
+  private boolean hasSortedPrefix(SelectScope scope, SqlNodeList orderList) {
+    return isSortCompatible(scope, orderList.get(0), false);
+  }
+
+  private boolean isSortCompatible(SelectScope scope, SqlNode node,
+      boolean descending) {
+    switch (node.getKind()) {
+    case DESCENDING:
+      return isSortCompatible(scope, ((SqlCall) node).getOperandList().get(0),
+          true);
+    }
+    final SqlMonotonicity monotonicity = scope.getMonotonicity(node);
+    switch (monotonicity) {
+    case INCREASING:
+    case STRICTLY_INCREASING:
+      return !descending;
+    case DECREASING:
+    case STRICTLY_DECREASING:
+      return descending;
+    default:
+      return false;
+    }
+  }
+
+  private static boolean containsMonotonic(SelectScope scope,
+      SqlNodeList nodes) {
+    for (SqlNode node : nodes) {
+      final SqlMonotonicity monotonicity = scope.getMonotonicity(node);
+      switch (monotonicity) {
+      case CONSTANT:
+      case NOT_MONOTONIC:
+        break;
+      default:
+        return true;
+      }
+    }
+    return false;
+  }
+
   protected void validateWindowClause(SqlSelect select) {
     final SqlNodeList windowList = select.getWindowList();
     if ((windowList == null) || (windowList.size() == 0)) {
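
To illustrate these checks (assuming a stream Orders whose rowtime column is
declared monotonic): a query such as SELECT STREAM FLOOR(rowtime TO HOUR),
COUNT(*) ... GROUP BY FLOOR(rowtime TO HOUR) passes, because FLOOR preserves
the (unstrict) monotonicity of its argument; the same aggregate grouped only
on a non-monotonic column such as productId fails with
streamMustGroupByMonotonic, and a streaming ORDER BY must likewise begin with
a monotonic expression such as rowtime.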

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
index b95204e..007028d 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
@@ -195,6 +195,13 @@ public interface SqlValidatorNamespace {
    * <p>You must not call this method before {@link #validate()} has
    * completed.</p> */
   SqlValidatorNamespace resolve();
+
+  /** Returns whether this namespace is capable of giving results of the
+   * desired modality (streaming or relational).
+   *
+   * @param modality Modality
+   */
+  boolean supportsModality(SqlModality modality);
 }
 
 // End SqlValidatorNamespace.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
index 7b2d48c..e442cdc 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorTable.java
@@ -42,6 +42,8 @@ public interface SqlValidatorTable {
    * Returns the access type of the table
    */
   SqlAccessType getAllowedAccess();
+
+  boolean supportsModality(SqlModality modality);
 }
 
 // End SqlValidatorTable.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/TableConstructorNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/TableConstructorNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/TableConstructorNamespace.java
index 05ddf53..da8a22f 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/TableConstructorNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/TableConstructorNamespace.java
@@ -85,6 +85,10 @@ public class TableConstructorNamespace extends AbstractNamespace {
   public SqlValidatorScope getScope() {
     return scope;
   }
+
+  @Override public boolean supportsModality(SqlModality modality) {
+    return modality == SqlModality.RELATION;
+  }
 }
 
 // End TableConstructorNamespace.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
index 4c70157..044d41b 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
@@ -64,6 +64,11 @@ class TableNamespace extends AbstractNamespace {
     return table;
   }
 
+  @Override public SqlMonotonicity getMonotonicity(String columnName) {
+    final SqlValidatorTable table = getTable();
+    return table.getMonotonicity(columnName);
+  }
+
   /** Creates a TableNamespace based on the same table as this one, but with
    * extended fields.
    *

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
index 8fa93a1..f4765b4 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
@@ -42,6 +42,8 @@ import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.stream.LogicalChi;
+import org.apache.calcite.rel.stream.LogicalDelta;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
@@ -657,6 +659,14 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
     setNewForOldRel(rel, newRel);
   }
 
+  public void rewriteRel(LogicalDelta rel) {
+    rewriteGeneric(rel);
+  }
+
+  public void rewriteRel(LogicalChi rel) {
+    rewriteGeneric(rel);
+  }
+
   /** Generates expressions that reference the flattened input fields from
    * a given row type. */
   private void flattenInputs(List<RelDataTypeField> fieldList, RexNode prefix,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index a68f1c3..207b2dc 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -54,6 +54,7 @@ import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.stream.LogicalDelta;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -524,6 +525,9 @@ public class SqlToRelConverter {
     }
 
     RelNode result = convertQueryRecursive(query, top, null);
+    if (top && isStream(query)) {
+      result = new LogicalDelta(cluster, result.getTraitSet(), result);
+    }
     checkConvertedType(query, result);
 
     boolean dumpPlan = SQL2REL_LOGGER.isLoggable(Level.FINE);
@@ -539,6 +543,11 @@ public class SqlToRelConverter {
     return result;
   }
 
+  private static boolean isStream(SqlNode query) {
+    return query instanceof SqlSelect
+        && ((SqlSelect) query).isKeywordPresent(SqlSelectKeyword.STREAM);
+  }
+
   protected boolean checkConvertedRowType(
       SqlNode query,
       RelDataType convertedRowType) {
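
Two details of the converter change are worth spelling out. The STREAM keyword is
detected on the parsed SqlSelect, and only the outermost query (the top flag) is
wrapped, so subqueries inside a streaming statement are converted as ordinary
relations while a single LogicalDelta at the root marks the whole plan as
streaming. The resulting shape, taken from the convertContains assertion in
StreamTest later in this patch, looks like:

    LogicalDelta
      LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3])
        EnumerableTableScan(table=[[STREAMS, ORDERS]])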

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index ff43113..b5f2725 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -234,6 +234,8 @@ public enum BuiltInMethod {
       int.class),
   CHAR_LENGTH(SqlFunctions.class, "charLength", String.class),
   STRING_CONCAT(SqlFunctions.class, "concat", String.class, String.class),
+  FLOOR(SqlFunctions.class, "floor", int.class, int.class),
+  CEIL(SqlFunctions.class, "ceil", int.class, int.class),
   OVERLAY(SqlFunctions.class, "overlay", String.class, String.class, int.class),
   OVERLAY3(SqlFunctions.class, "overlay", String.class, String.class, int.class,
       int.class),
@@ -265,6 +267,14 @@ public enum BuiltInMethod {
       long.class, TimeUnitRange.class, int.class),
   UNIX_DATE_EXTRACT(DateTimeUtils.class, "unixDateExtract",
       TimeUnitRange.class, long.class),
+  UNIX_DATE_FLOOR(DateTimeUtils.class, "unixDateFloor",
+      TimeUnitRange.class, int.class),
+  UNIX_DATE_CEIL(DateTimeUtils.class, "unixDateCeil",
+      TimeUnitRange.class, int.class),
+  UNIX_TIMESTAMP_FLOOR(DateTimeUtils.class, "unixTimestampFloor",
+      TimeUnitRange.class, long.class),
+  UNIX_TIMESTAMP_CEIL(DateTimeUtils.class, "unixTimestampCeil",
+      TimeUnitRange.class, long.class),
   CURRENT_TIMESTAMP(SqlFunctions.class, "currentTimestamp", DataContext.class),
   CURRENT_TIME(SqlFunctions.class, "currentTime", DataContext.class),
   CURRENT_DATE(SqlFunctions.class, "currentDate", DataContext.class),
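
The two-argument FLOOR and CEIL entries, together with the new DateTimeUtils
unixDateFloor/unixDateCeil and unixTimestampFloor/unixTimestampCeil methods,
support the FLOOR/CEIL work in this patch, including the FLOOR(datetime TO unit)
and CEIL(datetime TO unit) forms tested further down. As an illustration of the
numeric rounding they rely on, here is a sketch that is consistent with
SqlFunctionsTest.checkFloor and checkCeil later in this patch; it is not a copy of
the actual SqlFunctions implementation:

    /** Illustrative rounding to a multiple of y; matches the expectations in
     * SqlFunctionsTest, e.g. floor(27, 10) = 20 and ceil(-27, 10) = -20. */
    class FloorCeilSketch {
      static int floor(int x, int y) {
        int r = x % y;
        return r < 0 ? x - (r + y) : x - r;
      }

      static int ceil(int x, int y) {
        int r = x % y;
        return r > 0 ? x + (y - r) : x - r;
      }
    }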

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index d4213fc..60f14ba 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -179,4 +179,10 @@ RequireDefaultConstructor=Declaring class ''{0}'' of non-static user-defined fun
 FirstParameterOfAdd=In user-defined aggregate class ''{0}'', first parameter to ''add'' method must be the accumulator (the return type of the ''init'' method)
 FilterableTableInventedFilter=FilterableTable.scan returned a filter that was not in the original list: {0}
 FilterableTableScanReturnedNull=FilterableTable.scan must not return null
+CannotConvertToStream=Cannot convert table ''{0}'' to stream
+CannotConvertToRelation=Cannot convert stream ''{0}'' to relation
+StreamMustGroupByMonotonic=Streaming aggregation requires at least one monotonic expression in GROUP BY clause
+StreamMustOrderByMonotonic=Streaming ORDER BY must start with monotonic expression
+StreamSetOpInconsistentInputs=Set operator cannot combine streaming and non-streaming inputs
+CannotStreamValues=Cannot stream VALUES
 # End CalciteResource.properties
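
Each new key is paired with a method on the CalciteResource interface (not visible
in this excerpt of the diff), following the same key-to-method naming convention as
the existing entries, and the validator reports the error through that method. A
sketch of the presumed pairing for the first key; the exact signature is an
assumption, not taken from this diff:

    // Assumed shape, modelled on the convention used by the other entries.
    @BaseMessage("Cannot convert table ''{0}'' to stream")
    ExInst<SqlValidatorException> cannotConvertToStream(String name);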

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/sql/parser/SqlParserTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/parser/SqlParserTest.java b/core/src/test/java/org/apache/calcite/sql/parser/SqlParserTest.java
index 70eb39c..9b32900 100644
--- a/core/src/test/java/org/apache/calcite/sql/parser/SqlParserTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/parser/SqlParserTest.java
@@ -483,6 +483,21 @@ public class SqlParserTest {
             + "WHERE (TRUE IS NOT DISTINCT FROM TRUE)");
   }
 
+  @Test public void testFloor() {
+    checkExp("floor(1.5)", "FLOOR(1.5)");
+    checkExp("floor(x)", "FLOOR(`X`)");
+    checkExp("floor(x to hour)", "FLOOR(`X` TO HOUR)");
+    checkExp("ceil(x to hour)", "CEIL(`X` TO HOUR)");
+    checkExp("ceil(x + interval '1' minute to second)",
+        "CEIL((`X` + INTERVAL '1' MINUTE TO SECOND))");
+    checkExp("ceil((x + interval '1' minute) to second)",
+        "CEIL((`X` + INTERVAL '1' MINUTE) TO SECOND)");
+    checkExp("ceil(x + (interval '1:23' minute to second))",
+        "CEIL((`X` + INTERVAL '1:23' MINUTE TO SECOND))");
+    checkExp("ceil(x + interval '1:23' minute to second to second)",
+        "CEIL((`X` + INTERVAL '1:23' MINUTE TO SECOND) TO SECOND)");
+  }
+
   @Test public void testCast() {
     checkExp("cast(x as boolean)", "CAST(`X` AS BOOLEAN)");
     checkExp("cast(x as integer)", "CAST(`X` AS INTEGER)");
@@ -1895,6 +1910,18 @@ public class SqlParserTest {
             + "FROM `BAR`) AS `XYZ`");
   }
 
+  @Test public void testSelectStream() {
+    sql("select stream foo from bar")
+        .ok("SELECT STREAM `FOO`\n"
+            + "FROM `BAR`");
+  }
+
+  @Test public void testSelectStreamDistinct() {
+    sql("select stream distinct foo from bar")
+        .ok("SELECT STREAM DISTINCT `FOO`\n"
+                + "FROM `BAR`");
+  }
+
   @Test public void testWhere() {
     check(
         "select * from emp where empno > 5 and gender = 'F'",

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
index 14aec93..3694094 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
@@ -69,7 +69,9 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
           "TABLE(CATALOG.SALES.EMP_ADDRESS)",
           "TABLE(CATALOG.SALES.DEPT)",
           "TABLE(CATALOG.SALES.BONUS)",
-          "TABLE(CATALOG.SALES.SALGRADE)");
+          "TABLE(CATALOG.SALES.ORDERS)",
+          "TABLE(CATALOG.SALES.SALGRADE)",
+          "TABLE(CATALOG.SALES.SHIPMENTS)");
 
   private static final List<String> SCHEMAS =
       Arrays.asList(
@@ -185,6 +187,7 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
       Arrays.asList(
           "KEYWORD(ALL)",
           "KEYWORD(DISTINCT)",
+          "KEYWORD(STREAM)",
           "KEYWORD(*)");
 
   private static final List<String> ORDER_KEYWORDS =

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/sql/test/SqlOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlOperatorBaseTest.java b/core/src/test/java/org/apache/calcite/sql/test/SqlOperatorBaseTest.java
index ba9372d..8b0c0c1 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlOperatorBaseTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlOperatorBaseTest.java
@@ -4324,15 +4324,9 @@ public abstract class SqlOperatorBaseTest {
 
   @Test public void testCeilFunc() {
     tester.setFor(SqlStdOperatorTable.CEIL, VM_FENNEL);
-    if (!enable) {
-      return;
-    }
     tester.checkScalarApprox("ceil(10.1e0)", "DOUBLE NOT NULL", 11, 0);
-    tester.checkScalarApprox(
-        "ceil(cast(-11.2e0 as real))",
-        "REAL NOT NULL",
-        -11,
-        0);
+    tester.checkScalarApprox("ceil(cast(-11.2e0 as real))", "REAL NOT NULL",
+        -11, 0);
     tester.checkScalarExact("ceil(100)", "INTEGER NOT NULL", "100");
     tester.checkScalarExact(
         "ceil(1.3)", "DECIMAL(2, 0) NOT NULL", "2");
@@ -4368,14 +4362,8 @@ public abstract class SqlOperatorBaseTest {
 
   @Test public void testFloorFunc() {
     tester.setFor(SqlStdOperatorTable.FLOOR, VM_FENNEL);
-    if (!enable) {
-      return;
-    }
     tester.checkScalarApprox("floor(2.5e0)", "DOUBLE NOT NULL", 2, 0);
-    tester.checkScalarApprox(
-        "floor(cast(-1.2e0 as real))",
-        "REAL NOT NULL",
-        -2,
+    tester.checkScalarApprox("floor(cast(-1.2e0 as real))", "REAL NOT NULL", -2,
         0);
     tester.checkScalarExact("floor(100)", "INTEGER NOT NULL", "100");
     tester.checkScalarExact(
@@ -4386,6 +4374,69 @@ public abstract class SqlOperatorBaseTest {
     tester.checkNull("floor(cast(null as real))");
   }
 
+  @Test public void testFloorFuncDateTime() {
+    tester.checkFails("^floor('12:34:56')^",
+        "Cannot apply 'FLOOR' to arguments of type 'FLOOR\\(<CHAR\\(8\\)>\\)'\\. Supported form\\(s\\): 'FLOOR\\(<NUMERIC>\\)'\n"
+            + "'FLOOR\\(<DATETIME_INTERVAL>\\)'\n"
+            + "'FLOOR\\(<DATE> TO <TIME_UNIT>\\)'\n"
+            + "'FLOOR\\(<TIME> TO <TIME_UNIT>\\)'\n"
+            + "'FLOOR\\(<TIMESTAMP> TO <TIME_UNIT>\\)'",
+        false);
+    tester.checkFails("^floor(time '12:34:56')^",
+        "(?s)Cannot apply 'FLOOR' to arguments .*", false);
+    tester.checkFails("^floor(123.45 to minute)^",
+        "(?s)Cannot apply 'FLOOR' to arguments .*", false);
+    tester.checkFails("^floor('abcde' to minute)^",
+        "(?s)Cannot apply 'FLOOR' to arguments .*", false);
+    tester.checkScalar(
+        "floor(time '12:34:56' to minute)", "12:34:00", "TIME(0) NOT NULL");
+    tester.checkScalar("floor(timestamp '2015-02-19 12:34:56.78' to second)",
+        "2015-02-19 12:34:56", "TIMESTAMP(2) NOT NULL");
+    tester.checkScalar("floor(timestamp '2015-02-19 12:34:56' to minute)",
+        "2015-02-19 12:34:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkScalar("floor(timestamp '2015-02-19 12:34:56' to year)",
+        "2015-01-01 00:00:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkScalar("floor(timestamp '2015-02-19 12:34:56' to month)",
+        "2015-02-01 00:00:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkNull("floor(cast(null as timestamp) to month)");
+  }
+
+  @Test public void testCeilFuncDateTime() {
+    tester.checkFails("^ceil('12:34:56')^",
+        "Cannot apply 'CEIL' to arguments of type 'CEIL\\(<CHAR\\(8\\)>\\)'\\. Supported form\\(s\\): 'CEIL\\(<NUMERIC>\\)'\n"
+            + "'CEIL\\(<DATETIME_INTERVAL>\\)'\n"
+            + "'CEIL\\(<DATE> TO <TIME_UNIT>\\)'\n"
+            + "'CEIL\\(<TIME> TO <TIME_UNIT>\\)'\n"
+            + "'CEIL\\(<TIMESTAMP> TO <TIME_UNIT>\\)'",
+        false);
+    tester.checkFails("^ceil(time '12:34:56')^",
+        "(?s)Cannot apply 'CEIL' to arguments .*", false);
+    tester.checkFails("^ceil(123.45 to minute)^",
+        "(?s)Cannot apply 'CEIL' to arguments .*", false);
+    tester.checkFails("^ceil('abcde' to minute)^",
+        "(?s)Cannot apply 'CEIL' to arguments .*", false);
+    tester.checkScalar("ceil(time '12:34:56' to minute)",
+        "12:35:00", "TIME(0) NOT NULL");
+    tester.checkScalar("ceil(time '12:59:56' to minute)",
+        "13:00:00", "TIME(0) NOT NULL");
+    tester.checkScalar("ceil(timestamp '2015-02-19 12:34:56.78' to second)",
+        "2015-02-19 12:34:57", "TIMESTAMP(2) NOT NULL");
+    tester.checkScalar("ceil(timestamp '2015-02-19 12:34:56.00' to second)",
+        "2015-02-19 12:34:56", "TIMESTAMP(2) NOT NULL");
+    tester.checkScalar("ceil(timestamp '2015-02-19 12:34:56' to minute)",
+        "2015-02-19 12:35:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkScalar("ceil(timestamp '2015-02-19 12:34:56' to year)",
+        "2016-01-01 00:00:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkScalar("ceil(timestamp '2015-02-19 12:34:56' to month)",
+        "2015-03-01 00:00:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkNull("ceil(cast(null as timestamp) to month)");
+
+    // ceiling alias
+    tester.checkScalar("ceiling(timestamp '2015-02-19 12:34:56' to month)",
+        "2015-03-01 00:00:00", "TIMESTAMP(0) NOT NULL");
+    tester.checkNull("ceiling(cast(null as timestamp) to month)");
+  }
+
   @Test public void testFloorFuncInterval() {
     if (!enable) {
       return;
@@ -4474,26 +4525,10 @@ public abstract class SqlOperatorBaseTest {
     final String[] stringValues = {
       "'a'", "CAST(NULL AS VARCHAR(1))", "''"
     };
-    tester.checkAgg(
-        "COUNT(*)",
-        stringValues,
-        3,
-        (double) 0);
-    tester.checkAgg(
-        "COUNT(x)",
-        stringValues,
-        2,
-        (double) 0);
-    tester.checkAgg(
-        "COUNT(DISTINCT x)",
-        stringValues,
-        2,
-        (double) 0);
-    tester.checkAgg(
-        "COUNT(DISTINCT 123)",
-        stringValues,
-        1,
-        (double) 0);
+    tester.checkAgg("COUNT(*)", stringValues, 3, (double) 0);
+    tester.checkAgg("COUNT(x)", stringValues, 2, (double) 0);
+    tester.checkAgg("COUNT(DISTINCT x)", stringValues, 2, (double) 0);
+    tester.checkAgg("COUNT(DISTINCT 123)", stringValues, 1, (double) 0);
   }
 
   @Test public void testSumFunc() {
@@ -4520,31 +4555,17 @@ public abstract class SqlOperatorBaseTest {
         "(?s)Cannot apply 'SUM' to arguments of type 'SUM\\(<VARCHAR\\(2\\)>\\)'\\. Supported form\\(s\\): 'SUM\\(<NUMERIC>\\)'.*",
         false);
     final String[] values = {"0", "CAST(null AS INTEGER)", "2", "2"};
-    tester.checkAgg(
-        "sum(x)",
-        values,
-        4,
-        (double) 0);
+    tester.checkAgg("sum(x)", values, 4, (double) 0);
     Object result1 = -3;
     if (!enable) {
       return;
     }
-    tester.checkAgg(
-        "sum(CASE x WHEN 0 THEN NULL ELSE -1 END)",
-        values,
-        result1,
+    tester.checkAgg("sum(CASE x WHEN 0 THEN NULL ELSE -1 END)", values, result1,
         (double) 0);
     Object result = -1;
-    tester.checkAgg(
-        "sum(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)",
-        values,
-        result,
-        (double) 0);
-    tester.checkAgg(
-        "sum(DISTINCT x)",
-        values,
-        2,
-        (double) 0);
+    tester.checkAgg("sum(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)", values,
+        result, (double) 0);
+    tester.checkAgg("sum(DISTINCT x)", values, 2, (double) 0);
   }
 
   /** Very similar to {@code tester.checkType}, but generates inside a SELECT
@@ -4577,24 +4598,16 @@ public abstract class SqlOperatorBaseTest {
       return;
     }
     final String[] values = {"0", "CAST(null AS FLOAT)", "3", "3"};
-    tester.checkAgg(
-        "AVG(x)", values, 2d, 0d);
-    tester.checkAgg(
-        "AVG(DISTINCT x)", values, 1.5d, 0d);
+    tester.checkAgg("AVG(x)", values, 2d, 0d);
+    tester.checkAgg("AVG(DISTINCT x)", values, 1.5d, 0d);
     Object result = -1;
-    tester.checkAgg(
-        "avg(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)",
-        values,
-        result,
-        0d);
+    tester.checkAgg("avg(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)", values,
+        result, 0d);
   }
 
   @Test public void testCovarPopFunc() {
     tester.setFor(SqlStdOperatorTable.COVAR_POP, VM_EXPAND);
-    tester.checkFails(
-        "covar_pop(^*^)",
-        "Unknown identifier '\\*'",
-        false);
+    tester.checkFails("covar_pop(^*^)", "Unknown identifier '\\*'", false);
     tester.checkFails(
         "^covar_pop(cast(null as varchar(2)),cast(null as varchar(2)))^",
         "(?s)Cannot apply 'COVAR_POP' to arguments of type 'COVAR_POP\\(<VARCHAR\\(2\\)>, <VARCHAR\\(2\\)>\\)'\\. Supported form\\(s\\): 'COVAR_POP\\(<NUMERIC>, <NUMERIC>\\)'.*",
@@ -4606,11 +4619,7 @@ public abstract class SqlOperatorBaseTest {
       return;
     }
     // with zero values
-    tester.checkAgg(
-        "covar_pop(x)",
-        new String[]{},
-        null,
-        0d);
+    tester.checkAgg("covar_pop(x)", new String[]{}, null, 0d);
   }
 
   @Test public void testCovarSampFunc() {
@@ -4630,11 +4639,7 @@ public abstract class SqlOperatorBaseTest {
       return;
     }
     // with zero values
-    tester.checkAgg(
-        "covar_samp(x)",
-        new String[]{},
-        null,
-        0d);
+    tester.checkAgg("covar_samp(x)", new String[]{}, null, 0d);
   }
 
   @Test public void testRegrSxxFunc() {
@@ -4654,11 +4659,7 @@ public abstract class SqlOperatorBaseTest {
       return;
     }
     // with zero values
-    tester.checkAgg(
-        "regr_sxx(x)",
-        new String[]{},
-        null,
-        0d);
+    tester.checkAgg("regr_sxx(x)", new String[]{}, null, 0d);
   }
 
   @Test public void testRegrSyyFunc() {
@@ -4678,21 +4679,13 @@ public abstract class SqlOperatorBaseTest {
       return;
     }
     // with zero values
-    tester.checkAgg(
-        "regr_syy(x)",
-        new String[]{},
-        null,
-        0d);
+    tester.checkAgg("regr_syy(x)", new String[]{}, null, 0d);
   }
 
   @Test public void testStddevPopFunc() {
     tester.setFor(SqlStdOperatorTable.STDDEV_POP, VM_EXPAND);
-    tester.checkFails(
-        "stddev_pop(^*^)",
-        "Unknown identifier '\\*'",
-        false);
-    tester.checkFails(
-        "^stddev_pop(cast(null as varchar(2)))^",
+    tester.checkFails("stddev_pop(^*^)", "Unknown identifier '\\*'", false);
+    tester.checkFails("^stddev_pop(cast(null as varchar(2)))^",
         "(?s)Cannot apply 'STDDEV_POP' to arguments of type 'STDDEV_POP\\(<VARCHAR\\(2\\)>\\)'\\. Supported form\\(s\\): 'STDDEV_POP\\(<NUMERIC>\\)'.*",
         false);
     tester.checkType("stddev_pop(CAST(NULL AS INTEGER))", "INTEGER");
@@ -4701,33 +4694,17 @@ public abstract class SqlOperatorBaseTest {
     if (!enable) {
       return;
     }
-    tester.checkAgg(
-        "stddev_pop(x)",
-        values,
-        1.414213562373095d, // verified on Oracle 10g
+    // verified on Oracle 10g
+    tester.checkAgg("stddev_pop(x)", values, 1.414213562373095d,
         0.000000000000001d);
-    tester.checkAgg(
-        "stddev_pop(DISTINCT x)", // Oracle does not allow distinct
-        values,
-        1.5d,
-        0d);
-    tester.checkAgg(
-        "stddev_pop(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)",
-        values,
-        0,
-        0d);
+    // Oracle does not allow distinct
+    tester.checkAgg("stddev_pop(DISTINCT x)", values, 1.5d, 0d);
+    tester.checkAgg("stddev_pop(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)",
+        values, 0, 0d);
     // with one value
-    tester.checkAgg(
-        "stddev_pop(x)",
-        new String[]{"5"},
-        0,
-        0d);
+    tester.checkAgg("stddev_pop(x)", new String[]{"5"}, 0, 0d);
     // with zero values
-    tester.checkAgg(
-        "stddev_pop(x)",
-        new String[]{},
-        null,
-        0d);
+    tester.checkAgg("stddev_pop(x)", new String[]{}, null, 0d);
   }
 
   @Test public void testStddevSampFunc() {
@@ -4746,15 +4723,11 @@ public abstract class SqlOperatorBaseTest {
     if (!enable) {
       return;
     }
-    tester.checkAgg(
-        "stddev_samp(x)",
-        values,
-        1.732050807568877d, // verified on Oracle 10g
+    // verified on Oracle 10g
+    tester.checkAgg("stddev_samp(x)", values, 1.732050807568877d,
         0.000000000000001d);
-    tester.checkAgg(
-        "stddev_samp(DISTINCT x)", // Oracle does not allow distinct
-        values,
-        2.121320343559642d,
+    // Oracle does not allow distinct
+    tester.checkAgg("stddev_samp(DISTINCT x)", values, 2.121320343559642d,
         0.000000000000001d);
     tester.checkAgg(
         "stddev_samp(DISTINCT CASE x WHEN 0 THEN NULL ELSE -1 END)",
@@ -4952,13 +4925,8 @@ public abstract class SqlOperatorBaseTest {
     if (!enable) {
       return;
     }
-    tester.checkWinAgg(
-        "last_value(x)",
-        values,
-        "ROWS 3 PRECEDING",
-        "INTEGER",
-        Arrays.asList("3", "0"),
-        0d);
+    tester.checkWinAgg("last_value(x)", values, "ROWS 3 PRECEDING", "INTEGER",
+        Arrays.asList("3", "0"), 0d);
     final String[] values2 = {"1.6", "1.2"};
     tester.checkWinAgg(
         "last_value(x)",
@@ -4983,13 +4951,8 @@ public abstract class SqlOperatorBaseTest {
     if (!enable) {
       return;
     }
-    tester.checkWinAgg(
-        "first_value(x)",
-        values,
-        "ROWS 3 PRECEDING",
-        "INTEGER",
-        Arrays.asList("0"),
-        0d);
+    tester.checkWinAgg("first_value(x)", values, "ROWS 3 PRECEDING", "INTEGER",
+        Arrays.asList("0"), 0d);
     final String[] values2 = {"1.6", "1.2"};
     tester.checkWinAgg(
         "first_value(x)",
@@ -5116,21 +5079,12 @@ public abstract class SqlOperatorBaseTest {
 
   @Test public void testCastTruncates() {
     tester.setFor(SqlStdOperatorTable.CAST);
-    tester.checkScalar(
-        "CAST('ABCD' AS CHAR(2))",
-        "AB",
-        "CHAR(2) NOT NULL");
-    tester.checkScalar(
-        "CAST('ABCD' AS VARCHAR(2))",
-        "AB",
+    tester.checkScalar("CAST('ABCD' AS CHAR(2))", "AB", "CHAR(2) NOT NULL");
+    tester.checkScalar("CAST('ABCD' AS VARCHAR(2))", "AB",
         "VARCHAR(2) NOT NULL");
-    tester.checkScalar(
-        "CAST(x'ABCDEF12' AS BINARY(2))",
-        "abcd",
+    tester.checkScalar("CAST(x'ABCDEF12' AS BINARY(2))", "abcd",
         "BINARY(2) NOT NULL");
-    tester.checkScalar(
-        "CAST(x'ABCDEF12' AS VARBINARY(2))",
-        "abcd",
+    tester.checkScalar("CAST(x'ABCDEF12' AS VARBINARY(2))", "abcd",
         "VARBINARY(2) NOT NULL");
 
     if (!enable) {
@@ -5139,9 +5093,7 @@ public abstract class SqlOperatorBaseTest {
     tester.checkBoolean(
         "CAST(X'' AS BINARY(3)) = X'000000'",
         true);
-    tester.checkBoolean(
-        "CAST(X'' AS BINARY(3)) = X''",
-        false);
+    tester.checkBoolean("CAST(X'' AS BINARY(3)) = X''", false);
   }
 
   /** Test that calls all operators with all possible argument types, and for

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
index d9ac400..24e642b 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
@@ -116,6 +116,7 @@ import org.junit.runners.Suite;
     ReflectiveSchemaTest.class,
     JdbcTest.class,
     CalciteRemoteDriverTest.class,
+    StreamTest.class,
 
     // test cases
     TableInRootSchemaTest.class,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index 02cabb6..6e7fdf3 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.ObjectSqlType;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlModality;
 import org.apache.calcite.sql.validate.SqlMoniker;
 import org.apache.calcite.sql.validate.SqlMonikerImpl;
 import org.apache.calcite.sql.validate.SqlMonikerType;
@@ -51,6 +52,7 @@ import org.apache.calcite.util.Util;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -58,6 +60,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 /**
@@ -144,7 +147,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     registerSchema(salesSchema);
 
     // Register "EMP" table.
-    MockTable empTable = MockTable.create(this, salesSchema, "EMP");
+    MockTable empTable = MockTable.create(this, salesSchema, "EMP", false);
     empTable.addColumn("EMPNO", intType);
     empTable.addColumn("ENAME", varchar20Type);
     empTable.addColumn("JOB", varchar10Type);
@@ -157,13 +160,13 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     registerTable(empTable);
 
     // Register "DEPT" table.
-    MockTable deptTable = MockTable.create(this, salesSchema, "DEPT");
+    MockTable deptTable = MockTable.create(this, salesSchema, "DEPT", false);
     deptTable.addColumn("DEPTNO", intType);
     deptTable.addColumn("NAME", varchar10Type);
     registerTable(deptTable);
 
     // Register "BONUS" table.
-    MockTable bonusTable = MockTable.create(this, salesSchema, "BONUS");
+    MockTable bonusTable = MockTable.create(this, salesSchema, "BONUS", false);
     bonusTable.addColumn("ENAME", varchar20Type);
     bonusTable.addColumn("JOB", varchar10Type);
     bonusTable.addColumn("SAL", intType);
@@ -171,7 +174,8 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     registerTable(bonusTable);
 
     // Register "SALGRADE" table.
-    MockTable salgradeTable = MockTable.create(this, salesSchema, "SALGRADE");
+    MockTable salgradeTable = MockTable.create(this, salesSchema, "SALGRADE",
+        false);
     salgradeTable.addColumn("GRADE", intType);
     salgradeTable.addColumn("LOSAL", intType);
     salgradeTable.addColumn("HISAL", intType);
@@ -179,7 +183,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
 
     // Register "EMP_ADDRESS" table
     MockTable contactAddressTable =
-        MockTable.create(this, salesSchema, "EMP_ADDRESS");
+        MockTable.create(this, salesSchema, "EMP_ADDRESS", false);
     contactAddressTable.addColumn("EMPNO", intType);
     contactAddressTable.addColumn("HOME_ADDRESS", addressType);
     contactAddressTable.addColumn("MAILING_ADDRESS", addressType);
@@ -190,7 +194,8 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     registerSchema(customerSchema);
 
     // Register "CONTACT" table.
-    MockTable contactTable = MockTable.create(this, customerSchema, "CONTACT");
+    MockTable contactTable = MockTable.create(this, customerSchema, "CONTACT",
+        false);
     contactTable.addColumn("CONTACTNO", intType);
     contactTable.addColumn("FNAME", varchar10Type);
     contactTable.addColumn("LNAME", varchar10Type);
@@ -199,11 +204,30 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     registerTable(contactTable);
 
     // Register "ACCOUNT" table.
-    MockTable accountTable = MockTable.create(this, customerSchema, "ACCOUNT");
+    MockTable accountTable = MockTable.create(this, customerSchema, "ACCOUNT",
+        false);
     accountTable.addColumn("ACCTNO", intType);
     accountTable.addColumn("TYPE", varchar20Type);
     accountTable.addColumn("BALANCE", intType);
     registerTable(accountTable);
+
+    // Register "ORDERS" stream.
+    MockTable ordersStream = MockTable.create(this, salesSchema, "ORDERS",
+        true);
+    ordersStream.addColumn("ROWTIME", timestampType);
+    ordersStream.addMonotonic("ROWTIME");
+    ordersStream.addColumn("PRODUCTID", intType);
+    ordersStream.addColumn("ORDERID", intType);
+    registerTable(ordersStream);
+
+    // Register "SHIPMENTS" stream.
+    MockTable shipmentsStream = MockTable.create(this, salesSchema, "SHIPMENTS",
+        true);
+    shipmentsStream.addColumn("ROWTIME", timestampType);
+    shipmentsStream.addMonotonic("ROWTIME");
+    shipmentsStream.addColumn("ORDERID", intType);
+    registerTable(shipmentsStream);
+
     return this;
   }
 
@@ -405,23 +429,26 @@ public class MockCatalogReader implements Prepare.CatalogReader {
    */
   public static class MockTable implements Prepare.PreparingTable {
     private final MockCatalogReader catalogReader;
+    private final boolean stream;
     private final List<Map.Entry<String, RelDataType>> columnList =
         Lists.newArrayList();
     private RelDataType rowType;
     private List<RelCollation> collationList;
     private final List<String> names;
+    private final Set<String> monotonicColumnSet = Sets.newHashSet();
 
     public MockTable(MockCatalogReader catalogReader, String catalogName,
-        String schemaName, String name) {
+        String schemaName, String name, boolean stream) {
       this.catalogReader = catalogReader;
+      this.stream = stream;
       this.names = ImmutableList.of(catalogName, schemaName, name);
     }
 
     public static MockTable create(MockCatalogReader catalogReader,
-        MockSchema schema, String name) {
+        MockSchema schema, String name, boolean stream) {
       MockTable table =
           new MockTable(catalogReader, schema.getCatalogName(), schema.name,
-              name);
+              name, stream);
       schema.addTable(name);
       return table;
     }
@@ -461,6 +488,10 @@ public class MockCatalogReader implements Prepare.CatalogReader {
       return rowType;
     }
 
+    public boolean supportsModality(SqlModality modality) {
+      return modality == (stream ? SqlModality.STREAM : SqlModality.RELATION);
+    }
+
     public void onRegister(RelDataTypeFactory typeFactory) {
       rowType = typeFactory.createStructType(columnList);
       collationList = deduceMonotonicity(this);
@@ -471,7 +502,9 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     }
 
     public SqlMonotonicity getMonotonicity(String columnName) {
-      return SqlMonotonicity.NOT_MONOTONIC;
+      return monotonicColumnSet.contains(columnName)
+          ? SqlMonotonicity.INCREASING
+          : SqlMonotonicity.NOT_MONOTONIC;
     }
 
     public SqlAccessType getAllowedAccess() {
@@ -490,9 +523,14 @@ public class MockCatalogReader implements Prepare.CatalogReader {
       columnList.add(Pair.of(name, type));
     }
 
+    public void addMonotonic(String name) {
+      monotonicColumnSet.add(name);
+      assert Pair.left(columnList).contains(name);
+    }
+
     public RelOptTable extend(List<RelDataTypeField> extendedFields) {
       final MockTable table = new MockTable(catalogReader, names.get(0),
-          names.get(1), names.get(2));
+          names.get(1), names.get(2), stream);
       table.columnList.addAll(columnList);
       table.columnList.addAll(extendedFields);
       table.onRegister(catalogReader.typeFactory);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index bb70d37..5f9aa29 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -793,7 +793,7 @@ public class RelOptRulesTest extends RelOptTestBase {
                     typeFactory.createSqlType(SqlTypeName.INTEGER);
                 for (int i = 0; i < 10; i++) {
                   String t = String.valueOf((char) ('A' + i));
-                  MockTable table = MockTable.create(this, schema, t);
+                  MockTable table = MockTable.create(this, schema, t, false);
                   table.addColumn(t, intType);
                   registerTable(table);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/SqlFunctionsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlFunctionsTest.java b/core/src/test/java/org/apache/calcite/test/SqlFunctionsTest.java
index da43a66..9f82294 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlFunctionsTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlFunctionsTest.java
@@ -23,6 +23,7 @@ import org.apache.calcite.runtime.Utilities;
 
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -38,6 +39,7 @@ import static org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUni
 import static org.apache.calcite.avatica.util.DateTimeUtils.unixDateExtract;
 import static org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString;
 import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimeToString;
+import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestamp;
 import static org.apache.calcite.avatica.util.DateTimeUtils.unixTimestampToString;
 import static org.apache.calcite.avatica.util.DateTimeUtils.ymdToJulian;
 import static org.apache.calcite.avatica.util.DateTimeUtils.ymdToUnixDate;
@@ -53,6 +55,7 @@ import static org.apache.calcite.runtime.SqlFunctions.trim;
 import static org.apache.calcite.runtime.SqlFunctions.upper;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -310,6 +313,47 @@ public class SqlFunctionsTest {
         equalTo((long) day));
   }
 
+  @Test public void testUnixTimestamp() {
+    assertThat(unixTimestamp(1970, 1, 1, 0, 0, 0), is(0L));
+    assertThat(unixTimestamp(1970, 1, 2, 0, 0, 0), is(86400000L));
+    assertThat(unixTimestamp(1970, 1, 1, 23, 59, 59), is(86399000L));
+  }
+
+  @Test public void testFloor() {
+    checkFloor(0, 10, 0);
+    checkFloor(27, 10, 20);
+    checkFloor(30, 10, 30);
+    checkFloor(-30, 10, -30);
+    checkFloor(-27, 10, -30);
+  }
+
+  private void checkFloor(int x, int y, int result) {
+    assertThat(SqlFunctions.floor(x, y), is(result));
+    assertThat(SqlFunctions.floor((long) x, (long) y), is((long) result));
+    assertThat(SqlFunctions.floor((short) x, (short) y), is((short) result));
+    assertThat(SqlFunctions.floor((byte) x, (byte) y), is((byte) result));
+    assertThat(SqlFunctions.floor(BigDecimal.valueOf(x), BigDecimal.valueOf(y)),
+        is(BigDecimal.valueOf(result)));
+  }
+
+  @Test public void testCeil() {
+    checkCeil(0, 10, 0);
+    checkCeil(27, 10, 30);
+    checkCeil(30, 10, 30);
+    checkCeil(-30, 10, -30);
+    checkCeil(-27, 10, -20);
+    checkCeil(-27, 1, -27);
+  }
+
+  private void checkCeil(int x, int y, int result) {
+    assertThat(SqlFunctions.ceil(x, y), is(result));
+    assertThat(SqlFunctions.ceil((long) x, (long) y), is((long) result));
+    assertThat(SqlFunctions.ceil((short) x, (short) y), is((short) result));
+    assertThat(SqlFunctions.ceil((byte) x, (byte) y), is((byte) result));
+    assertThat(SqlFunctions.ceil(BigDecimal.valueOf(x), BigDecimal.valueOf(y)),
+        is(BigDecimal.valueOf(result)));
+  }
+
   /** Unit test for
    * {@link Utilities#compare(java.util.List, java.util.List)}. */
   @Test public void testCompare() {
@@ -318,7 +362,7 @@ public class SqlFunctionsTest {
     final List<String> a = Arrays.asList("a");
     final List<String> empty = Collections.emptyList();
     assertEquals(0, Utilities.compare(ac, ac));
-    assertEquals(0, Utilities.compare(ac, new ArrayList<String>(ac)));
+    assertEquals(0, Utilities.compare(ac, new ArrayList<>(ac)));
     assertEquals(-1, Utilities.compare(a, ac));
     assertEquals(-1, Utilities.compare(empty, ac));
     assertEquals(1, Utilities.compare(ac, a));

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 0d35faf..04fd420 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -1033,6 +1033,27 @@ public class SqlToRelConverterTest extends SqlToRelTestBase {
     }
   }
 
+  @Test public void testStream() {
+    sql("select stream productId from orders where productId = 10")
+        .convertsTo("${plan}");
+  }
+
+  @Test public void testStreamGroupBy() {
+    sql("select stream floor(rowtime to second) as rowtime, count(*) as c\n"
+            + "from orders\n"
+            + "group by floor(rowtime to second)")
+        .convertsTo("${plan}");
+  }
+
+  @Test public void testStreamWindowedAggregation() {
+    sql("select stream *,\n"
+            + "  count(*) over (partition by productId\n"
+            + "    order by rowtime\n"
+            + "    range interval '1' second preceding) as c\n"
+            + "from orders")
+        .convertsTo("${plan}");
+  }
+
   @Test public void testExplainAsXml() {
     String sql = "select 1 + 2, 3 from (values (true))";
     final RelNode rel = tester.convertSqlToRel(sql);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index d569d94..d304ff5 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -97,6 +97,15 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
           + " INTEGER NOT NULL DEPTNO,"
           + " BOOLEAN NOT NULL SLACKER) NOT NULL";
 
+  private static final String STR_AGG_REQUIRES_MONO =
+      "Streaming aggregation requires at least one monotonic expression in GROUP BY clause";
+
+  private static final String STR_ORDER_REQUIRES_MONO =
+      "Streaming ORDER BY must start with monotonic expression";
+
+  private static final String STR_SET_OP_INCONSISTENT =
+      "Set operator cannot combine streaming and non-streaming inputs";
+
   //~ Constructors -----------------------------------------------------------
 
   public SqlValidatorTest() {
@@ -111,6 +120,14 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     Locale.setDefault(Locale.US);
   }
 
+  private static String cannotConvertToStream(String name) {
+    return "Cannot convert table '" + name + "' to stream";
+  }
+
+  private static String cannotConvertToRelation(String table) {
+    return "Cannot convert stream '" + table + "' to relation";
+  }
+
   @Test public void testMultipleSameAsPass() {
     check("select 1 as again,2 as \"again\", 3 as AGAiN from (values (true))");
   }
@@ -4933,8 +4950,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
   }
 
   @Test public void testSumInvalidArgs() {
-    checkFails(
-        "select ^sum(ename)^, deptno from emp group by deptno",
+    checkFails("select ^sum(ename)^, deptno from emp group by deptno",
         "(?s)Cannot apply 'SUM' to arguments of type 'SUM\\(<VARCHAR\\(20\\)>\\)'\\. .*");
   }
 
@@ -6099,8 +6115,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     checkWholeExpFails(
         "interval '1' second >= interval '1' year",
         "(?s).*Cannot apply '>=' to arguments of type '<INTERVAL SECOND> >= <INTERVAL YEAR>'.*");
-    checkWholeExpFails(
-        "interval '1' month = interval '1' day",
+    checkWholeExpFails("interval '1' month = interval '1' day",
         "(?s).*Cannot apply '=' to arguments of type '<INTERVAL MONTH> = <INTERVAL DAY>'.*");
   }
 
@@ -6129,8 +6144,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
   @Test public void testExtract() {
     // TODO: Need to have extract return decimal type for seconds
     // so we can have seconds fractions
-    checkExpType(
-        "extract(year from interval '1-2' year to month)",
+    checkExpType("extract(year from interval '1-2' year to month)",
         "BIGINT NOT NULL");
     checkExp("extract(minute from interval '1.1' second)");
     checkExp("extract(year from DATE '2008-2-2')");
@@ -6156,8 +6170,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     checkExpType(
         "cast(interval '1-1' year to month as interval month)",
         "INTERVAL MONTH NOT NULL");
-    checkExpType(
-        "cast(interval '1:1' hour to minute as interval day)",
+    checkExpType("cast(interval '1:1' hour to minute as interval day)",
         "INTERVAL DAY NOT NULL");
     checkExpType(
         "cast(interval '1:1' hour to minute as interval minute to second)",
@@ -6175,8 +6188,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     checkExpType(
         "(CURRENT_DATE - CURRENT_DATE) HOUR",
         "INTERVAL HOUR NOT NULL");
-    checkExpType(
-        "(CURRENT_DATE - CURRENT_DATE) YEAR TO MONTH",
+    checkExpType("(CURRENT_DATE - CURRENT_DATE) YEAR TO MONTH",
         "INTERVAL YEAR TO MONTH NOT NULL");
     checkWholeExpFails(
         "(CURRENT_DATE - LOCALTIME) YEAR TO MONTH",
@@ -6201,9 +6213,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
 
   @Test public void testUnnest() {
     checkColumnType("select*from unnest(multiset[1])", "INTEGER NOT NULL");
-    checkColumnType(
-        "select*from unnest(multiset[1, 2])",
-        "INTEGER NOT NULL");
+    checkColumnType("select*from unnest(multiset[1, 2])", "INTEGER NOT NULL");
     checkColumnType(
         "select*from unnest(multiset[321.3, 2.33])",
         "DECIMAL(5, 2) NOT NULL");
@@ -6247,14 +6257,11 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     checkColumnType(
         "values new address()",
         "ObjectSqlType(ADDRESS) NOT NULL");
-    checkColumnType(
-        "select home_address from emp_address",
+    checkColumnType("select home_address from emp_address",
         "ObjectSqlType(ADDRESS) NOT NULL");
-    checkColumnType(
-        "select ea.home_address.zip from emp_address ea",
+    checkColumnType("select ea.home_address.zip from emp_address ea",
         "INTEGER NOT NULL");
-    checkColumnType(
-        "select ea.mailing_address.city from emp_address ea",
+    checkColumnType("select ea.mailing_address.city from emp_address ea",
         "VARCHAR(20) NOT NULL");
   }
 
@@ -6280,8 +6287,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
   }
 
   @Test public void testFusion() {
-    checkFails(
-        "select ^fusion(deptno)^ from emp",
+    checkFails("select ^fusion(deptno)^ from emp",
         "(?s).*Cannot apply 'FUSION' to arguments of type 'FUSION.<INTEGER>.'.*");
     check("select fusion(multiset[3]) from emp");
     // todo. FUSION is an aggregate function. test that validator only can
@@ -6305,8 +6311,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
         "Column 'GENDER' not found in any table");
     check("select count(ename, 1, deptno) from emp");
     check("select count(distinct ename, 1, deptno) from emp");
-    checkFails(
-        "select count(deptno, *) from emp",
+    checkFails("select count(deptno, *) from emp",
         "(?s).*Encountered \", \\*\" at .*");
     checkFails(
         "select count(*, deptno) from emp",
@@ -6333,8 +6338,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
 
   @Test public void testFunctionalDistinct() {
     check("select count(distinct sal) from emp");
-    checkFails(
-        "select COALESCE(^distinct^ sal) from emp",
+    checkFails("select COALESCE(^distinct^ sal) from emp",
         "DISTINCT/ALL not allowed with COALESCE function");
   }
 
@@ -6393,8 +6397,8 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     checkResultType("select * from dept extend (x int not null)",
         "RecordType(INTEGER NOT NULL DEPTNO, VARCHAR(10) NOT NULL NAME, INTEGER NOT NULL X) NOT NULL");
     checkResultType("select deptno + x as z\n"
-        + "from dept extend (x int not null) as x\n"
-        + "where x > 10",
+            + "from dept extend (x int not null) as x\n"
+            + "where x > 10",
         "RecordType(INTEGER NOT NULL Z) NOT NULL");
   }
 
@@ -6411,9 +6415,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
             + " BOOLEAN NOT NULL SLACKER) NOT NULL";
     checkResultType("select * from (table emp)", empRecordType);
     checkResultType("table emp", empRecordType);
-    checkFails(
-        "table ^nonexistent^",
-        "Table 'NONEXISTENT' not found");
+    checkFails("table ^nonexistent^", "Table 'NONEXISTENT' not found");
   }
 
   @Test public void testCollectionTable() {
@@ -6421,8 +6423,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
         "select * from table(ramp(3))",
         "RecordType(INTEGER NOT NULL I) NOT NULL");
 
-    checkFails(
-        "select * from table(^ramp('3')^)",
+    checkFails("select * from table(^ramp('3')^)",
         "Cannot apply 'RAMP' to arguments of type 'RAMP\\(<CHAR\\(1\\)>\\)'\\. Supported form\\(s\\): 'RAMP\\(<NUMERIC>\\)'");
 
     checkFails(
@@ -6687,8 +6688,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
             + " CATALOG.SALES.DEPT.DEPTNO,"
             + " CATALOG.SALES.DEPT.NAME}");
 
-    tester.checkFieldOrigin(
-        "select distinct emp.empno, hiredate, 1 as one,\n"
+    tester.checkFieldOrigin("select distinct emp.empno, hiredate, 1 as one,\n"
             + " emp.empno * 2 as twiceEmpno\n"
             + "from emp join dept on true",
         "{CATALOG.SALES.EMP.EMPNO,"
@@ -6712,12 +6712,10 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
             + "  select [e].EMPNO as [x] from [EMP] as [e])",
         "Column 'X' not found in any table");
 
-    tester1.checkQueryFails(
-        "select EMP.^\"x\"^ from EMP",
+    tester1.checkQueryFails("select EMP.^\"x\"^ from EMP",
         "(?s).*Encountered \"\\. \\\\\"\" at line .*");
 
-    tester1.checkResultType(
-        "select [x[y]] z ] from (\n"
+    tester1.checkResultType("select [x[y]] z ] from (\n"
             + "  select [e].EMPNO as [x[y]] z ] from [EMP] as [e])",
         "RecordType(INTEGER NOT NULL x[y] z ) NOT NULL");
   }
@@ -6949,6 +6947,139 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     tester1.checkQuery("select empNo from (select Empno from emP)");
   }
 
+  @Test public void testStream() {
+    sql("select stream * from orders").ok();
+    sql("select stream * from ^emp^")
+        .fails(cannotConvertToStream("EMP"));
+    sql("select * from ^orders^")
+        .fails(cannotConvertToRelation("ORDERS"));
+  }
+
+  @Test public void testStreamWhere() {
+    sql("select stream * from orders where productId < 10").ok();
+    sql("select stream * from ^emp^ where deptno = 10")
+        .fails(cannotConvertToStream("EMP"));
+    sql("select stream * from ^emp^ as e where deptno = 10")
+        .fails(cannotConvertToStream("E"));
+    sql("select stream * from (^select * from emp as e1^) as e\n"
+        + "where deptno = 10")
+        .fails("Cannot convert table 'E' to stream");
+    sql("select * from ^orders^ where productId > 10")
+        .fails(cannotConvertToRelation("ORDERS"));
+  }
+
+  @Test public void testStreamGroupBy() {
+    sql("select stream rowtime, productId, count(*) as c\n"
+        + "from orders\n"
+        + "group by productId, rowtime").ok();
+    sql("select stream floor(rowtime to hour) as rowtime, productId,\n"
+        + " count(*) as c\n"
+        + "from orders\n"
+        + "group by floor(rowtime to hour), productId").ok();
+    sql("select stream productId, count(*) as c\n"
+        + "from orders\n"
+        + "^group by productId^")
+        .fails(STR_AGG_REQUIRES_MONO);
+    sql("select stream ^count(*)^ as c\n"
+        + "from orders")
+        .fails(STR_AGG_REQUIRES_MONO);
+    sql("select stream count(*) as c\n"
+        + "from orders ^group by ()^")
+        .fails(STR_AGG_REQUIRES_MONO);
+  }
+
+  @Test public void testStreamHaving() {
+    sql("select stream rowtime, productId, count(*) as c\n"
+        + "from orders\n"
+        + "group by productId, rowtime\n"
+        + "having count(*) > 5").ok();
+    sql("select stream floor(rowtime to hour) as rowtime, productId,\n"
+        + " count(*) as c\n"
+        + "from orders\n"
+        + "group by floor(rowtime to hour), productId\n"
+        + "having false").ok();
+    sql("select stream productId, count(*) as c\n"
+        + "from orders\n"
+        + "^group by productId^\n"
+        + "having count(*) > 5")
+        .fails(STR_AGG_REQUIRES_MONO);
+    sql("select stream 1\n"
+        + "from orders\n"
+        + "having ^count(*) > 3^")
+        .fails(STR_AGG_REQUIRES_MONO);
+  }
+
+  @Test public void testStreamUnionAll() {
+    sql("select orderId\n"
+        + "from ^orders^\n"
+        + "union all\n"
+        + "select orderId\n"
+        + "from shipments")
+        .fails(cannotConvertToRelation("ORDERS"));
+    sql("select stream orderId\n"
+        + "from orders\n"
+        + "union all\n"
+        + "^select orderId\n"
+        + "from shipments^")
+        .fails(STR_SET_OP_INCONSISTENT);
+    sql("select empno\n"
+        + "from emp\n"
+        + "union all\n"
+        + "^select stream orderId\n"
+        + "from orders^")
+        .fails(STR_SET_OP_INCONSISTENT);
+    sql("select stream orderId\n"
+        + "from orders\n"
+        + "union all\n"
+        + "select stream orderId\n"
+        + "from shipments").ok();
+    sql("select stream rowtime, orderId\n"
+        + "from orders\n"
+        + "union all\n"
+        + "select stream rowtime, orderId\n"
+        + "from shipments\n"
+        + "order by rowtime").ok();
+  }
+
+  @Test public void testStreamValues() {
+    sql("select stream * from (^values 1^) as e")
+        .fails(cannotConvertToStream("E"));
+    sql("select stream orderId from orders\n"
+        + "union all\n"
+        + "^values 1^")
+        .fails(STR_SET_OP_INCONSISTENT);
+    sql("values 1, 2\n"
+        + "union all\n"
+        + "^select stream orderId from orders^\n")
+        .fails(STR_SET_OP_INCONSISTENT);
+  }
+
+  @Test public void testStreamOrderBy() {
+    sql("select stream *\n"
+        + "from orders\n"
+        + "order by rowtime").ok();
+    sql("select stream *\n"
+        + "from orders\n"
+        + "order by floor(rowtime to hour)").ok();
+    sql("select stream floor(rowtime to minute), productId\n"
+        + "from orders\n"
+        + "order by floor(rowtime to hour)").ok();
+    sql("select stream floor(rowtime to minute), productId\n"
+        + "from orders\n"
+        + "order by floor(rowtime to minute), productId desc").ok();
+    sql("select stream *\n"
+        + "from orders\n"
+        + "order by ^productId^, rowtime")
+        .fails(STR_ORDER_REQUIRES_MONO);
+    sql("select stream *\n"
+        + "from orders\n"
+        + "order by ^rowtime desc^")
+        .fails(STR_ORDER_REQUIRES_MONO);
+    sql("select stream *\n"
+        + "from orders\n"
+        + "order by floor(rowtime to hour), rowtime desc").ok();
+  }
+
   @Test public void testNew() {
     // (To debug individual statements, paste them into this method.)
     //            1         2         3         4         5         6

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
new file mode 100644
index 0000000..d4f4117
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TableFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for streaming queries.
+ */
+public class StreamTest {
+  public static final String STREAM_SCHEMA = "     {\n"
+      + "       name: 'STREAMS',\n"
+      + "       tables: [ {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'ORDERS',\n"
+      + "         stream: {\n"
+      + "           stream: true\n"
+      + "         },\n"
+      + "         factory: '" + OrdersStreamTableFactory.class.getName() + "'\n"
+      + "       } ]\n"
+      + "     }\n";
+
+  public static final String STREAM_MODEL = "{\n"
+      + "  version: '1.0',\n"
+      + "  defaultSchema: 'foodmart',\n"
+      + "   schemas: [\n"
+      + STREAM_SCHEMA
+      + "   ]\n"
+      + "}";
+
+  @Test public void testStream() {
+    CalciteAssert.model(STREAM_MODEL)
+        .withDefaultSchema("STREAMS")
+        .query("select stream * from orders")
+        .convertContains("LogicalDelta\n"
+            + "  LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3])\n"
+            + "    EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+        .explainContains("EnumerableInterpreter\n"
+            + "  BindableTableScan(table=[[]])")
+        .returns(
+            startsWith(
+                "ROWTIME=2015-02-15 10:15:00; ID=1; PRODUCT=paint; UNITS=10",
+                "ROWTIME=2015-02-15 10:24:15; ID=2; PRODUCT=paper; UNITS=5"));
+  }
+
+  @Test public void testStreamFilterProject() {
+    CalciteAssert.model(STREAM_MODEL)
+        .withDefaultSchema("STREAMS")
+        .query("select stream product from orders where units > 6")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalProject(PRODUCT=[$2])\n"
+                + "    LogicalFilter(condition=[>($3, 6)])\n"
+                + "      EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+        .explainContains(
+            "EnumerableCalc(expr#0..3=[{inputs}], expr#4=[6], expr#5=[>($t3, $t4)], PRODUCT=[$t2], $condition=[$t5])\n"
+                + "  EnumerableInterpreter\n"
+                + "    BindableTableScan(table=[[]])")
+        .returns(
+            startsWith("PRODUCT=paint",
+                "PRODUCT=brush"));
+  }
+
+  @Test public void testStreamGroupByHaving() {
+    CalciteAssert.model(STREAM_MODEL)
+        .withDefaultSchema("STREAMS")
+        .query("select stream floor(rowtime to hour) as rowtime,\n"
+            + "  product, count(*) as c\n"
+            + "from orders\n"
+            + "group by floor(rowtime to hour), product\n"
+            + "having count(*) > 1")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalFilter(condition=[>($2, 1)])\n"
+                + "    LogicalAggregate(group=[{0, 1}], C=[COUNT()])\n"
+                + "      LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2])\n"
+                + "        EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+        .explainContains(
+            "EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])\n"
+                + "  EnumerableAggregate(group=[{0, 1}], C=[COUNT()])\n"
+                + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2])\n"
+                + "      EnumerableInterpreter\n"
+                + "        BindableTableScan(table=[[]])")
+        .returns(
+            startsWith("ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; C=2"));
+  }
+
+  @Test public void testStreamOrderBy() {
+    CalciteAssert.model(STREAM_MODEL)
+        .withDefaultSchema("STREAMS")
+        .query("select stream floor(rowtime to hour) as rowtime,\n"
+            + "  product, units\n"
+            + "from orders\n"
+            + "order by floor(orders.rowtime to hour), product desc")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+                + "    LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2], UNITS=[$3])\n"
+                + "      EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+        .explainContains(
+            "EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+                + "  EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2], UNITS=[$t3])\n"
+                + "    EnumerableInterpreter\n"
+                + "      BindableTableScan(table=[[]])")
+        .returns(
+            startsWith("ROWTIME=2015-02-15 10:00:00; PRODUCT=paper; UNITS=5",
+                "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=10",
+                "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=3"));
+  }
+
+  @Ignore
+  @Test public void testStreamUnionAllOrderBy() {
+    CalciteAssert.model(STREAM_MODEL)
+        .withDefaultSchema("STREAMS")
+        .query("select stream *\n"
+            + "from (\n"
+            + "  select rowtime, product\n"
+            + "  from orders\n"
+            + "  union all\n"
+            + "  select rowtime, product\n"
+            + "  from orders)\n"
+            + "order by rowtime\n")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalSort(sort0=[$0], dir0=[ASC])\n"
+                + "    LogicalProject(ROWTIME=[$0], PRODUCT=[$1])\n"
+                + "      LogicalUnion(all=[true])\n"
+                + "        LogicalProject(ROWTIME=[$0], PRODUCT=[$2])\n"
+                + "          EnumerableTableScan(table=[[STREAMS, ORDERS]])\n"
+                + "        LogicalProject(ROWTIME=[$0], PRODUCT=[$2])\n"
+                + "          EnumerableTableScan(table=[[STREAMS, ORDERS]])\n")
+        .explainContains(
+            "EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n"
+                + "  EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2], UNITS=[$t3])\n"
+                + "    EnumerableInterpreter\n"
+                + "      BindableTableScan(table=[[]])")
+        .returns(
+            startsWith("ROWTIME=2015-02-15 10:00:00; PRODUCT=paper; UNITS=5",
+                "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=10",
+                "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=3"));
+  }
+
+  private Function<ResultSet, Void> startsWith(String... rows) {
+    final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
+    return new Function<ResultSet, Void>() {
+      public Void apply(ResultSet input) {
+        try {
+          final StringBuilder buf = new StringBuilder();
+          final ResultSetMetaData metaData = input.getMetaData();
+          for (String expectedRow : rowList) {
+            if (!input.next()) {
+              throw new AssertionError("input ended too soon");
+            }
+            CalciteAssert.rowToString(input, buf, metaData);
+            String actualRow = buf.toString();
+            buf.setLength(0);
+            assertThat(actualRow, equalTo(expectedRow));
+          }
+          return null;
+        } catch (SQLException e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  /** Mock table that returns a stream of orders from a fixed array. */
+  @SuppressWarnings("UnusedDeclaration")
+  public static class OrdersStreamTableFactory implements TableFactory<Table> {
+    // public constructor, per factory contract
+    public OrdersStreamTableFactory() {
+    }
+
+    public Table create(SchemaPlus schema, String name,
+        Map<String, Object> operand, RelDataType rowType) {
+      final RelProtoDataType protoRowType = new RelProtoDataType() {
+        public RelDataType apply(RelDataTypeFactory a0) {
+          return a0.builder()
+              .add("ROWTIME", SqlTypeName.TIMESTAMP)
+              .add("ID", SqlTypeName.INTEGER)
+              .add("PRODUCT", SqlTypeName.VARCHAR, 10)
+              .add("UNITS", SqlTypeName.INTEGER)
+              .build();
+        }
+      };
+      final ImmutableList<Object[]> rows = ImmutableList.of(
+          new Object[] {ts(10, 15, 0), 1, "paint", 10},
+          new Object[] {ts(10, 24, 15), 2, "paper", 5},
+          new Object[] {ts(10, 24, 45), 3, "brush", 12},
+          new Object[] {ts(10, 58, 0), 4, "paint", 3},
+          new Object[] {ts(11, 10, 0), 5, "paint", 3});
+
+      return new StreamableTable() {
+        public Table stream() {
+          return new OrdersTable(protoRowType, rows);
+        }
+
+        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+          return protoRowType.apply(typeFactory);
+        }
+
+        public Statistic getStatistic() {
+          return Statistics.of(100d,
+              ImmutableList.<ImmutableBitSet>of(),
+              RelCollations.createSingleton(0));
+        }
+
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.TABLE;
+        }
+      };
+    }
+
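+    /** Timestamp for 2015-02-15 at a given time, as UTC millis. */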
+    private Object ts(int h, int m, int s) {
+      return DateTimeUtils.unixTimestamp(2015, 2, 15, h, m, s);
+    }
+  }
+
+  /** Table representing the ORDERS stream. */
+  public static class OrdersTable implements ScannableTable {
+    private final RelProtoDataType protoRowType;
+    private final ImmutableList<Object[]> rows;
+
+    public OrdersTable(RelProtoDataType protoRowType,
+        ImmutableList<Object[]> rows) {
+      this.protoRowType = protoRowType;
+      this.rows = rows;
+    }
+
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(rows);
+    }
+
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
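+    // Statistics declare ~100 rows, no unique keys, and that rows are
+    // sorted by column 0 (ROWTIME), ascending.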
+    public Statistic getStatistic() {
+      return Statistics.of(100d,
+          ImmutableList.<ImmutableBitSet>of(),
+          RelCollations.createSingleton(0));
+    }
+
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.STREAM;
+    }
+  }
+}
+
+// End StreamTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 551c9ec..6566401 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -934,7 +934,7 @@ LogicalSort(sort0=[$1], dir0=[ASC])
     </TestCase>
     <TestCase name="testOrderByOrdinalDesc">
         <Resource name="sql">
-            <![CDATA[select empno + 1, deptno, empno from emp order by 2 desc]]>
+            <![CDATA[select empno + 1, deptno, empno from emp order by 2.5 desc]]>
         </Resource>
         <Resource name="plan">
             <![CDATA[
@@ -2357,4 +2357,48 @@ LogicalProject(EXPR$0=[$0])
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testStream">
+        <Resource name="sql">
+            <![CDATA[select stream productId from orders where productId = 10]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalDelta
+  LogicalProject(PRODUCTID=[$1])
+    LogicalFilter(condition=[=($1, 10)])
+      LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testStreamGroupBy">
+        <Resource name="sql">
+            <![CDATA[select stream floor(rowtime to second) as rowtime, count(*) as c
+from orders
+group by floor(rowtime to second)]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalDelta
+  LogicalAggregate(group=[{0}], C=[COUNT()])
+    LogicalProject(ROWTIME=[FLOOR($0, FLAG(SECOND))])
+      LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testStreamWindowedAggregation">
+        <Resource name="sql">
+            <![CDATA[select stream *,
+  count(*) over (partition by productId
+    order by rowtime
+    range interval '1' second preceding) as c
+from orders]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalDelta
+  LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2], C=[COUNT() OVER (PARTITION BY $1 ORDER BY $0 RANGE BETWEEN 1000 PRECEDING AND CURRENT ROW)])
+    LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+]]>
+        </Resource>
+    </TestCase>
 </Root>


[2/4] incubator-calcite git commit: [CALCITE-602] Streaming queries

Posted by jh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/doc/REFERENCE.md
----------------------------------------------------------------------
diff --git a/doc/REFERENCE.md b/doc/REFERENCE.md
index 412bec6..e0c6278 100644
--- a/doc/REFERENCE.md
+++ b/doc/REFERENCE.md
@@ -67,7 +67,7 @@ orderItem:
       expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]
 
 select:
-      SELECT [ ALL | DISTINCT ]
+      SELECT [ STREAM ] [ ALL | DISTINCT ]
           { * | projectItem [, projectItem ]* }
       FROM tableExpression
       [ WHERE booleanExpression ]
@@ -324,6 +324,8 @@ Not implemented:
 | CURRENT_DATE              | Returns the current date in the session time zone, in a value of datatype DATE
 | CURRENT_TIMESTAMP         | Returns the current date and time in the session time zone, in a value of datatype TIMESTAMP WITH TIME ZONE
 | EXTRACT(timeUnit FROM datetime) | Extracts and returns the value of a specified datetime field from a datetime value expression
+| FLOOR(datetime TO timeUnit) | Rounds *datetime* down to *timeUnit*
+| CEIL(datetime TO timeUnit) | Rounds *datetime* up to *timeUnit*
 
 Not implemented:
 * EXTRACT(timeUnit FROM interval)
@@ -533,3 +535,5 @@ Not implemented:
 | Operator syntax      | Description
 | -------------------- | -----------
 | GROUPING(expression) | Returns 1 if expression is rolled up in the current row's grouping set, 0 otherwise
+| GROUP_ID()           | Returns an integer that uniquely identifies the combination of grouping keys
+| GROUPING_ID(expression [, expression ] * ) | Returns a bit vector of the given grouping expressions

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/doc/STREAM.md
----------------------------------------------------------------------
diff --git a/doc/STREAM.md b/doc/STREAM.md
new file mode 100644
index 0000000..b0981fd
--- /dev/null
+++ b/doc/STREAM.md
@@ -0,0 +1,521 @@
+# Calcite SQL extensions for streaming
+
+## Introduction
+
+Streams are collections of records that flow continuously, and forever.
+Unlike tables, they are not typically stored on disk, but flow over the
+network and are held for short periods of time in memory.
+
+Streams complement tables because they represent what is happening in the
+present and future of the enterprise whereas tables represent the past.
+It is very common for a stream to be archived into a table.
+
+Like tables, you often want to query streams in a high-level language
+based on relational algebra, validated according to a schema, and optimized
+to take advantage of available resources and algorithms.
+
+Calcite's SQL is an extension to standard SQL, not another 'SQL-like' language.
+The distinction is important, for several reasons:
+* Streaming SQL is easy to learn for anyone who knows regular SQL.
+* The semantics are clear, because we aim to produce the same results on a
+  stream as if the same data were in a table.
+* You can write queries that combine streams and tables (or the history of
+  a stream, which is basically an in-memory table).
+* Lots of existing tools can generate standard SQL.
+
+## An example schema
+
+Our streaming SQL examples use the following schema:
+* `Orders (rowtime, productId, orderId, units)` - a stream and a table
+* `Products (rowtime, productId, name)` - a table
+* `Shipments (rowtime, orderId)` - a stream
+
+## A simple query
+
+Let's start with the simplest streaming query:
+
+```sql
+SELECT STREAM *
+FROM Orders;
+
+  rowtime | productId | orderId | units
+----------+-----------+---------+-------
+ 10:17:00 |        30 |       5 |     4
+ 10:17:05 |        10 |       6 |     1
+ 10:18:05 |        20 |       7 |     2
+ 10:18:07 |        30 |       8 |    20
+ 11:02:00 |        10 |       9 |     6
+ 11:04:00 |        10 |      10 |     1
+ 11:09:30 |        40 |      11 |    12
+ 11:24:11 |        10 |      12 |     4
+```
+
+This query reads all columns and rows from the `Orders` stream.
+Like any streaming query, it never terminates. It outputs a record whenever
+a record arrives in `Orders`.
+
+Type `Control-C` to terminate the query.
+
+The `STREAM` keyword is the main extension in streaming SQL. It tells the
+system that you are interested in incoming orders, not existing ones. The query
+
+```sql
+SELECT *
+FROM Orders;
+
+  rowtime | productId | orderId | units
+----------+-----------+---------+-------
+ 08:30:00 |        10 |       1 |     3
+ 08:45:10 |        20 |       2 |     1
+ 09:12:21 |        10 |       3 |    10
+ 09:27:44 |        30 |       4 |     2
+
+4 records returned.
+```
+
+is also valid, but will print out all existing orders and then terminate. We
+call it a *relational* query, as opposed to *streaming*. It has traditional
+SQL semantics.
+
+`Orders` is special, in that it has both a stream and a table. If you try to run
+a streaming query on a table, or a relational query on a stream, Calcite gives
+an error:
+
+```sql
+> SELECT * FROM Shipments;
+ERROR: Cannot convert table 'SHIPMENTS' to a stream
+
+> SELECT STREAM * FROM Products;
+ERROR: Cannot convert stream 'PRODUCTS' to a table
+```
+
+## Filtering rows
+
+Just as in regular SQL, you use a `WHERE` clause to filter rows:
+
+```sql
+SELECT STREAM *
+FROM Orders
+WHERE units > 3;
+
+  rowtime | productId | orderId | units
+----------+-----------+---------+-------
+ 10:17:00 |        30 |       5 |     4
+ 10:18:07 |        30 |       8 |    20
+ 11:02:00 |        10 |       9 |     6
+ 11:09:30 |        40 |      11 |    12
+ 11:24:11 |        10 |      12 |     4
+```
+
+## Projecting expressions
+
+Use expressions in the `SELECT` clause to choose which columns to return or
+compute expressions:
+
+```sql
+SELECT STREAM rowtime,
+  'An order for ' || units || ' '
+    || CASE units WHEN 1 THEN 'unit' ELSE 'units' END
+    || ' of product #' || productId AS description
+FROM Orders;
+
+  rowtime | description
+----------+---------------------------------------
+ 10:17:00 | An order for 4 units of product #30
+ 10:17:05 | An order for 1 unit of product #10
+ 10:18:05 | An order for 2 units of product #20
+ 10:18:07 | An order for 20 units of product #30
+ 11:02:00 | An order for 6 units of product #10
+ 11:04:00 | An order for 1 unit of product #10
+ 11:09:30 | An order for 12 units of product #40
+ 11:24:11 | An order for 4 units of product #10
+```
+
+We recommend that you always include the `rowtime` column in the `SELECT`
+clause. Having a sorted timestamp in each stream and streaming query makes it
+possible to do advanced calculations later, such as `GROUP BY` and `JOIN`.
+
+## Tumbling windows
+
+There are several ways to compute aggregate functions on streams. The
+differences are:
+* How many rows come out for each row that comes in?
+* Does each incoming value appear in one total, or more?
+* What defines the "window", the set of rows that contribute to a given output row?
+* Is the result a stream or a relation?
+
+First we'll look at a *tumbling window*, which is defined by a streaming
+`GROUP BY`. Here is an example:
+
+```sql
+SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
+  productId,
+  COUNT(*) AS c,
+  SUM(units) AS units
+FROM Orders
+GROUP BY FLOOR(rowtime TO HOUR), productId;
+
+  rowtime | productId |       c | units
+----------+-----------+---------+-------
+ 10:00:00 |        30 |       2 |    24
+ 10:00:00 |        10 |       1 |     1
+ 10:00:00 |        20 |       1 |     2
+ 11:00:00 |        10 |       3 |    11
+ 11:00:00 |        40 |       1 |    12
+```
+
+The result is a stream. At 11 o'clock, Calcite emits a sub-total for every
+`productId` that had an order since 10 o'clock. At 12 o'clock, it will emit
+the orders that occurred between 11:00 and 12:00. Each input row contributes to
+only one output row.
+
+How did Calcite know that the 10:00:00 sub-totals were complete at 11:00:00,
+so that it could emit them? It knows that `rowtime` is increasing, and it knows
+that `FLOOR(rowtime TO HOUR)` is also increasing. So, once it has seen a row
+at or after 11:00:00, it will never see a row that will contribute to a 10:00:00
+total.
+
+A column or expression that is increasing or decreasing is said to be
+*monotonic*. Without a monotonic expression in the `GROUP BY` clause, Calcite is
+not able to make progress, and it will not allow the query:
+
+```sql
+> SELECT STREAM productId,
+>   COUNT(*) AS c,
+>   SUM(units) AS units
+> FROM Orders
+> GROUP BY productId;
+ERROR: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
+```
+
+Monotonic columns need to be declared in the schema. The monotonicity is
+enforced when records enter the stream and assumed by queries that read from
+that stream. We recommend that you give each stream a timestamp column called
+`rowtime`, but you can declare others, `orderId`, for example.
+
+## Filtering after aggregation
+
+As in standard SQL, you can apply a `HAVING` clause to filter rows emitted by
+a streaming `GROUP BY`:
+
+```sql
+SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
+  productId
+FROM Orders
+GROUP BY FLOOR(rowtime TO HOUR), productId
+HAVING COUNT(*) > 2 OR SUM(units) > 10;
+
+  rowtime | productId
+----------+-----------
+ 10:00:00 |        30
+ 11:00:00 |        10
+ 11:00:00 |        40
+```
+
+## Sub-queries, views and SQL's closure property
+
+The previous `HAVING` query can be expressed using a `WHERE` clause on a
+sub-query:
+
+```sql
+SELECT STREAM rowtime, productId
+FROM (
+  SELECT FLOOR(rowtime TO HOUR) AS rowtime,
+    productId,
+    COUNT(*) AS c,
+    SUM(units) AS su
+  FROM Orders
+  GROUP BY FLOOR(rowtime TO HOUR), productId)
+WHERE c > 2 OR su > 10;
+
+  rowtime | productId
+----------+-----------
+ 10:00:00 |        30
+ 11:00:00 |        10
+ 11:00:00 |        40
+```
+
+`HAVING` was introduced in the early days of SQL, when a way was needed to
+perform a filter *after* aggregation. (Recall that `WHERE` filters rows before
+they enter the `GROUP BY` clause.)
+
+Since then, SQL has become a mathematically closed language, which means that
+any operation you can perform on a table you can also perform on a query.
+
+The *closure property* of SQL is extremely powerful. Not only does it render
+`HAVING` obsolete (or, at least, reduce it to syntactic sugar), it makes views
+possible:
+
+```sql
+CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
+  SELECT FLOOR(rowtime TO HOUR),
+    productId,
+    COUNT(*),
+    SUM(units)
+  FROM Orders
+  GROUP BY FLOOR(rowtime TO HOUR), productId;
+
+SELECT STREAM rowtime, productId
+FROM HourlyOrderTotals
+WHERE c > 2 OR su > 10;
+
+  rowtime | productId
+----------+-----------
+ 10:00:00 |        30
+ 11:00:00 |        10
+ 11:00:00 |        40
+```
+
+Sub-queries in the `FROM` clause are sometimes referred to as "inline views",
+but really, nested queries are more fundamental. Views are just a convenient
+way to carve your SQL into manageable chunks.
+
+Many people find that nested queries and views are even more useful on streams
+than they are on relations. Streaming queries are pipelines of
+operators all running continuously, and often those pipelines get quite long.
+Nested queries and views help to express and manage those pipelines.
+
+And, by the way, a `WITH` clause can accomplish the same as a sub-query or
+a view:
+
+```sql
+WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
+  SELECT FLOOR(rowtime TO HOUR),
+    productId,
+    COUNT(*),
+    SUM(units)
+  FROM Orders
+  GROUP BY FLOOR(rowtime TO HOUR), productId)
+SELECT STREAM rowtime, productId
+FROM HourlyOrderTotals
+WHERE c > 2 OR su > 10;
+
+  rowtime | productId
+----------+-----------
+ 10:00:00 |        30
+ 11:00:00 |        10
+ 11:00:00 |        40
+```
+
+## Converting between streams and relations
+
+Look back at the definition of the `HourlyOrderTotals` view.
+Is the view a stream or a relation?
+
+It does not contain the `STREAM` keyword, so it is a relation.
+However, it is a relation that can be converted into a stream.
+
+You can use it in both relational and streaming queries:
+
+```sql
+# A relation; will query the historic Orders table.
+# Returns the largest number of product #10 ever sold in one hour.
+SELECT max(su)
+FROM HourlyOrderTotals
+WHERE productId = 10;
+
+# A stream; will query the Orders stream.
+# Returns every hour in which at least one product #10 was sold.
+SELECT STREAM rowtime
+FROM HourlyOrderTotals
+WHERE productId = 10;
+```
+
+This approach is not limited to views and sub-queries.
+Following the approach set out in CQL [<a href="#ref1">1</a>], every query
+in streaming SQL is defined as a relational query and converted to a stream
+using the `STREAM` keyword in the top-most `SELECT`.
+
+If the `STREAM` keyword is present in sub-queries or view definitions, it has no
+effect.
+
+At query preparation time, Calcite figures out whether the relations referenced
+in the query can be converted to streams or historical relations.
+
+Sometimes a stream makes available some of its history (say the last 24 hours of
+data in an Apache Kafka [<a href="#ref2">2</a>] topic)
+but not all. At run time, Calcite figures out whether there is sufficient
+history to run the query, and if not, gives an error.
+
+## Hopping windows
+
+Previously we saw how to define a tumbling window using a `GROUP BY` clause.
+Each record contributed to a single sub-total record, the one containing its
+hour and product id.
+
+But suppose we want to emit, every hour, the number of each product ordered over
+the past three hours. To do this, we use `SELECT ... OVER` and a sliding window
+to combine multiple tumbling windows.
+
+```sql
+SELECT STREAM rowtime,
+  productId,
+  SUM(su) OVER w AS su,
+  SUM(c) OVER w AS c
+FROM HourlyOrderTotals
+WINDOW w AS (
+  PARTITION BY productId
+  ORDER BY rowtime
+  RANGE INTERVAL '2' HOUR PRECEDING)
+```
+
+This query uses the `HourlyOrderTotals` view defined previously.
+The 2-hour interval combines the totals timestamped 09:00:00, 10:00:00 and
+11:00:00 for a particular product into a single total timestamped 11:00:00,
+summarizing orders for that product between 09:00:00 and 12:00:00.
+
+## Limitations of tumbling and hopping windows
+
+In the present syntax, we acknowledge that it is not easy to create certain
+kinds of windows.
+
+First, let's consider tumbling windows over complex periods.
+
+The `FLOOR` and `CEIL` functions make it easy to create a tumbling window that
+emits on a whole time unit (say every hour, or every minute) but less easy to
+emit, say, every 15 minutes. One could imagine an extension to the `FLOOR`
+function that emits unique values on just about any periodic basis (say in
+11-minute intervals starting from midnight of the current day).
+
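+As a purely hypothetical sketch (this syntax is not part of Calcite's SQL;
+the `Orders` columns are the ones used throughout this document), such an
+extension might look like this:
+
+```sql
+-- Hypothetical syntax, for illustration only: tumble in 11-minute buckets.
+SELECT STREAM FLOOR(rowtime TO INTERVAL '11' MINUTE) AS rowtime,
+  productId,
+  COUNT(*) AS c
+FROM Orders
+GROUP BY FLOOR(rowtime TO INTERVAL '11' MINUTE), productId;
+```
+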
+Next, let's consider hopping windows whose retention period is not a multiple
+of its emission period. Say we want to output, at the top of each hour, the
+orders for the previous 7,007 seconds. If we were to simulate this hopping
+window using a sliding window over a tumbling window, as before, we would have
+to sum lots of 1-second windows (because 3,600 and 7,007 are co-prime).
+This is a lot of effort for both the system and the person writing the query.
+
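+For illustration only, here is a sketch of that simulation, assuming 1-second
+tumbling totals combined with the windowed-aggregation syntax shown earlier;
+streaming windowed aggregation is not yet implemented (see the list at the end
+of this document), so this is meant to show the awkwardness, not to be run:
+
+```sql
+-- A sketch only: roll Orders up into 1-second buckets, then sum each bucket
+-- over the preceding 7,007 seconds. Picking out just the top-of-the-hour
+-- rows would need yet another layer of filtering.
+SELECT STREAM rowtime, productId, SUM(su) OVER w AS su
+FROM (
+  SELECT FLOOR(rowtime TO SECOND) AS rowtime,
+    productId,
+    SUM(units) AS su
+  FROM Orders
+  GROUP BY FLOOR(rowtime TO SECOND), productId)
+WINDOW w AS (
+  PARTITION BY productId
+  ORDER BY rowtime
+  RANGE INTERVAL '7007' SECOND(4) PRECEDING);
+```
+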
+Calcite could perhaps solve this by generalizing `GROUP BY` syntax, but we would
+be destroying the principle that an input row into a `GROUP BY` appears in
+precisely one output row.
+
+Calcite's SQL extensions for streaming queries are evolving. As we learn more
+about how people wish to query streams, we plan to make the language more
+expressive while remaining compatible with standard SQL and consistent with
+its principles, look and feel.
+
+## Sorting
+
+The story for `ORDER BY` is similar to `GROUP BY`.
+The syntax looks like regular SQL, but Calcite must be sure that it can deliver
+timely results. It therefore requires a monotonic expression on the leading edge
+of your `ORDER BY` key.
+
+```sql
+SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime, productId, orderId, units
+FROM Orders
+ORDER BY FLOOR(rowtime TO HOUR) ASC, units DESC;
+
+  rowtime | productId | orderId | units
+----------+-----------+---------+-------
+ 10:00:00 |        30 |       8 |    20
+ 10:00:00 |        30 |       5 |     4
+ 10:00:00 |        20 |       7 |     2
+ 10:00:00 |        10 |       6 |     1
+ 11:00:00 |        40 |      11 |    12
+ 11:00:00 |        10 |       9 |     6
+ 11:00:00 |        10 |      12 |     4
+ 11:00:00 |        10 |      10 |     1
+```
+
+Most queries will return results in the order that they were inserted,
+because the engine is using streaming algorithms, but you should not rely on it.
+For example, consider this:
+
+```sql
+SELECT STREAM *
+FROM Orders
+WHERE productId = 10
+UNION ALL
+SELECT STREAM *
+FROM Orders
+WHERE productId = 30;
+
+  rowtime | productId | orderId | units
+----------+-----------+---------+-------
+ 10:17:05 |        10 |       6 |     1
+ 10:17:00 |        30 |       5 |     4
+ 10:18:07 |        30 |       8 |    20
+ 11:02:00 |        10 |       9 |     6
+ 11:04:00 |        10 |      10 |     1
+ 11:24:11 |        10 |      12 |     4
+```
+
+The rows with `productId` = 30 are apparently out of order, probably because
+the `Orders` stream was partitioned on `productId` and the partitioned streams
+sent their data at different times.
+
+If you require a particular ordering, add an explicit `ORDER BY`:
+
+```sql
+SELECT STREAM *
+FROM Orders
+WHERE productId = 10
+UNION ALL
+SELECT STREAM *
+FROM Orders
+WHERE productId = 30
+ORDER BY rowtime;
+
+  rowtime | productId | orderId | units
+----------+-----------+---------+-------
+ 10:17:00 |        30 |       5 |     4
+ 10:17:05 |        10 |       6 |     1
+ 10:18:07 |        30 |       8 |    20
+ 11:02:00 |        10 |       9 |     6
+ 11:04:00 |        10 |      10 |     1
+ 11:24:11 |        10 |      12 |     4
+```
+
+Calcite will probably implement the `UNION ALL` by merging using `rowtime`,
+which is only slightly less efficient.
+
+You only need to add an `ORDER BY` to the outermost query. If you need to,
+say, perform `GROUP BY` after a `UNION ALL`, Calcite will add an `ORDER BY`
+implicitly, in order to make the GROUP BY algorithm possible.
+
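+As a sketch of the kind of query meant here (streaming `UNION ALL` with the
+implicit merge is still on the to-do list below):
+
+```sql
+-- Calcite would have to merge the two branches on rowtime before it could
+-- apply the streaming GROUP BY.
+SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
+  COUNT(*) AS c
+FROM (
+  SELECT * FROM Orders WHERE productId = 10
+  UNION ALL
+  SELECT * FROM Orders WHERE productId = 30)
+GROUP BY FLOOR(rowtime TO HOUR);
+```
+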
+## Table constructor
+
+The `VALUES` clause creates an inline table with a given set of rows.
+
+Streaming is disallowed. The set of rows never changes, and therefore a stream
+would never return any rows.
+
+```sql
+> SELECT STREAM * FROM (VALUES (1, 'abc'));
+
+ERROR: Cannot stream VALUES
+```
+
+## State of the stream
+
+Not all concepts in this article have been implemented in Calcite.
+And others may be implemented in Calcite but not in a particular adapter
+such as Samza SQL [<a href="#ref3">3</a>].
+
+### Implemented
+* Streaming SELECT, WHERE, GROUP BY, HAVING, UNION ALL, ORDER BY
+* FLOOR and CEILING functions
+* Monotonicity
+* Streaming VALUES is disallowed
+
+### Not implemented
+* Stream-to-stream JOIN
+* Stream-to-table JOIN
+* Stream on view
+* Streaming UNION ALL with ORDER BY (merge)
+* Relational query on stream
+* Streaming windowed aggregation
+* Check that STREAM in sub-queries and views is ignored
+* Check that streaming ORDER BY cannot have OFFSET or LIMIT
+* Limited history; at run time, check that there is sufficient history
+  to run the query.
+
+## References
+
+* [<a name="ref1">1</a>]
+  <a href="http://ilpubs.stanford.edu:8090/758/">Arasu, Arvind and Babu,
+  Shivnath and Widom, Jennifer (2003) The CQL Continuous Query
+  Language: Semantic Foundations and Query Execution</a>.
+* [<a name="ref2">2</a>]
+  <a href="http://kafka.apache.org/documentation.html">Apache Kafka</a>.
+* [<a name="ref3">3</a>] <a href="http://samza.apache.org">Apache Samza</a>.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
index 95b43c1..db897d9 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/Linq4j.java
@@ -393,8 +393,9 @@ public abstract class Linq4j {
   /**
    * Returns whether the arguments are equal to each other.
    *
-   * <p>Equivalent to {@code Objects#equals} in JDK 1.7 and above.</p>
+   * <p>Equivalent to {@link java.util.Objects#equals} in JDK 1.7 and above.
    */
+  @Deprecated // to be removed before 2.0
   public static <T> boolean equals(T t0, T t1) {
     return t0 == t1 || t0 != null && t0.equals(t1);
   }
@@ -403,8 +404,10 @@ public abstract class Linq4j {
    * Throws {@link NullPointerException} if argument is null, otherwise
    * returns argument.
    *
-   * <p>Equivalent to {@code Objects#equireNonNull} in JDK 1.7 and above.</p>
+   * <p>Equivalent to {@link java.util.Objects#requireNonNull} in JDK 1.7 and
+   * above.
    */
+  @Deprecated // to be removed before 2.0
   public static <T> T requireNonNull(T o) {
     if (o == null) {
       throw new NullPointerException();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/linq4j/src/main/java/org/apache/calcite/linq4j/function/Functions.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/function/Functions.java b/linq4j/src/main/java/org/apache/calcite/linq4j/function/Functions.java
index 5d02cd2..129c9a2 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/function/Functions.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/function/Functions.java
@@ -16,8 +16,6 @@
  */
 package org.apache.calcite.linq4j.function;
 
-import org.apache.calcite.linq4j.Linq4j;
-
 import java.io.Serializable;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
@@ -29,6 +27,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Utilities relating to functions.
@@ -524,7 +523,7 @@ public abstract class Functions {
   private static class IdentityEqualityComparer
       implements EqualityComparer<Object> {
     public boolean equal(Object v1, Object v2) {
-      return Linq4j.equals(v1, v2);
+      return Objects.equals(v1, v2);
     }
 
     public int hashCode(Object t) {
@@ -545,7 +544,7 @@ public abstract class Functions {
       return v1 == v2
           || v1 != null
           && v2 != null
-          && Linq4j.equals(selector.apply(v1), selector.apply(v2));
+          && Objects.equals(selector.apply(v1), selector.apply(v2));
     }
 
     public int hashCode(T t) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Visitor.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Visitor.java b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Visitor.java
index ed4f86a..cf50786 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Visitor.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/Visitor.java
@@ -16,9 +16,8 @@
  */
 package org.apache.calcite.linq4j.tree;
 
-import org.apache.calcite.linq4j.Linq4j;
-
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Node visitor.
@@ -41,7 +40,7 @@ public class Visitor {
   }
 
   public Statement visit(ConditionalStatement conditionalStatement,
-                   List<Node> list) {
+      List<Node> list) {
     return list.equals(conditionalStatement.expressionList)
         ? conditionalStatement
         : Expressions.ifThenElse(list);
@@ -246,7 +245,7 @@ public class Visitor {
   public Expression visit(NewExpression newExpression,
       List<Expression> arguments, List<MemberDeclaration> memberDeclarations) {
     return arguments.equals(newExpression.arguments)
-        && Linq4j.equals(memberDeclarations, newExpression.memberDeclarations)
+        && Objects.equals(memberDeclarations, newExpression.memberDeclarations)
         ? newExpression
         : Expressions.new_(newExpression.type, arguments, memberDeclarations);
   }
@@ -314,8 +313,8 @@ public class Visitor {
 
   public ClassDeclaration visit(ClassDeclaration classDeclaration,
       List<MemberDeclaration> memberDeclarations) {
-    return Linq4j.equals(memberDeclarations,
-      classDeclaration.memberDeclarations)
+    return Objects.equals(memberDeclarations,
+        classDeclaration.memberDeclarations)
         ? classDeclaration
         : Expressions.classDecl(classDeclaration.modifier,
             classDeclaration.name, classDeclaration.extended,