You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/03/08 00:50:59 UTC

[4/4] incubator-calcite git commit: [CALCITE-602] Streaming queries

[CALCITE-602] Streaming queries

Validate and implement streaming queries: streaming scan, project, filter, aggregate, sort.

Implement CEIL and FLOOR functions for date-time and numeric values.

Add CompositeSingleOperandTypeChecker, and make CompositeOperandTypeChecker work for multiple operands.


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0ecd8702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0ecd8702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0ecd8702

Branch: refs/heads/master
Commit: 0ecd8702ab95fd59b7ef2182720d12b2167ae968
Parents: c50a6e0
Author: Julian Hyde <jh...@apache.org>
Authored: Sat Feb 21 18:10:32 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Mar 7 15:33:21 2015 -0800

----------------------------------------------------------------------
 README.md                                       |   1 +
 .../calcite/avatica/util/DateTimeUtils.java     |  69 +++
 core/src/main/codegen/templates/Parser.jj       |  79 ++-
 .../adapter/enumerable/EnumerableTableScan.java |   4 +-
 .../calcite/adapter/enumerable/RexImpTable.java |  77 ++-
 .../org/apache/calcite/jdbc/CalciteSchema.java  |   7 +-
 .../java/org/apache/calcite/model/JsonRoot.java |   1 +
 .../org/apache/calcite/model/JsonStream.java    |  33 ++
 .../org/apache/calcite/model/JsonTable.java     |   8 +-
 .../calcite/prepare/CalcitePrepareImpl.java     |  10 +
 .../apache/calcite/prepare/RelOptTableImpl.java |  45 ++
 .../java/org/apache/calcite/rel/core/Sort.java  |   2 +-
 .../java/org/apache/calcite/rel/stream/Chi.java |  38 ++
 .../org/apache/calcite/rel/stream/Delta.java    |  49 ++
 .../apache/calcite/rel/stream/LogicalChi.java   |  33 ++
 .../apache/calcite/rel/stream/LogicalDelta.java |  61 +++
 .../apache/calcite/rel/stream/StreamRules.java  | 198 +++++++
 .../apache/calcite/rel/stream/package-info.java |  35 ++
 .../apache/calcite/runtime/CalciteResource.java |  18 +
 .../apache/calcite/runtime/SqlFunctions.java    | 110 ++++
 .../java/org/apache/calcite/schema/Schema.java  |   3 +
 .../apache/calcite/schema/StreamableTable.java  |  32 ++
 .../java/org/apache/calcite/sql/SqlKind.java    |  12 +-
 .../apache/calcite/sql/SqlSelectKeyword.java    |   3 +-
 .../calcite/sql/advise/SqlAdvisorValidator.java |   6 +
 .../apache/calcite/sql/fun/SqlCeilFunction.java |  52 --
 .../sql/fun/SqlCollectionTableOperator.java     |  23 +-
 .../calcite/sql/fun/SqlExtractFunction.java     |   4 +-
 .../calcite/sql/fun/SqlFloorFunction.java       |  38 +-
 .../calcite/sql/fun/SqlStdOperatorTable.java    |  23 +-
 .../sql/type/CompositeOperandTypeChecker.java   | 173 ++----
 .../type/CompositeSingleOperandTypeChecker.java | 119 +++++
 .../apache/calcite/sql/type/OperandTypes.java   |  43 +-
 .../calcite/sql/validate/AbstractNamespace.java |   4 +
 .../sql/validate/IdentifierNamespace.java       |  10 +-
 .../calcite/sql/validate/SelectNamespace.java   |   4 +
 .../calcite/sql/validate/SetopNamespace.java    |  34 ++
 .../calcite/sql/validate/SqlModality.java       |  25 +
 .../calcite/sql/validate/SqlValidator.java      |  13 +
 .../calcite/sql/validate/SqlValidatorImpl.java  | 181 ++++++-
 .../sql/validate/SqlValidatorNamespace.java     |   7 +
 .../calcite/sql/validate/SqlValidatorTable.java |   2 +
 .../sql/validate/TableConstructorNamespace.java |   4 +
 .../calcite/sql/validate/TableNamespace.java    |   5 +
 .../sql2rel/RelStructuredTypeFlattener.java     |  10 +
 .../calcite/sql2rel/SqlToRelConverter.java      |   9 +
 .../org/apache/calcite/util/BuiltInMethod.java  |  10 +
 .../calcite/runtime/CalciteResource.properties  |   6 +
 .../calcite/sql/parser/SqlParserTest.java       |  27 +
 .../apache/calcite/sql/test/SqlAdvisorTest.java |   5 +-
 .../calcite/sql/test/SqlOperatorBaseTest.java   | 262 ++++------
 .../org/apache/calcite/test/CalciteSuite.java   |   1 +
 .../apache/calcite/test/MockCatalogReader.java  |  62 ++-
 .../apache/calcite/test/RelOptRulesTest.java    |   2 +-
 .../apache/calcite/test/SqlFunctionsTest.java   |  46 +-
 .../calcite/test/SqlToRelConverterTest.java     |  21 +
 .../apache/calcite/test/SqlValidatorTest.java   | 207 ++++++--
 .../org/apache/calcite/test/StreamTest.java     | 296 +++++++++++
 .../calcite/test/SqlToRelConverterTest.xml      |  46 +-
 doc/REFERENCE.md                                |   6 +-
 doc/STREAM.md                                   | 521 +++++++++++++++++++
 .../java/org/apache/calcite/linq4j/Linq4j.java  |   7 +-
 .../calcite/linq4j/function/Functions.java      |   7 +-
 .../org/apache/calcite/linq4j/tree/Visitor.java |  11 +-
 64 files changed, 2745 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 48d485e..40458ec 100644
--- a/README.md
+++ b/README.md
@@ -198,6 +198,7 @@ For more details, see the <a href="doc/REFERENCE.md">Reference guide</a>.
 * <a href="doc/HOWTO.md">HOWTO</a>
 * <a href="doc/MODEL.md">JSON model</a>
 * <a href="doc/REFERENCE.md">Reference guide</a>
+* <a href="doc/STREAM.md">Streaming SQL</a>
 * <a href="doc/HISTORY.md">Release notes and history</a>
 
 ### Pre-Apache resources

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
index 084c027..8acdf93 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -700,6 +700,66 @@ public class DateTimeUtils {
     }
   }
 
+  public static long unixTimestampFloor(TimeUnitRange range, long timestamp) {
+    int date = (int) (timestamp / MILLIS_PER_DAY);
+    final int f = julianDateFloor(range, date + EPOCH_JULIAN, true);
+    return (long) f * MILLIS_PER_DAY;
+  }
+
+  public static long unixDateFloor(TimeUnitRange range, long date) {
+    return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+  }
+
+  public static long unixTimestampCeil(TimeUnitRange range, long timestamp) {
+    int date = (int) (timestamp / MILLIS_PER_DAY);
+    final int f = julianDateFloor(range, date + EPOCH_JULIAN, false);
+    return (long) f * MILLIS_PER_DAY;
+  }
+
+  public static long unixDateCeil(TimeUnitRange range, long date) {
+    return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+  }
+
+  private static int julianDateFloor(TimeUnitRange range, int julian,
+      boolean floor) {
+    // this shifts the epoch back to astronomical year -4800 instead of the
+    // start of the Christian era in year AD 1 of the proleptic Gregorian
+    // calendar.
+    int j = julian + 32044;
+    int g = j / 146097;
+    int dg = j % 146097;
+    int c = (dg / 36524 + 1) * 3 / 4;
+    int dc = dg - c * 36524;
+    int b = dc / 1461;
+    int db = dc % 1461;
+    int a = (db / 365 + 1) * 3 / 4;
+    int da = db - a * 365;
+
+    // integer number of full years elapsed since March 1, 4801 BC
+    int y = g * 400 + c * 100 + b * 4 + a;
+    // integer number of full months elapsed since the last March 1
+    int m = (da * 5 + 308) / 153 - 2;
+    // number of days elapsed since day 1 of the month
+    int d = da - (m + 4) * 153 / 5 + 122;
+    int year = y - 4800 + (m + 2) / 12;
+    int month = (m + 2) % 12 + 1;
+    int day = d + 1;
+    switch (range) {
+    case YEAR:
+      if (!floor && (month > 1 || day > 1)) {
+        ++year;
+      }
+      return ymdToUnixDate(year, 1, 1);
+    case MONTH:
+      if (!floor && day > 1) {
+        ++month;
+      }
+      return ymdToUnixDate(year, month, 1);
+    default:
+      throw new AssertionError(range);
+    }
+  }
+
   public static int ymdToUnixDate(int year, int month, int day) {
     final int julian = ymdToJulian(year, month, day);
     return julian - EPOCH_JULIAN;
@@ -721,6 +781,15 @@ public class DateTimeUtils {
     return j;
   }
 
+  public static long unixTimestamp(int year, int month, int day, int hour,
+      int minute, int second) {
+    final int date = ymdToUnixDate(year, month, day);
+    return (long) date * MILLIS_PER_DAY
+        + (long) hour * MILLIS_PER_HOUR
+        + (long) minute * MILLIS_PER_MINUTE
+        + (long) second * MILLIS_PER_SECOND;
+  }
+
   //~ Inner Classes ----------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/codegen/templates/Parser.jj
----------------------------------------------------------------------
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index cc9a555..6b62ce3 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -91,6 +91,7 @@ import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.parser.SqlParserUtil;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Util;
 import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -304,12 +305,12 @@ SqlNode ExtendedBuiltinFunctionCall() :
 /*
 * Parse Floor/Ceil function parameters
 */
-SqlNode FloorCeilOptions( SqlParserPos pos, boolean floorFlag) :
+SqlNode FloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
 {
     SqlNode node;
 }
 {
-    node = StandardFloorCeilOptions( pos, floorFlag)
+    node = StandardFloorCeilOptions(pos, floorFlag)
     {
         return node;
     }
@@ -943,14 +944,18 @@ SqlSelect SqlSelect() :
     }
     SqlSelectKeywords(keywords)
     (
+        <STREAM> {
+            keywords.add(SqlSelectKeyword.STREAM.symbol(getPos()));
+        }
+    )?
+    (
         <DISTINCT> {
             keywords.add(SqlSelectKeyword.DISTINCT.symbol(getPos()));
         }
         |   <ALL> {
             keywords.add(SqlSelectKeyword.ALL.symbol(getPos()));
         }
-        |   E()
-    )
+    )?
     selectList = SelectList()
     <FROM>
     fromClause = FromClause()
@@ -1958,7 +1963,7 @@ SqlNodeList GroupByOpt() :
 {
     <GROUP> { pos = getPos(); }
     <BY> list = GroupingElementList() {
-        return new SqlNodeList(list, pos);
+        return new SqlNodeList(list, pos.plusAll(list));
     }
 |
     {
@@ -2222,19 +2227,16 @@ SqlNodeList OrderBy(boolean accept) :
     SqlParserPos pos;
 }
 {
-    <ORDER>
-    {
+    <ORDER> {
+        pos = getPos();
         if (!accept) {
             // Someone told us ORDER BY wasn't allowed here.  So why
             // did they bother calling us?  To get the correct
             // parser position for error reporting.
-            throw SqlUtil.newContextException(getPos(),
-                RESOURCE.illegalOrderBy());
+            throw SqlUtil.newContextException(pos, RESOURCE.illegalOrderBy());
         }
     }
-    <BY> e = OrderItem()
-    {
-        pos = getPos();
+    <BY> e = OrderItem() {
         list = startList(e);
     }
     (
@@ -2243,7 +2245,7 @@ SqlNodeList OrderBy(boolean accept) :
         LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
     ) *
     {
-        return new SqlNodeList(list, pos);
+        return new SqlNodeList(list, pos.plusAll(list));
     }
 }
 
@@ -2253,29 +2255,22 @@ SqlNodeList OrderBy(boolean accept) :
 SqlNode OrderItem() :
 {
     SqlNode e;
-    SqlParserPos pos;
 }
 {
     e = Expression(ExprContext.ACCEPT_SUBQUERY)
     (
         <ASC>
-        | <DESC>
-        {
-            pos = getPos();
-            e = SqlStdOperatorTable.DESC.createCall(pos, e);
+    |   <DESC> {
+            e = SqlStdOperatorTable.DESC.createCall(getPos(), e);
         }
     )?
     (
-        <NULLS> <FIRST>
-        {
-            pos = getPos();
-            e = SqlStdOperatorTable.NULLS_FIRST.createCall(pos, e);
+        <NULLS> <FIRST> {
+            e = SqlStdOperatorTable.NULLS_FIRST.createCall(getPos(), e);
         }
     |
-        <NULLS> <LAST>
-        {
-            pos = getPos();
-            e = SqlStdOperatorTable.NULLS_LAST.createCall(pos, e);
+        <NULLS> <LAST> {
+            e = SqlStdOperatorTable.NULLS_LAST.createCall(getPos(), e);
         }
     )?
     {
@@ -3864,7 +3859,7 @@ SqlNode BuiltinFunctionCall() :
         }
         <LPAREN>
         unit = TimeUnit()
-        { args = startList(new SqlIntervalQualifier(unit, null,getPos())); }
+        { args = startList(new SqlIntervalQualifier(unit, null, getPos())); }
         <FROM>
         e = Expression(ExprContext.ACCEPT_SUBQUERY) { args.add(e); }
         <RPAREN>
@@ -4183,13 +4178,22 @@ SqlNode StandardFloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
     SqlIdentifier name;
     SqlParserPos overPos = null;
     SqlIdentifier id = null;
-    SqlNode e = null;
-    SqlCall function = null;
-    SqlNodeList args;
+    SqlNode e;
+    List<SqlNode> args;
+    TimeUnit unit;
     boolean over = false;
 }
 {
-    args = ParenthesizedQueryOrCommaList(ExprContext.ACCEPT_SUBQUERY)
+    <LPAREN> e = Expression(ExprContext.ACCEPT_SUBQUERY) {
+        args = startList(e);
+    }
+    (
+        <TO>
+        unit = TimeUnit() {
+            args.add(new SqlIntervalQualifier(unit, null, getPos()));
+        }
+    )?
+    <RPAREN>
     [
         <OVER>
         {
@@ -4205,16 +4209,10 @@ SqlNode StandardFloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
         SqlOperator op = floorFlag
             ? SqlStdOperatorTable.FLOOR
             : SqlStdOperatorTable.CEIL;
-        function =  op.createCall(
-            pos, args.toArray());
+        final SqlCall function =  op.createCall(pos.plus(getPos()), args);
         if (over) {
-            if (id != null) {
-                return SqlStdOperatorTable.OVER.createCall(
-                    overPos, new SqlNode[] {function, id});
-            } else {
-                return SqlStdOperatorTable.OVER.createCall(
-                    overPos, new SqlNode[] { function, e });
-            }
+            return SqlStdOperatorTable.OVER.createCall(overPos, function,
+                Util.first(id, e));
         } else {
             return function;
         }
@@ -4981,6 +4979,7 @@ SqlPostfixOperator PostfixRowOperator() :
     | < STATIC: "STATIC" >
     | < STDDEV_POP: "STDDEV_POP" >
     | < STDDEV_SAMP: "STDDEV_SAMP" >
+    | < STREAM: "STREAM" >
     | < STRUCTURE: "STRUCTURE" >
     | < STYLE: "STYLE" >
     | < SUBCLASS_ORIGIN: "SUBCLASS_ORIGIN" >

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
index ddd3212..ad67052 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
@@ -37,6 +37,7 @@ import org.apache.calcite.schema.FilterableTable;
 import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.QueryableTable;
 import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.BuiltInMethod;
 
@@ -104,7 +105,8 @@ public class EnumerableTableScan
       }
     } else if (table instanceof ScannableTable
         || table instanceof FilterableTable
-        || table instanceof ProjectableFilterableTable) {
+        || table instanceof ProjectableFilterableTable
+        || table instanceof StreamableTable) {
       return Object[].class;
     } else {
       return Object.class;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index e1fd9b0..29c107f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -17,6 +17,8 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.BlockStatement;
@@ -236,14 +238,20 @@ public class RexImpTable {
     defineMethod(LN, "ln", NullPolicy.STRICT);
     defineMethod(LOG10, "log10", NullPolicy.STRICT);
     defineMethod(ABS, "abs", NullPolicy.STRICT);
-    defineMethod(CEIL, "ceil", NullPolicy.STRICT);
-    defineMethod(FLOOR, "floor", NullPolicy.STRICT);
 
     // datetime
     defineImplementor(DATETIME_PLUS, NullPolicy.STRICT,
         new DatetimeArithmeticImplementor(), false);
     defineMethod(EXTRACT_DATE, BuiltInMethod.UNIX_DATE_EXTRACT.method,
         NullPolicy.STRICT);
+    defineImplementor(FLOOR, NullPolicy.STRICT,
+        new FloorImplementor(BuiltInMethod.FLOOR.method.getName(),
+            BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method,
+            BuiltInMethod.UNIX_DATE_FLOOR.method), false);
+    defineImplementor(CEIL, NullPolicy.STRICT,
+        new FloorImplementor(BuiltInMethod.CEIL.method.getName(),
+            BuiltInMethod.UNIX_TIMESTAMP_CEIL.method,
+            BuiltInMethod.UNIX_DATE_CEIL.method), false);
 
     map.put(IS_NULL, new IsXxxImplementor(null, false));
     map.put(IS_NOT_NULL, new IsXxxImplementor(null, true));
@@ -1402,9 +1410,70 @@ public class RexImpTable {
     }
   }
 
+  /** Implementor for the {@code FLOOR} and {@code CEIL} functions. */
+  private static class FloorImplementor extends MethodNameImplementor {
+    final Method timestampMethod;
+    final Method dateMethod;
+
+    FloorImplementor(String methodName, Method timestampMethod,
+        Method dateMethod) {
+      super(methodName);
+      this.timestampMethod = timestampMethod;
+      this.dateMethod = dateMethod;
+    }
+
+    public Expression implement(RexToLixTranslator translator, RexCall call,
+        List<Expression> translatedOperands) {
+      switch (call.getOperands().size()) {
+      case 1:
+        switch (call.getType().getSqlTypeName()) {
+        case BIGINT:
+        case INTEGER:
+        case SMALLINT:
+        case TINYINT:
+          return translatedOperands.get(0);
+        }
+        return super.implement(translator, call, translatedOperands);
+      case 2:
+        final Type type;
+        final Method floorMethod;
+        switch (call.getType().getSqlTypeName()) {
+        case TIMESTAMP:
+          type = long.class;
+          floorMethod = timestampMethod;
+          break;
+        default:
+          type = int.class;
+          floorMethod = dateMethod;
+        }
+        final ConstantExpression tur =
+            (ConstantExpression) translatedOperands.get(1);
+        final TimeUnitRange timeUnitRange = (TimeUnitRange) tur.value;
+        switch (timeUnitRange) {
+        case YEAR:
+        case MONTH:
+          return Expressions.call(floorMethod, tur,
+              call(translatedOperands, type, TimeUnit.DAY));
+        default:
+          return call(translatedOperands, type, timeUnitRange.startUnit);
+        }
+      default:
+        throw new AssertionError();
+      }
+    }
+
+    private Expression call(List<Expression> translatedOperands, Type type,
+        TimeUnit timeUnit) {
+      return Expressions.call(SqlFunctions.class, methodName,
+          Types.castIfNecessary(type, translatedOperands.get(0)),
+          Types.castIfNecessary(type,
+              Expressions.constant(timeUnit.multiplier)));
+    }
+  }
+
   /** Implementor for a function that generates calls to a given method. */
   private static class MethodImplementor implements NotNullImplementor {
-    private final Method method;
+    protected final Method method;
 
     MethodImplementor(Method method) {
       this.method = method;
@@ -1428,7 +1497,7 @@ public class RexImpTable {
    * <p>Use this, as opposed to {@link MethodImplementor}, if the SQL function
    * is overloaded; then you can use one implementor for several overloads. */
   private static class MethodNameImplementor implements NotNullImplementor {
-    private final String methodName;
+    protected final String methodName;
 
     MethodNameImplementor(String methodName) {
       this.methodName = methodName;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
index 4b34c3e..346b721 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
@@ -16,7 +16,6 @@
  */
 package org.apache.calcite.jdbc;
 
-import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.materialize.Lattice;
 import org.apache.calcite.schema.Function;
@@ -522,10 +521,8 @@ public class CalciteSchema {
     public final String name;
 
     public Entry(CalciteSchema schema, String name) {
-      Linq4j.requireNonNull(schema);
-      Linq4j.requireNonNull(name);
-      this.schema = schema;
-      this.name = name;
+      this.schema = Preconditions.checkNotNull(schema);
+      this.name = Preconditions.checkNotNull(name);
     }
 
     /** Returns this object's path. For example ["hr", "emps"]. */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonRoot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonRoot.java b/core/src/main/java/org/apache/calcite/model/JsonRoot.java
index 25e1fa3..ba73e5b 100644
--- a/core/src/main/java/org/apache/calcite/model/JsonRoot.java
+++ b/core/src/main/java/org/apache/calcite/model/JsonRoot.java
@@ -32,6 +32,7 @@ import java.util.List;
  *   {@link JsonSchema} (in collection {@link JsonRoot#schemas schemas})
  *     {@link JsonTable} (in collection {@link JsonMapSchema#tables tables})
  *       {@link JsonColumn} (in collection {@link JsonTable#columns columns}
+ *       {@link JsonStream} (in field {@link JsonTable#stream stream}
  *     {@link JsonView}
  *     {@link JsonFunction} (in collection {@link JsonMapSchema#functions functions})
  *     {@link JsonLattice} (in collection {@link JsonSchema#lattices lattices})

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonStream.java b/core/src/main/java/org/apache/calcite/model/JsonStream.java
new file mode 100644
index 0000000..f8d728a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/model/JsonStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.model;
+
+/**
+ * Information about whether a table allows streaming.
+ *
+ * @see org.apache.calcite.model.JsonRoot Description of schema elements
+ * @see org.apache.calcite.model.JsonTable#stream
+ */
+public class JsonStream {
+  /** Whether the table allows streaming. */
+  public boolean stream = true;
+
+  /** Whether the history of the table is available. */
+  public boolean history = false;
+}
+
+// End JsonStream.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonTable.java b/core/src/main/java/org/apache/calcite/model/JsonTable.java
index c0e7d8d..806405b 100644
--- a/core/src/main/java/org/apache/calcite/model/JsonTable.java
+++ b/core/src/main/java/org/apache/calcite/model/JsonTable.java
@@ -18,8 +18,8 @@ package org.apache.calcite.model;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.collect.Lists;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -36,7 +36,11 @@ import java.util.List;
     @JsonSubTypes.Type(value = JsonView.class, name = "view") })
 public abstract class JsonTable {
   public String name;
-  public final List<JsonColumn> columns = new ArrayList<JsonColumn>();
+  public final List<JsonColumn> columns = Lists.newArrayList();
+
+  /** Information about whether the table can be streamed, and if so, whether
+   * the history of the table is also available. */
+  public JsonStream stream;
 
   public abstract void accept(ModelHandler handler);
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
index 2f884c1..e655e67 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.rel.rules.ReduceExpressionsRule;
 import org.apache.calcite.rel.rules.SortProjectTransposeRule;
 import org.apache.calcite.rel.rules.TableScanRule;
 import org.apache.calcite.rel.rules.ValuesReduceRule;
+import org.apache.calcite.rel.stream.StreamRules;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
@@ -154,6 +155,9 @@ public class CalcitePrepareImpl implements CalcitePrepare {
   /** Whether the enumerable convention is enabled. */
   public static final boolean ENABLE_ENUMERABLE = true;
 
+  /** Whether the streaming is enabled. */
+  public static final boolean ENABLE_STREAM = true;
+
   private static final Set<String> SIMPLE_SQLS =
       ImmutableSet.of(
           "SELECT 1",
@@ -338,6 +342,12 @@ public class CalcitePrepareImpl implements CalcitePrepare {
           EnumerableBindable.EnumerableToBindableConverterRule.INSTANCE);
     }
 
+    if (ENABLE_STREAM) {
+      for (RelOptRule rule : StreamRules.RULES) {
+        planner.addRule(rule);
+      }
+    }
+
     // Change the below to enable constant-reduction.
     if (false) {
       for (RelOptRule rule : CONSTANT_REDUCTION_RULES) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
index e47ea5b..8925ede 100644
--- a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
@@ -35,9 +36,11 @@ import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.QueryableTable;
 import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.validate.SqlModality;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
@@ -125,6 +128,9 @@ public class RelOptTableImpl implements Prepare.PreparingTable {
               table.getClass());
         }
       };
+    } else if (table instanceof StreamableTable) {
+      return getClassExpressionFunction(tableEntry,
+          ((StreamableTable) table).stream());
     } else {
       return new Function<Class, Expression>() {
         public Expression apply(Class input) {
@@ -243,14 +249,53 @@ public class RelOptTableImpl implements Prepare.PreparingTable {
     return rowType;
   }
 
+  public boolean supportsModality(SqlModality modality) {
+    switch (modality) {
+    case STREAM:
+      return table instanceof StreamableTable;
+    default:
+      return !(table instanceof StreamableTable);
+    }
+  }
+
   public List<String> getQualifiedName() {
     return names;
   }
 
   public SqlMonotonicity getMonotonicity(String columnName) {
+    final int i = rowType.getFieldNames().indexOf(columnName);
+    if (i >= 0) {
+      for (RelCollation collation : table.getStatistic().getCollations()) {
+        final RelFieldCollation fieldCollation =
+            collation.getFieldCollations().get(0);
+        if (fieldCollation.getFieldIndex() == i) {
+          return monotonicity(fieldCollation.direction);
+        }
+      }
+    }
     return SqlMonotonicity.NOT_MONOTONIC;
   }
 
+  /** Converts a {@link org.apache.calcite.rel.RelFieldCollation.Direction}
+   * value to a {@link org.apache.calcite.sql.validate.SqlMonotonicity}. */
+  private static SqlMonotonicity
+  monotonicity(RelFieldCollation.Direction direction) {
+    switch (direction) {
+    case ASCENDING:
+      return SqlMonotonicity.INCREASING;
+    case STRICTLY_ASCENDING:
+      return SqlMonotonicity.STRICTLY_INCREASING;
+    case DESCENDING:
+      return SqlMonotonicity.DECREASING;
+    case STRICTLY_DESCENDING:
+      return SqlMonotonicity.STRICTLY_DECREASING;
+    case CLUSTERED:
+      return SqlMonotonicity.MONOTONIC;
+    default:
+      throw new AssertionError("unknown: " + direction);
+    }
+  }
+
   public SqlAccessType getAllowedAccess() {
     return SqlAccessType.ALL;
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/core/Sort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Sort.java b/core/src/main/java/org/apache/calcite/rel/core/Sort.java
index 06f9299..022479b 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Sort.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Sort.java
@@ -45,7 +45,7 @@ import java.util.List;
 public abstract class Sort extends SingleRel {
   //~ Instance fields --------------------------------------------------------
 
-  protected final RelCollation collation;
+  public final RelCollation collation;
   protected final ImmutableList<RexNode> fieldExps;
   public final RexNode offset;
   public final RexNode fetch;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/Chi.java b/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
new file mode 100644
index 0000000..36e03c0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+
+/**
+ * Relational operator that converts a stream to a relation.
+ *
+ * <p>Chi is named for the Greek letter &chi; and pronounced 'kai'.
+ *
+ * <p>Chi is the inverse of {@link Delta}. For any relation {@code R},
+ * Chi(Delta(R)) == R.
+ */
+public class Chi extends SingleRel {
+  protected Chi(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    super(cluster, traits, input);
+  }
+}
+
+// End Chi.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/Delta.java b/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
new file mode 100644
index 0000000..ce1417c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.TableScan;
+
+/**
+ * Relational operator that converts a relation to a stream.
+ *
+ * <p>For example, if {@code Orders} is a table, and {@link TableScan}(Orders)
+ * is a relational operator that returns the current contents of the table,
+ * then {@link Delta}(TableScan(Orders)) is a relational operator that returns
+ * all inserts into the table.
+ *
+ * <p>If unrestricted, Delta returns all previous inserts into the table (from
+ * time -&infin; to now) and all future inserts into the table (from now
+ * to +&infin;) and never terminates.
+ */
+public abstract class Delta extends SingleRel {
+  protected Delta(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    super(cluster, traits, input);
+  }
+
+  /** Creates a Delta by parsing serialized output. */
+  protected Delta(RelInput input) {
+    this(input.getCluster(), input.getTraitSet(), input.getInput());
+  }
+}
+
+// End Delta.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java b/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
new file mode 100644
index 0000000..fad29e7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Sub-class of {@link Chi}
+ * not targeted at any particular engine or calling convention.
+ */
+public final class LogicalChi extends Chi {
+  public LogicalChi(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    super(cluster, traits, input);
+  }
+}
+
+// End LogicalChi.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java b/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
new file mode 100644
index 0000000..05de63b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.List;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.stream.Delta}
+ * not targeted at any particular engine or calling convention.
+ */
+public final class LogicalDelta extends Delta {
+  /**
+   * Creates a LogicalDelta.
+   *
+   * <p>Use {@link #create} unless you know what you're doing.
+   *
+   * @param cluster   Cluster that this relational expression belongs to
+   * @param input     Input relational expression
+   */
+  public LogicalDelta(RelOptCluster cluster, RelTraitSet traits,
+      RelNode input) {
+    super(cluster, traits, input);
+  }
+
+  /** Creates a LogicalDelta by parsing serialized output. */
+  public LogicalDelta(RelInput input) {
+    super(input);
+  }
+
+  /** Creates a LogicalDelta. */
+  public static LogicalDelta create(RelNode input) {
+    final RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE);
+    return new LogicalDelta(input.getCluster(), traitSet, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new LogicalDelta(getCluster(), traitSet, sole(inputs));
+  }
+}
+
+// End LogicalDelta.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
new file mode 100644
index 0000000..28b9972
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Rules and relational operators for streaming relational expressions.
+ */
+public class StreamRules {
+  private StreamRules() {}
+
+  public static final ImmutableList<RelOptRule> RULES =
+      ImmutableList.of(
+          new DeltaProjectTransposeRule(),
+          new DeltaFilterTransposeRule(),
+          new DeltaAggregateTransposeRule(),
+          new DeltaSortTransposeRule(),
+          new DeltaUnionTransposeRule(),
+          new DeltaTableScanRule());
+
+  /** Planner rule that pushes a {@link Delta} through a {@link Project}. */
+  public static class DeltaProjectTransposeRule extends RelOptRule {
+    private DeltaProjectTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Project.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Project project = call.rel(1);
+      final LogicalDelta newDelta = LogicalDelta.create(project.getInput());
+      final LogicalProject newProject =
+          LogicalProject.create(newDelta, project.getProjects(),
+              project.getRowType().getFieldNames());
+      call.transformTo(newProject);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through a {@link Filter}. */
+  public static class DeltaFilterTransposeRule extends RelOptRule {
+    private DeltaFilterTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Filter.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Filter filter = call.rel(1);
+      final LogicalDelta newDelta = LogicalDelta.create(filter.getInput());
+      final LogicalFilter newFilter =
+          LogicalFilter.create(newDelta, filter.getCondition());
+      call.transformTo(newFilter);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through an {@link Aggregate}. */
+  public static class DeltaAggregateTransposeRule extends RelOptRule {
+    private DeltaAggregateTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Aggregate.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Aggregate aggregate = call.rel(1);
+      final LogicalDelta newDelta =
+          LogicalDelta.create(aggregate.getInput());
+      final LogicalAggregate newAggregate =
+          LogicalAggregate.create(newDelta, aggregate.indicator,
+              aggregate.getGroupSet(), aggregate.groupSets,
+              aggregate.getAggCallList());
+      call.transformTo(newAggregate);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through an {@link Sort}. */
+  public static class DeltaSortTransposeRule extends RelOptRule {
+    private DeltaSortTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Sort.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Sort sort = call.rel(1);
+      final LogicalDelta newDelta =
+          LogicalDelta.create(sort.getInput());
+      final LogicalSort newSort =
+          LogicalSort.create(newDelta, sort.collation, sort.offset, sort.fetch);
+      call.transformTo(newSort);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} through an {@link Union}. */
+  public static class DeltaUnionTransposeRule extends RelOptRule {
+    private DeltaUnionTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Union.class, any())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      Util.discard(delta);
+      final Union union = call.rel(1);
+      final List<RelNode> newInputs = Lists.newArrayList();
+      for (RelNode input : union.getInputs()) {
+        final LogicalDelta newDelta =
+            LogicalDelta.create(input);
+        newInputs.add(newDelta);
+      }
+      final LogicalUnion newUnion = LogicalUnion.create(newInputs, union.all);
+      call.transformTo(newUnion);
+    }
+  }
+
+  /** Planner rule that pushes a {@link Delta} into a {@link TableScan} of a
+   * {@link org.apache.calcite.schema.StreamableTable}.
+   *
+   * <p>Very likely, the stream was only represented as a table for uniformity
+   * with the other relations in the system. The Delta disappears and the stream
+   * can be implemented directly. */
+  public static class DeltaTableScanRule extends RelOptRule {
+    private DeltaTableScanRule() {
+      super(
+          operand(Delta.class,
+              operand(TableScan.class, none())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final TableScan scan = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      final RelOptTable relOptTable = scan.getTable();
+      final StreamableTable streamableTable =
+          relOptTable.unwrap(StreamableTable.class);
+      if (streamableTable != null) {
+        final Table table1 = streamableTable.stream();
+        final RelOptTable relOptTable2 =
+            RelOptTableImpl.create(relOptTable.getRelOptSchema(),
+                relOptTable.getRowType(), table1);
+        final LogicalTableScan newScan =
+            LogicalTableScan.create(cluster, relOptTable2);
+        call.transformTo(newScan);
+      }
+    }
+  }
+}
+
+// End StreamRules.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/package-info.java b/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
new file mode 100644
index 0000000..12c680a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines relational expressions for streaming.
+ *
+ * <h2>Related packages and classes</h2>
+ * <ul>
+ *
+ * <li>Package <code>
+ * <a href="../core/package-summary.html">org.apache.calcite.rel.core</a></code>
+ * contains core relational expressions
+ *
+ * </ul>
+ */
+@PackageMarker
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 842ed7d..245d00a 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -549,6 +549,24 @@ public interface CalciteResource {
 
   @BaseMessage("FilterableTable.scan must not return null")
   ExInst<CalciteException> filterableTableScanReturnedNull();
+
+  @BaseMessage("Cannot convert table ''{0}'' to stream")
+  ExInst<SqlValidatorException> cannotConvertToStream(String tableName);
+
+  @BaseMessage("Cannot convert stream ''{0}'' to relation")
+  ExInst<SqlValidatorException> cannotConvertToRelation(String tableName);
+
+  @BaseMessage("Streaming aggregation requires at least one monotonic expression in GROUP BY clause")
+  ExInst<SqlValidatorException> streamMustGroupByMonotonic();
+
+  @BaseMessage("Streaming ORDER BY must start with monotonic expression")
+  ExInst<SqlValidatorException> streamMustOrderByMonotonic();
+
+  @BaseMessage("Set operator cannot combine streaming and non-streaming inputs")
+  ExInst<SqlValidatorException> streamSetOpInconsistentInputs();
+
+  @BaseMessage("Cannot stream VALUES")
+  ExInst<SqlValidatorException> cannotStreamValues();
 }
 
 // End CalciteResource.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
index fabdf09..77e6827 100644
--- a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
+++ b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
@@ -687,6 +687,116 @@ public class SqlFunctions {
     return bigDecimals[1];
   }
 
+  // FLOOR
+
+  public static double floor(double b0) {
+    return Math.floor(b0);
+  }
+
+  public static BigDecimal floor(BigDecimal b0) {
+    return b0.setScale(0, BigDecimal.ROUND_FLOOR);
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to byte values. */
+  public static byte floor(byte b0, byte b1) {
+    return (byte) floor((int) b0, (int) b1);
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to short values. */
+  public static short floor(short b0, short b1) {
+    return (short) floor((int) b0, (int) b1);
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to int values. */
+  public static int floor(int b0, int b1) {
+    int r = b0 % b1;
+    if (r < 0) {
+      r += b1;
+    }
+    return b0 - r;
+  }
+
+  /** SQL <code>FLOOR</code> operator applied to long values. */
+  public static long floor(long b0, long b1) {
+    long r = b0 % b1;
+    if (r < 0) {
+      r += b1;
+    }
+    return b0 - r;
+  }
+
+  // temporary
+  public static BigDecimal floor(BigDecimal b0, int b1) {
+    return floor(b0, BigDecimal.valueOf(b1));
+  }
+
+  // temporary
+  public static int floor(int b0, BigDecimal b1) {
+    return floor(b0, b1.intValue());
+  }
+
+  public static BigDecimal floor(BigDecimal b0, BigDecimal b1) {
+    final BigDecimal[] bigDecimals = b0.divideAndRemainder(b1);
+    BigDecimal r = bigDecimals[1];
+    if (r.signum() < 0) {
+      r = r.add(b1);
+    }
+    return b0.subtract(r);
+  }
+
+  // CEIL
+
+  public static double ceil(double b0) {
+    return Math.ceil(b0);
+  }
+
+  public static BigDecimal ceil(BigDecimal b0) {
+    return b0.setScale(0, BigDecimal.ROUND_CEILING);
+  }
+
+  /** SQL <code>CEIL</code> operator applied to byte values. */
+  public static byte ceil(byte b0, byte b1) {
+    return floor((byte) (b0 + b1 - 1), b1);
+  }
+
+  /** SQL <code>CEIL</code> operator applied to short values. */
+  public static short ceil(short b0, short b1) {
+    return floor((short) (b0 + b1 - 1), b1);
+  }
+
+  /** SQL <code>CEIL</code> operator applied to int values. */
+  public static int ceil(int b0, int b1) {
+    int r = b0 % b1;
+    if (r > 0) {
+      r -= b1;
+    }
+    return b0 - r;
+  }
+
+  /** SQL <code>CEIL</code> operator applied to long values. */
+  public static long ceil(long b0, long b1) {
+    return floor(b0 + b1 - 1, b1);
+  }
+
+  // temporary
+  public static BigDecimal ceil(BigDecimal b0, int b1) {
+    return ceil(b0, BigDecimal.valueOf(b1));
+  }
+
+  // temporary
+  public static int ceil(int b0, BigDecimal b1) {
+    return ceil(b0, b1.intValue());
+  }
+
+  public static BigDecimal ceil(BigDecimal b0, BigDecimal b1) {
+    final BigDecimal[] bigDecimals = b0.divideAndRemainder(b1);
+    BigDecimal r = bigDecimals[1];
+    if (r.signum() > 0) {
+      r = r.subtract(b1);
+    }
+    return b0.subtract(r);
+  }
+
   // ABS
 
   /** SQL <code>ABS</code> operator applied to byte values. */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/schema/Schema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/Schema.java b/core/src/main/java/org/apache/calcite/schema/Schema.java
index ecda690..065ec5d 100644
--- a/core/src/main/java/org/apache/calcite/schema/Schema.java
+++ b/core/src/main/java/org/apache/calcite/schema/Schema.java
@@ -181,6 +181,9 @@ public interface Schema {
      * <p>Used by Apache Phoenix, and others. Must have a single BIGINT column
      * called "$seq". */
     SEQUENCE,
+
+    /** Stream. */
+    STREAM,
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/StreamableTable.java b/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
new file mode 100644
index 0000000..a9dcdf8
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schema;
+
+import org.apache.calcite.rel.stream.Delta;
+
+/**
+ * Table that can be converted to a stream.
+ *
+ * @see Delta
+ */
+public interface StreamableTable extends Table {
+  /** Returns an enumerator over the rows in this Table. Each row is represented
+   * as an array of its column values. */
+  Table stream();
+}
+
+// End StreamableTable.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/SqlKind.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlKind.java b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
index 2f3cf26..5fa715e 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlKind.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
@@ -493,6 +493,16 @@ public enum SqlKind {
   CURRENT_VALUE,
 
   /**
+   * The "FLOOR" function
+   */
+  FLOOR,
+
+  /**
+   * The "CEIL" function
+   */
+  CEIL,
+
+  /**
    * The "TRIM" function.
    */
   TRIM,
@@ -639,7 +649,7 @@ public enum SqlKind {
       EnumSet.complementOf(
           EnumSet.of(
               AS, DESCENDING, CUBE, ROLLUP, GROUPING_SETS, EXTEND,
-              SELECT, JOIN, OTHER_FUNCTION, CAST, TRIM,
+              SELECT, JOIN, OTHER_FUNCTION, CAST, TRIM, FLOOR, CEIL,
               LITERAL_CHAIN, JDBC_FN, PRECEDING, FOLLOWING, ORDER_BY,
               NULLS_FIRST, NULLS_LAST, COLLECTION_TABLE, TABLESAMPLE,
               WITH, WITH_ITEM));

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java b/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
index c016289..1ca4158 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
@@ -23,7 +23,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
  */
 public enum SqlSelectKeyword implements SqlLiteral.SqlSymbol {
   DISTINCT,
-  ALL;
+  ALL,
+  STREAM;
 
   /**
    * Creates a parse-tree node representing an occurrence of this keyword

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java b/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
index b8ddaf4..a7228af 100644
--- a/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
+++ b/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.sql.validate.OverScope;
 import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlModality;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -194,6 +195,11 @@ public class SqlAdvisorValidator extends SqlValidatorImpl {
     }
   }
 
+  @Override public boolean validateModality(SqlSelect select,
+      SqlModality modality, boolean fail) {
+    return true;
+  }
+
   protected boolean shouldAllowOverRelation() {
     return true; // no reason not to be lenient
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
deleted file mode 100644
index a984c94..0000000
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-import org.apache.calcite.sql.validate.SqlValidatorScope;
-
-/**
- * Support for the CEIL/CEILING builtin function.
- */
-public class SqlCeilFunction extends SqlFunction {
-  //~ Constructors -----------------------------------------------------------
-
-  public SqlCeilFunction() {
-    super(
-        "CEIL",
-        SqlKind.OTHER_FUNCTION,
-        ReturnTypes.ARG0,
-        null,
-        OperandTypes.NUMERIC,
-        SqlFunctionCategory.NUMERIC);
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  public SqlMonotonicity getMonotonicity(SqlCall call,
-      SqlValidatorScope scope) {
-    return scope.getMonotonicity(call.operand(0));
-  }
-}
-
-// End SqlCeilFunction.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
index a8b8535..385cfb5 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
@@ -20,6 +20,7 @@ import org.apache.calcite.sql.SqlFunctionalOperator;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.validate.SqlModality;
 
 /**
  * SqlCollectionTableOperator is the "table function derived table" operator. It
@@ -30,33 +31,19 @@ import org.apache.calcite.sql.type.ReturnTypes;
  * {@link SqlStdOperatorTable#EXPLICIT_TABLE} is a prefix operator.
  */
 public class SqlCollectionTableOperator extends SqlFunctionalOperator {
-  //~ Static fields/initializers ---------------------------------------------
-
-  public static final int MODALITY_RELATIONAL = 1;
-  public static final int MODALITY_STREAM = 2;
-
-  //~ Instance fields --------------------------------------------------------
-
-  private final int modality;
+  private final SqlModality modality;
 
   //~ Constructors -----------------------------------------------------------
 
-  public SqlCollectionTableOperator(String name, int modality) {
-    super(
-        name,
-        SqlKind.COLLECTION_TABLE,
-        200,
-        true,
-        ReturnTypes.ARG0,
-        null,
+  public SqlCollectionTableOperator(String name, SqlModality modality) {
+    super(name, SqlKind.COLLECTION_TABLE, 200, true, ReturnTypes.ARG0, null,
         OperandTypes.ANY);
-
     this.modality = modality;
   }
 
   //~ Methods ----------------------------------------------------------------
 
-  public int getModality() {
+  public SqlModality getModality() {
     return modality;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
index 7e43262..1e870c3 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
@@ -59,9 +59,9 @@ public class SqlExtractFunction extends SqlFunction {
       int leftPrec,
       int rightPrec) {
     final SqlWriter.Frame frame = writer.startFunCall(getName());
-    call.operand(0).unparse(writer, leftPrec, rightPrec);
+    call.operand(0).unparse(writer, 0, 0);
     writer.sep("FROM");
-    call.operand(1).unparse(writer, leftPrec, rightPrec);
+    call.operand(1).unparse(writer, 0, 0);
     writer.endFunCall(frame);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
index bec351c..e9d147f 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
@@ -17,28 +17,33 @@
 package org.apache.calcite.sql.fun;
 
 import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 
+import com.google.common.base.Preconditions;
+
 /**
- * Definition of the "FLOOR" builtin SQL function.
+ * Definition of the "FLOOR" and "CEIL" built-in SQL functions.
  */
-public class SqlFloorFunction extends SqlFunction {
+public class SqlFloorFunction extends SqlMonotonicUnaryFunction {
   //~ Constructors -----------------------------------------------------------
 
-  public SqlFloorFunction() {
-    super(
-        "FLOOR",
-        SqlKind.OTHER_FUNCTION,
-        ReturnTypes.ARG0,
-        null,
-        OperandTypes.NUMERIC,
+  public SqlFloorFunction(SqlKind kind) {
+    super(kind.name(), kind, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null,
+        OperandTypes.or(OperandTypes.NUMERIC_OR_INTERVAL,
+            OperandTypes.sequence(
+                "'" + kind + "(<DATE> TO <TIME_UNIT>)'\n"
+                + "'" + kind + "(<TIME> TO <TIME_UNIT>)'\n"
+                + "'" + kind + "(<TIMESTAMP> TO <TIME_UNIT>)'",
+                OperandTypes.DATETIME,
+                OperandTypes.ANY)),
         SqlFunctionCategory.NUMERIC);
+    Preconditions.checkArgument(kind == SqlKind.FLOOR || kind == SqlKind.CEIL);
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -49,6 +54,19 @@ public class SqlFloorFunction extends SqlFunction {
     // Monotonic iff its first argument is, but not strict.
     return scope.getMonotonicity(call.operand(0)).unstrict();
   }
+
+  @Override public void unparse(SqlWriter writer, SqlCall call, int leftPrec,
+      int rightPrec) {
+    final SqlWriter.Frame frame = writer.startFunCall(getName());
+    if (call.operandCount() == 2) {
+      call.operand(0).unparse(writer, 0, 100);
+      writer.sep("TO");
+      call.operand(1).unparse(writer, 100, 0);
+    } else {
+      call.operand(0).unparse(writer, 0, 0);
+    }
+    writer.endFunCall(frame);
+  }
 }
 
 // End SqlFloorFunction.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 9e5644d..ef7133b 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlOperandCountRanges;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlModality;
 
 import com.google.common.collect.ImmutableList;
 
@@ -958,9 +959,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
    * {@link #EXPLICIT_TABLE} is a prefix operator.
    */
   public static final SqlSpecialOperator COLLECTION_TABLE =
-      new SqlCollectionTableOperator(
-          "TABLE",
-          SqlCollectionTableOperator.MODALITY_RELATIONAL);
+      new SqlCollectionTableOperator("TABLE", SqlModality.RELATION);
 
   public static final SqlOverlapsOperator OVERLAPS =
       new SqlOverlapsOperator();
@@ -1208,26 +1207,12 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
   /**
    * The <code>FLOOR</code> function.
    */
-  public static final SqlFunction FLOOR =
-      new SqlMonotonicUnaryFunction(
-          "FLOOR",
-          SqlKind.OTHER_FUNCTION,
-          ReturnTypes.ARG0_OR_EXACT_NO_SCALE,
-          null,
-          OperandTypes.NUMERIC_OR_INTERVAL,
-          SqlFunctionCategory.NUMERIC);
+  public static final SqlFunction FLOOR = new SqlFloorFunction(SqlKind.FLOOR);
 
   /**
    * The <code>CEIL</code> function.
    */
-  public static final SqlFunction CEIL =
-      new SqlMonotonicUnaryFunction(
-          "CEIL",
-          SqlKind.OTHER_FUNCTION,
-          ReturnTypes.ARG0_OR_EXACT_NO_SCALE,
-          null,
-          OperandTypes.NUMERIC_OR_INTERVAL,
-          SqlFunctionCategory.NUMERIC);
+  public static final SqlFunction CEIL = new SqlFloorFunction(SqlKind.CEIL);
 
   /**
    * The <code>USER</code> function.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java b/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
index e5bb0ce..712c3c4 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
@@ -18,15 +18,15 @@ package org.apache.calcite.sql.type;
 
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperandCountRange;
 import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.util.Util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.util.AbstractList;
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  * This class allows multiple existing {@link SqlOperandTypeChecker} rules to be
@@ -75,8 +75,7 @@ import java.util.List;
  * SqlSingleOperandTypeChecker, and signature generation is not supported. For
  * AND composition, only the first rule is used for signature generation.
  */
-public class CompositeOperandTypeChecker
-    implements SqlSingleOperandTypeChecker {
+public class CompositeOperandTypeChecker implements SqlOperandTypeChecker {
   //~ Enums ------------------------------------------------------------------
 
   /** How operands are composed. */
@@ -86,8 +85,9 @@ public class CompositeOperandTypeChecker
 
   //~ Instance fields --------------------------------------------------------
 
-  private final ImmutableList<SqlSingleOperandTypeChecker> allowedRules;
-  private final Composition composition;
+  protected final ImmutableList<? extends SqlOperandTypeChecker> allowedRules;
+  protected final Composition composition;
+  private final String allowedSignatures;
 
   //~ Constructors -----------------------------------------------------------
 
@@ -97,25 +97,30 @@ public class CompositeOperandTypeChecker
    */
   CompositeOperandTypeChecker(
       Composition composition,
-      ImmutableList<SqlSingleOperandTypeChecker> allowedRules) {
-    assert null != allowedRules;
+      ImmutableList<? extends SqlOperandTypeChecker> allowedRules,
+      @Nullable String allowedSignatures) {
+    this.allowedRules = Preconditions.checkNotNull(allowedRules);
+    this.composition = Preconditions.checkNotNull(composition);
+    this.allowedSignatures = allowedSignatures;
     assert allowedRules.size() > 1;
-    this.allowedRules = allowedRules;
-    this.composition = composition;
   }
 
   //~ Methods ----------------------------------------------------------------
 
-  public ImmutableList<SqlSingleOperandTypeChecker> getRules() {
+  public ImmutableList<? extends SqlOperandTypeChecker> getRules() {
     return allowedRules;
   }
 
   public String getAllowedSignatures(SqlOperator op, String opName) {
+    if (allowedSignatures != null) {
+      return allowedSignatures;
+    }
     if (composition == Composition.SEQUENCE) {
-      throw Util.needToImplement("must override getAllowedSignatures");
+      throw new AssertionError(
+          "specify allowedSignatures or override getAllowedSignatures");
     }
     StringBuilder ret = new StringBuilder();
-    for (Ord<SqlSingleOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+    for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
       if (ord.i > 0) {
         ret.append(SqlOperator.NL);
       }
@@ -213,134 +218,66 @@ public class CompositeOperandTypeChecker
     return max;
   }
 
-  public boolean checkSingleOperandType(
+  public boolean checkOperandTypes(
       SqlCallBinding callBinding,
-      SqlNode node,
-      int iFormalOperand,
       boolean throwOnFailure) {
-    assert allowedRules.size() >= 1;
-
-    if (composition == Composition.SEQUENCE) {
-      return allowedRules.get(iFormalOperand).checkSingleOperandType(
-          callBinding, node, 0, throwOnFailure);
-    }
-
-    int typeErrorCount = 0;
-
-    boolean throwOnAndFailure =
-        (composition == Composition.AND)
-            && throwOnFailure;
-
-    for (SqlSingleOperandTypeChecker rule : allowedRules) {
-      if (!rule.checkSingleOperandType(
-          callBinding,
-          node,
-          iFormalOperand,
-          throwOnAndFailure)) {
-        typeErrorCount++;
-      }
+    if (check(callBinding)) {
+      return true;
     }
-
-    boolean ret;
-    switch (composition) {
-    case AND:
-      ret = typeErrorCount == 0;
-      break;
-    case OR:
-      ret = typeErrorCount < allowedRules.size();
-      break;
-    default:
-      // should never come here
-      throw Util.unexpected(composition);
+    if (!throwOnFailure) {
+      return false;
     }
-
-    if (!ret && throwOnFailure) {
-      // In the case of a composite OR, we want to throw an error
-      // describing in more detail what the problem was, hence doing the
-      // loop again.
-      for (SqlSingleOperandTypeChecker rule : allowedRules) {
-        rule.checkSingleOperandType(
-            callBinding,
-            node,
-            iFormalOperand,
-            true);
+    if (composition == Composition.OR) {
+      for (SqlOperandTypeChecker allowedRule : allowedRules) {
+        allowedRule.checkOperandTypes(callBinding, true);
       }
-
-      // If no exception thrown, just throw a generic validation signature
-      // error.
-      throw callBinding.newValidationSignatureError();
     }
 
-    return ret;
+    // If no exception thrown, just throw a generic validation
+    // signature error.
+    throw callBinding.newValidationSignatureError();
   }
 
-  public boolean checkOperandTypes(
-      SqlCallBinding callBinding,
-      boolean throwOnFailure) {
-    int typeErrorCount = 0;
-
-  label:
-    for (Ord<SqlSingleOperandTypeChecker> ord : Ord.zip(allowedRules)) {
-      SqlSingleOperandTypeChecker rule = ord.e;
-
-      switch (composition) {
-      case SEQUENCE:
-        if (ord.i >= callBinding.getOperandCount()) {
-          break label;
-        }
-        if (!rule.checkSingleOperandType(
+  private boolean check(SqlCallBinding callBinding) {
+    switch (composition) {
+    case SEQUENCE:
+      if (callBinding.getOperandCount() != allowedRules.size()) {
+        return false;
+      }
+      for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+        SqlOperandTypeChecker rule = ord.e;
+        if (!((SqlSingleOperandTypeChecker) rule).checkSingleOperandType(
             callBinding,
             callBinding.getCall().operand(ord.i),
             0,
             false)) {
-          typeErrorCount++;
+          return false;
         }
-        break;
-      default:
+      }
+      return true;
+
+    case AND:
+      for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+        SqlOperandTypeChecker rule = ord.e;
         if (!rule.checkOperandTypes(callBinding, false)) {
-          typeErrorCount++;
-          if (composition == Composition.AND) {
-            // Avoid trying other rules in AND if the first one fails.
-            break label;
-          }
-        } else if (composition == Composition.OR) {
-          break label; // true OR any == true, just break
+          // Avoid trying other rules in AND if the first one fails.
+          return false;
         }
-        break;
       }
-    }
+      return true;
 
-    boolean failed;
-    switch (composition) {
-    case AND:
-    case SEQUENCE:
-      failed = typeErrorCount > 0;
-      break;
     case OR:
-      failed = typeErrorCount == allowedRules.size();
-      break;
-    default:
-      throw new AssertionError();
-    }
-
-    if (failed) {
-      if (throwOnFailure) {
-        // In the case of a composite OR, we want to throw an error
-        // describing in more detail what the problem was, hence doing
-        // the loop again.
-        if (composition == Composition.OR) {
-          for (SqlOperandTypeChecker allowedRule : allowedRules) {
-            allowedRule.checkOperandTypes(callBinding, true);
-          }
+      for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+        SqlOperandTypeChecker rule = ord.e;
+        if (rule.checkOperandTypes(callBinding, false)) {
+          return true;
         }
-
-        // If no exception thrown, just throw a generic validation
-        // signature error.
-        throw callBinding.newValidationSignatureError();
       }
       return false;
+
+    default:
+      throw new AssertionError();
     }
-    return true;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java b/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
new file mode 100644
index 0000000..de8c303
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Allows multiple
+ * {@link org.apache.calcite.sql.type.SqlSingleOperandTypeChecker} rules to be
+ * combined into one rule.
+ */
+public class CompositeSingleOperandTypeChecker
+    extends CompositeOperandTypeChecker
+    implements SqlSingleOperandTypeChecker {
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Package private. Use {@link org.apache.calcite.sql.type.OperandTypes#and},
+   * {@link org.apache.calcite.sql.type.OperandTypes#or}.
+   */
+  CompositeSingleOperandTypeChecker(
+      CompositeOperandTypeChecker.Composition composition,
+      ImmutableList<? extends SqlSingleOperandTypeChecker> allowedRules,
+      String allowedSignatures) {
+    super(composition, allowedRules, allowedSignatures);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @SuppressWarnings("unchecked")
+  @Override public ImmutableList<? extends SqlSingleOperandTypeChecker>
+  getRules() {
+    return (ImmutableList<? extends SqlSingleOperandTypeChecker>) allowedRules;
+  }
+
+  public boolean checkSingleOperandType(
+      SqlCallBinding callBinding,
+      SqlNode node,
+      int iFormalOperand,
+      boolean throwOnFailure) {
+    assert allowedRules.size() >= 1;
+
+    final ImmutableList<? extends SqlSingleOperandTypeChecker> rules =
+        getRules();
+    if (composition == Composition.SEQUENCE) {
+      return rules.get(iFormalOperand).checkSingleOperandType(
+          callBinding, node, 0, throwOnFailure);
+    }
+
+    int typeErrorCount = 0;
+
+    boolean throwOnAndFailure =
+        (composition == Composition.AND)
+            && throwOnFailure;
+
+    for (SqlSingleOperandTypeChecker rule : rules) {
+      if (!rule.checkSingleOperandType(
+          callBinding,
+          node,
+          iFormalOperand,
+          throwOnAndFailure)) {
+        typeErrorCount++;
+      }
+    }
+
+    boolean ret;
+    switch (composition) {
+    case AND:
+      ret = typeErrorCount == 0;
+      break;
+    case OR:
+      ret = typeErrorCount < allowedRules.size();
+      break;
+    default:
+      // should never come here
+      throw Util.unexpected(composition);
+    }
+
+    if (!ret && throwOnFailure) {
+      // In the case of a composite OR, we want to throw an error
+      // describing in more detail what the problem was, hence doing the
+      // loop again.
+      for (SqlSingleOperandTypeChecker rule : rules) {
+        rule.checkSingleOperandType(
+            callBinding,
+            node,
+            iFormalOperand,
+            true);
+      }
+
+      // If no exception thrown, just throw a generic validation signature
+      // error.
+      throw callBinding.newValidationSignatureError();
+    }
+
+    return ret;
+  }
+}
+
+// End CompositeSingleOperandTypeChecker.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
index c70d403..10bccbd 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
@@ -72,21 +72,52 @@ public abstract class OperandTypes {
   /**
    * Creates a checker that passes if any one of the rules passes.
    */
+  public static SqlOperandTypeChecker or(SqlOperandTypeChecker... rules) {
+    return new CompositeOperandTypeChecker(
+        CompositeOperandTypeChecker.Composition.OR,
+        ImmutableList.copyOf(rules), null);
+  }
+
+  /**
+   * Creates a single-operand checker that passes if any one of the rules
+   * passes.
+   */
+  public static SqlOperandTypeChecker and(SqlOperandTypeChecker... rules) {
+    return new CompositeOperandTypeChecker(
+        CompositeOperandTypeChecker.Composition.AND,
+        ImmutableList.copyOf(rules), null);
+  }
+
+  /**
+   * Creates a single-operand checker that passes if any one of the rules
+   * passes.
+   */
   public static SqlSingleOperandTypeChecker or(
       SqlSingleOperandTypeChecker... rules) {
-    return new CompositeOperandTypeChecker(
+    return new CompositeSingleOperandTypeChecker(
         CompositeOperandTypeChecker.Composition.OR,
-        ImmutableList.copyOf(rules));
+        ImmutableList.copyOf(rules), null);
   }
 
   /**
-   * Creates a checker that passes if any one of the rules passes.
+   * Creates a single-operand checker that passes if any one of the rules
+   * passes.
    */
   public static SqlSingleOperandTypeChecker and(
       SqlSingleOperandTypeChecker... rules) {
-    return new CompositeOperandTypeChecker(
+    return new CompositeSingleOperandTypeChecker(
         CompositeOperandTypeChecker.Composition.AND,
-        ImmutableList.copyOf(rules));
+        ImmutableList.copyOf(rules), null);
+  }
+
+  /**
+   * Creates an operand checker from a sequence of single-operand checkers.
+   */
+  public static SqlOperandTypeChecker sequence(String allowedSignatures,
+      SqlSingleOperandTypeChecker... rules) {
+    return new CompositeOperandTypeChecker(
+        CompositeOperandTypeChecker.Composition.SEQUENCE,
+        ImmutableList.copyOf(rules), allowedSignatures);
   }
 
   // ----------------------------------------------------------------------
@@ -111,7 +142,7 @@ public abstract class OperandTypes {
   public static final SqlOperandTypeChecker ONE_OR_MORE =
       variadic(SqlOperandCountRanges.from(1));
 
-  private static SqlOperandTypeChecker variadic(
+  public static SqlOperandTypeChecker variadic(
       final SqlOperandCountRange range) {
     return new SqlOperandTypeChecker() {
       public boolean checkOperandTypes(