You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/03/08 00:50:59 UTC
[4/4] incubator-calcite git commit: [CALCITE-602] Streaming queries
[CALCITE-602] Streaming queries
Validate and implement streaming queries: streaming scan, project, filter, aggregate, sort.
Implement CEIL and FLOOR functions for date-time and numeric values.
Add CompositeSingleOperandTypeChecker, and make CompositeOperandTypeChecker work for multiple operands.
Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0ecd8702
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0ecd8702
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0ecd8702
Branch: refs/heads/master
Commit: 0ecd8702ab95fd59b7ef2182720d12b2167ae968
Parents: c50a6e0
Author: Julian Hyde <jh...@apache.org>
Authored: Sat Feb 21 18:10:32 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Sat Mar 7 15:33:21 2015 -0800
----------------------------------------------------------------------
README.md | 1 +
.../calcite/avatica/util/DateTimeUtils.java | 69 +++
core/src/main/codegen/templates/Parser.jj | 79 ++-
.../adapter/enumerable/EnumerableTableScan.java | 4 +-
.../calcite/adapter/enumerable/RexImpTable.java | 77 ++-
.../org/apache/calcite/jdbc/CalciteSchema.java | 7 +-
.../java/org/apache/calcite/model/JsonRoot.java | 1 +
.../org/apache/calcite/model/JsonStream.java | 33 ++
.../org/apache/calcite/model/JsonTable.java | 8 +-
.../calcite/prepare/CalcitePrepareImpl.java | 10 +
.../apache/calcite/prepare/RelOptTableImpl.java | 45 ++
.../java/org/apache/calcite/rel/core/Sort.java | 2 +-
.../java/org/apache/calcite/rel/stream/Chi.java | 38 ++
.../org/apache/calcite/rel/stream/Delta.java | 49 ++
.../apache/calcite/rel/stream/LogicalChi.java | 33 ++
.../apache/calcite/rel/stream/LogicalDelta.java | 61 +++
.../apache/calcite/rel/stream/StreamRules.java | 198 +++++++
.../apache/calcite/rel/stream/package-info.java | 35 ++
.../apache/calcite/runtime/CalciteResource.java | 18 +
.../apache/calcite/runtime/SqlFunctions.java | 110 ++++
.../java/org/apache/calcite/schema/Schema.java | 3 +
.../apache/calcite/schema/StreamableTable.java | 32 ++
.../java/org/apache/calcite/sql/SqlKind.java | 12 +-
.../apache/calcite/sql/SqlSelectKeyword.java | 3 +-
.../calcite/sql/advise/SqlAdvisorValidator.java | 6 +
.../apache/calcite/sql/fun/SqlCeilFunction.java | 52 --
.../sql/fun/SqlCollectionTableOperator.java | 23 +-
.../calcite/sql/fun/SqlExtractFunction.java | 4 +-
.../calcite/sql/fun/SqlFloorFunction.java | 38 +-
.../calcite/sql/fun/SqlStdOperatorTable.java | 23 +-
.../sql/type/CompositeOperandTypeChecker.java | 173 ++----
.../type/CompositeSingleOperandTypeChecker.java | 119 +++++
.../apache/calcite/sql/type/OperandTypes.java | 43 +-
.../calcite/sql/validate/AbstractNamespace.java | 4 +
.../sql/validate/IdentifierNamespace.java | 10 +-
.../calcite/sql/validate/SelectNamespace.java | 4 +
.../calcite/sql/validate/SetopNamespace.java | 34 ++
.../calcite/sql/validate/SqlModality.java | 25 +
.../calcite/sql/validate/SqlValidator.java | 13 +
.../calcite/sql/validate/SqlValidatorImpl.java | 181 ++++++-
.../sql/validate/SqlValidatorNamespace.java | 7 +
.../calcite/sql/validate/SqlValidatorTable.java | 2 +
.../sql/validate/TableConstructorNamespace.java | 4 +
.../calcite/sql/validate/TableNamespace.java | 5 +
.../sql2rel/RelStructuredTypeFlattener.java | 10 +
.../calcite/sql2rel/SqlToRelConverter.java | 9 +
.../org/apache/calcite/util/BuiltInMethod.java | 10 +
.../calcite/runtime/CalciteResource.properties | 6 +
.../calcite/sql/parser/SqlParserTest.java | 27 +
.../apache/calcite/sql/test/SqlAdvisorTest.java | 5 +-
.../calcite/sql/test/SqlOperatorBaseTest.java | 262 ++++------
.../org/apache/calcite/test/CalciteSuite.java | 1 +
.../apache/calcite/test/MockCatalogReader.java | 62 ++-
.../apache/calcite/test/RelOptRulesTest.java | 2 +-
.../apache/calcite/test/SqlFunctionsTest.java | 46 +-
.../calcite/test/SqlToRelConverterTest.java | 21 +
.../apache/calcite/test/SqlValidatorTest.java | 207 ++++++--
.../org/apache/calcite/test/StreamTest.java | 296 +++++++++++
.../calcite/test/SqlToRelConverterTest.xml | 46 +-
doc/REFERENCE.md | 6 +-
doc/STREAM.md | 521 +++++++++++++++++++
.../java/org/apache/calcite/linq4j/Linq4j.java | 7 +-
.../calcite/linq4j/function/Functions.java | 7 +-
.../org/apache/calcite/linq4j/tree/Visitor.java | 11 +-
64 files changed, 2745 insertions(+), 515 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 48d485e..40458ec 100644
--- a/README.md
+++ b/README.md
@@ -198,6 +198,7 @@ For more details, see the <a href="doc/REFERENCE.md">Reference guide</a>.
* <a href="doc/HOWTO.md">HOWTO</a>
* <a href="doc/MODEL.md">JSON model</a>
* <a href="doc/REFERENCE.md">Reference guide</a>
+* <a href="doc/STREAM.md">Streaming SQL</a>
* <a href="doc/HISTORY.md">Release notes and history</a>
### Pre-Apache resources
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
index 084c027..8acdf93 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/util/DateTimeUtils.java
@@ -700,6 +700,66 @@ public class DateTimeUtils {
}
}
+ public static long unixTimestampFloor(TimeUnitRange range, long timestamp) {
+ int date = (int) (timestamp / MILLIS_PER_DAY);
+ final int f = julianDateFloor(range, date + EPOCH_JULIAN, true);
+ return (long) f * MILLIS_PER_DAY;
+ }
+
+ public static long unixDateFloor(TimeUnitRange range, long date) {
+ return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+ }
+
+ public static long unixTimestampCeil(TimeUnitRange range, long timestamp) {
+ int date = (int) (timestamp / MILLIS_PER_DAY);
+ final int f = julianDateFloor(range, date + EPOCH_JULIAN, false);
+ return (long) f * MILLIS_PER_DAY;
+ }
+
+ public static long unixDateCeil(TimeUnitRange range, long date) {
+ return julianDateFloor(range, (int) date + EPOCH_JULIAN, true);
+ }
+
+ private static int julianDateFloor(TimeUnitRange range, int julian,
+ boolean floor) {
+ // this shifts the epoch back to astronomical year -4800 instead of the
+ // start of the Christian era in year AD 1 of the proleptic Gregorian
+ // calendar.
+ int j = julian + 32044;
+ int g = j / 146097;
+ int dg = j % 146097;
+ int c = (dg / 36524 + 1) * 3 / 4;
+ int dc = dg - c * 36524;
+ int b = dc / 1461;
+ int db = dc % 1461;
+ int a = (db / 365 + 1) * 3 / 4;
+ int da = db - a * 365;
+
+ // integer number of full years elapsed since March 1, 4801 BC
+ int y = g * 400 + c * 100 + b * 4 + a;
+ // integer number of full months elapsed since the last March 1
+ int m = (da * 5 + 308) / 153 - 2;
+ // number of days elapsed since day 1 of the month
+ int d = da - (m + 4) * 153 / 5 + 122;
+ int year = y - 4800 + (m + 2) / 12;
+ int month = (m + 2) % 12 + 1;
+ int day = d + 1;
+ switch (range) {
+ case YEAR:
+ if (!floor && (month > 1 || day > 1)) {
+ ++year;
+ }
+ return ymdToUnixDate(year, 1, 1);
+ case MONTH:
+ if (!floor && day > 1) {
+ ++month;
+ }
+ return ymdToUnixDate(year, month, 1);
+ default:
+ throw new AssertionError(range);
+ }
+ }
+
public static int ymdToUnixDate(int year, int month, int day) {
final int julian = ymdToJulian(year, month, day);
return julian - EPOCH_JULIAN;
@@ -721,6 +781,15 @@ public class DateTimeUtils {
return j;
}
+ public static long unixTimestamp(int year, int month, int day, int hour,
+ int minute, int second) {
+ final int date = ymdToUnixDate(year, month, day);
+ return (long) date * MILLIS_PER_DAY
+ + (long) hour * MILLIS_PER_HOUR
+ + (long) minute * MILLIS_PER_MINUTE
+ + (long) second * MILLIS_PER_SECOND;
+ }
+
//~ Inner Classes ----------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/codegen/templates/Parser.jj
----------------------------------------------------------------------
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index cc9a555..6b62ce3 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -91,6 +91,7 @@ import org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.parser.SqlParserUtil;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
import com.google.common.collect.ImmutableList;
@@ -304,12 +305,12 @@ SqlNode ExtendedBuiltinFunctionCall() :
/*
* Parse Floor/Ceil function parameters
*/
-SqlNode FloorCeilOptions( SqlParserPos pos, boolean floorFlag) :
+SqlNode FloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
{
SqlNode node;
}
{
- node = StandardFloorCeilOptions( pos, floorFlag)
+ node = StandardFloorCeilOptions(pos, floorFlag)
{
return node;
}
@@ -943,14 +944,18 @@ SqlSelect SqlSelect() :
}
SqlSelectKeywords(keywords)
(
+ <STREAM> {
+ keywords.add(SqlSelectKeyword.STREAM.symbol(getPos()));
+ }
+ )?
+ (
<DISTINCT> {
keywords.add(SqlSelectKeyword.DISTINCT.symbol(getPos()));
}
| <ALL> {
keywords.add(SqlSelectKeyword.ALL.symbol(getPos()));
}
- | E()
- )
+ )?
selectList = SelectList()
<FROM>
fromClause = FromClause()
@@ -1958,7 +1963,7 @@ SqlNodeList GroupByOpt() :
{
<GROUP> { pos = getPos(); }
<BY> list = GroupingElementList() {
- return new SqlNodeList(list, pos);
+ return new SqlNodeList(list, pos.plusAll(list));
}
|
{
@@ -2222,19 +2227,16 @@ SqlNodeList OrderBy(boolean accept) :
SqlParserPos pos;
}
{
- <ORDER>
- {
+ <ORDER> {
+ pos = getPos();
if (!accept) {
// Someone told us ORDER BY wasn't allowed here. So why
// did they bother calling us? To get the correct
// parser position for error reporting.
- throw SqlUtil.newContextException(getPos(),
- RESOURCE.illegalOrderBy());
+ throw SqlUtil.newContextException(pos, RESOURCE.illegalOrderBy());
}
}
- <BY> e = OrderItem()
- {
- pos = getPos();
+ <BY> e = OrderItem() {
list = startList(e);
}
(
@@ -2243,7 +2245,7 @@ SqlNodeList OrderBy(boolean accept) :
LOOKAHEAD(2) <COMMA> e = OrderItem() { list.add(e); }
) *
{
- return new SqlNodeList(list, pos);
+ return new SqlNodeList(list, pos.plusAll(list));
}
}
@@ -2253,29 +2255,22 @@ SqlNodeList OrderBy(boolean accept) :
SqlNode OrderItem() :
{
SqlNode e;
- SqlParserPos pos;
}
{
e = Expression(ExprContext.ACCEPT_SUBQUERY)
(
<ASC>
- | <DESC>
- {
- pos = getPos();
- e = SqlStdOperatorTable.DESC.createCall(pos, e);
+ | <DESC> {
+ e = SqlStdOperatorTable.DESC.createCall(getPos(), e);
}
)?
(
- <NULLS> <FIRST>
- {
- pos = getPos();
- e = SqlStdOperatorTable.NULLS_FIRST.createCall(pos, e);
+ <NULLS> <FIRST> {
+ e = SqlStdOperatorTable.NULLS_FIRST.createCall(getPos(), e);
}
|
- <NULLS> <LAST>
- {
- pos = getPos();
- e = SqlStdOperatorTable.NULLS_LAST.createCall(pos, e);
+ <NULLS> <LAST> {
+ e = SqlStdOperatorTable.NULLS_LAST.createCall(getPos(), e);
}
)?
{
@@ -3864,7 +3859,7 @@ SqlNode BuiltinFunctionCall() :
}
<LPAREN>
unit = TimeUnit()
- { args = startList(new SqlIntervalQualifier(unit, null,getPos())); }
+ { args = startList(new SqlIntervalQualifier(unit, null, getPos())); }
<FROM>
e = Expression(ExprContext.ACCEPT_SUBQUERY) { args.add(e); }
<RPAREN>
@@ -4183,13 +4178,22 @@ SqlNode StandardFloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
SqlIdentifier name;
SqlParserPos overPos = null;
SqlIdentifier id = null;
- SqlNode e = null;
- SqlCall function = null;
- SqlNodeList args;
+ SqlNode e;
+ List<SqlNode> args;
+ TimeUnit unit;
boolean over = false;
}
{
- args = ParenthesizedQueryOrCommaList(ExprContext.ACCEPT_SUBQUERY)
+ <LPAREN> e = Expression(ExprContext.ACCEPT_SUBQUERY) {
+ args = startList(e);
+ }
+ (
+ <TO>
+ unit = TimeUnit() {
+ args.add(new SqlIntervalQualifier(unit, null, getPos()));
+ }
+ )?
+ <RPAREN>
[
<OVER>
{
@@ -4205,16 +4209,10 @@ SqlNode StandardFloorCeilOptions(SqlParserPos pos, boolean floorFlag) :
SqlOperator op = floorFlag
? SqlStdOperatorTable.FLOOR
: SqlStdOperatorTable.CEIL;
- function = op.createCall(
- pos, args.toArray());
+ final SqlCall function = op.createCall(pos.plus(getPos()), args);
if (over) {
- if (id != null) {
- return SqlStdOperatorTable.OVER.createCall(
- overPos, new SqlNode[] {function, id});
- } else {
- return SqlStdOperatorTable.OVER.createCall(
- overPos, new SqlNode[] { function, e });
- }
+ return SqlStdOperatorTable.OVER.createCall(overPos, function,
+ Util.first(id, e));
} else {
return function;
}
@@ -4981,6 +4979,7 @@ SqlPostfixOperator PostfixRowOperator() :
| < STATIC: "STATIC" >
| < STDDEV_POP: "STDDEV_POP" >
| < STDDEV_SAMP: "STDDEV_SAMP" >
+ | < STREAM: "STREAM" >
| < STRUCTURE: "STRUCTURE" >
| < STYLE: "STYLE" >
| < SUBCLASS_ORIGIN: "SUBCLASS_ORIGIN" >
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
index ddd3212..ad67052 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableScan.java
@@ -37,6 +37,7 @@ import org.apache.calcite.schema.FilterableTable;
import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.util.BuiltInMethod;
@@ -104,7 +105,8 @@ public class EnumerableTableScan
}
} else if (table instanceof ScannableTable
|| table instanceof FilterableTable
- || table instanceof ProjectableFilterableTable) {
+ || table instanceof ProjectableFilterableTable
+ || table instanceof StreamableTable) {
return Object[].class;
} else {
return Object.class;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index e1fd9b0..29c107f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -17,6 +17,8 @@
package org.apache.calcite.adapter.enumerable;
import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.BlockStatement;
@@ -236,14 +238,20 @@ public class RexImpTable {
defineMethod(LN, "ln", NullPolicy.STRICT);
defineMethod(LOG10, "log10", NullPolicy.STRICT);
defineMethod(ABS, "abs", NullPolicy.STRICT);
- defineMethod(CEIL, "ceil", NullPolicy.STRICT);
- defineMethod(FLOOR, "floor", NullPolicy.STRICT);
// datetime
defineImplementor(DATETIME_PLUS, NullPolicy.STRICT,
new DatetimeArithmeticImplementor(), false);
defineMethod(EXTRACT_DATE, BuiltInMethod.UNIX_DATE_EXTRACT.method,
NullPolicy.STRICT);
+ defineImplementor(FLOOR, NullPolicy.STRICT,
+ new FloorImplementor(BuiltInMethod.FLOOR.method.getName(),
+ BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method,
+ BuiltInMethod.UNIX_DATE_FLOOR.method), false);
+ defineImplementor(CEIL, NullPolicy.STRICT,
+ new FloorImplementor(BuiltInMethod.CEIL.method.getName(),
+ BuiltInMethod.UNIX_TIMESTAMP_CEIL.method,
+ BuiltInMethod.UNIX_DATE_CEIL.method), false);
map.put(IS_NULL, new IsXxxImplementor(null, false));
map.put(IS_NOT_NULL, new IsXxxImplementor(null, true));
@@ -1402,9 +1410,70 @@ public class RexImpTable {
}
}
+ /** Implementor for the {@code FLOOR} and {@code CEIL} functions. */
+ private static class FloorImplementor extends MethodNameImplementor {
+ final Method timestampMethod;
+ final Method dateMethod;
+
+ FloorImplementor(String methodName, Method timestampMethod,
+ Method dateMethod) {
+ super(methodName);
+ this.timestampMethod = timestampMethod;
+ this.dateMethod = dateMethod;
+ }
+
+ public Expression implement(RexToLixTranslator translator, RexCall call,
+ List<Expression> translatedOperands) {
+ switch (call.getOperands().size()) {
+ case 1:
+ switch (call.getType().getSqlTypeName()) {
+ case BIGINT:
+ case INTEGER:
+ case SMALLINT:
+ case TINYINT:
+ return translatedOperands.get(0);
+ }
+ return super.implement(translator, call, translatedOperands);
+ case 2:
+ final Type type;
+ final Method floorMethod;
+ switch (call.getType().getSqlTypeName()) {
+ case TIMESTAMP:
+ type = long.class;
+ floorMethod = timestampMethod;
+ break;
+ default:
+ type = int.class;
+ floorMethod = dateMethod;
+ }
+ final ConstantExpression tur =
+ (ConstantExpression) translatedOperands.get(1);
+ final TimeUnitRange timeUnitRange = (TimeUnitRange) tur.value;
+ switch (timeUnitRange) {
+ case YEAR:
+ case MONTH:
+ return Expressions.call(floorMethod, tur,
+ call(translatedOperands, type, TimeUnit.DAY));
+ default:
+ return call(translatedOperands, type, timeUnitRange.startUnit);
+ }
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ private Expression call(List<Expression> translatedOperands, Type type,
+ TimeUnit timeUnit) {
+ return Expressions.call(SqlFunctions.class, methodName,
+ Types.castIfNecessary(type, translatedOperands.get(0)),
+ Types.castIfNecessary(type,
+ Expressions.constant(timeUnit.multiplier)));
+ }
+ }
+
/** Implementor for a function that generates calls to a given method. */
private static class MethodImplementor implements NotNullImplementor {
- private final Method method;
+ protected final Method method;
MethodImplementor(Method method) {
this.method = method;
@@ -1428,7 +1497,7 @@ public class RexImpTable {
* <p>Use this, as opposed to {@link MethodImplementor}, if the SQL function
* is overloaded; then you can use one implementor for several overloads. */
private static class MethodNameImplementor implements NotNullImplementor {
- private final String methodName;
+ protected final String methodName;
MethodNameImplementor(String methodName) {
this.methodName = methodName;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
index 4b34c3e..346b721 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
@@ -16,7 +16,6 @@
*/
package org.apache.calcite.jdbc;
-import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.materialize.Lattice;
import org.apache.calcite.schema.Function;
@@ -522,10 +521,8 @@ public class CalciteSchema {
public final String name;
public Entry(CalciteSchema schema, String name) {
- Linq4j.requireNonNull(schema);
- Linq4j.requireNonNull(name);
- this.schema = schema;
- this.name = name;
+ this.schema = Preconditions.checkNotNull(schema);
+ this.name = Preconditions.checkNotNull(name);
}
/** Returns this object's path. For example ["hr", "emps"]. */
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonRoot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonRoot.java b/core/src/main/java/org/apache/calcite/model/JsonRoot.java
index 25e1fa3..ba73e5b 100644
--- a/core/src/main/java/org/apache/calcite/model/JsonRoot.java
+++ b/core/src/main/java/org/apache/calcite/model/JsonRoot.java
@@ -32,6 +32,7 @@ import java.util.List;
* {@link JsonSchema} (in collection {@link JsonRoot#schemas schemas})
* {@link JsonTable} (in collection {@link JsonMapSchema#tables tables})
* {@link JsonColumn} (in collection {@link JsonTable#columns columns}
+ * {@link JsonStream} (in field {@link JsonTable#stream stream}
* {@link JsonView}
* {@link JsonFunction} (in collection {@link JsonMapSchema#functions functions})
* {@link JsonLattice} (in collection {@link JsonSchema#lattices lattices})
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonStream.java b/core/src/main/java/org/apache/calcite/model/JsonStream.java
new file mode 100644
index 0000000..f8d728a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/model/JsonStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.model;
+
+/**
+ * Information about whether a table allows streaming.
+ *
+ * @see org.apache.calcite.model.JsonRoot Description of schema elements
+ * @see org.apache.calcite.model.JsonTable#stream
+ */
+public class JsonStream {
+ /** Whether the table allows streaming. */
+ public boolean stream = true;
+
+ /** Whether the history of the table is available. */
+ public boolean history = false;
+}
+
+// End JsonStream.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/model/JsonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/JsonTable.java b/core/src/main/java/org/apache/calcite/model/JsonTable.java
index c0e7d8d..806405b 100644
--- a/core/src/main/java/org/apache/calcite/model/JsonTable.java
+++ b/core/src/main/java/org/apache/calcite/model/JsonTable.java
@@ -18,8 +18,8 @@ package org.apache.calcite.model;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.collect.Lists;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -36,7 +36,11 @@ import java.util.List;
@JsonSubTypes.Type(value = JsonView.class, name = "view") })
public abstract class JsonTable {
public String name;
- public final List<JsonColumn> columns = new ArrayList<JsonColumn>();
+ public final List<JsonColumn> columns = Lists.newArrayList();
+
+ /** Information about whether the table can be streamed, and if so, whether
+ * the history of the table is also available. */
+ public JsonStream stream;
public abstract void accept(ModelHandler handler);
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
index 2f884c1..e655e67 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.rel.rules.ReduceExpressionsRule;
import org.apache.calcite.rel.rules.SortProjectTransposeRule;
import org.apache.calcite.rel.rules.TableScanRule;
import org.apache.calcite.rel.rules.ValuesReduceRule;
+import org.apache.calcite.rel.stream.StreamRules;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
@@ -154,6 +155,9 @@ public class CalcitePrepareImpl implements CalcitePrepare {
/** Whether the enumerable convention is enabled. */
public static final boolean ENABLE_ENUMERABLE = true;
+ /** Whether the streaming is enabled. */
+ public static final boolean ENABLE_STREAM = true;
+
private static final Set<String> SIMPLE_SQLS =
ImmutableSet.of(
"SELECT 1",
@@ -338,6 +342,12 @@ public class CalcitePrepareImpl implements CalcitePrepare {
EnumerableBindable.EnumerableToBindableConverterRule.INSTANCE);
}
+ if (ENABLE_STREAM) {
+ for (RelOptRule rule : StreamRules.RULES) {
+ planner.addRule(rule);
+ }
+ }
+
// Change the below to enable constant-reduction.
if (false) {
for (RelOptRule rule : CONSTANT_REDUCTION_RULES) {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
index e47ea5b..8925ede 100644
--- a/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/RelOptTableImpl.java
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
@@ -35,9 +36,11 @@ import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.validate.SqlModality;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
@@ -125,6 +128,9 @@ public class RelOptTableImpl implements Prepare.PreparingTable {
table.getClass());
}
};
+ } else if (table instanceof StreamableTable) {
+ return getClassExpressionFunction(tableEntry,
+ ((StreamableTable) table).stream());
} else {
return new Function<Class, Expression>() {
public Expression apply(Class input) {
@@ -243,14 +249,53 @@ public class RelOptTableImpl implements Prepare.PreparingTable {
return rowType;
}
+ public boolean supportsModality(SqlModality modality) {
+ switch (modality) {
+ case STREAM:
+ return table instanceof StreamableTable;
+ default:
+ return !(table instanceof StreamableTable);
+ }
+ }
+
public List<String> getQualifiedName() {
return names;
}
public SqlMonotonicity getMonotonicity(String columnName) {
+ final int i = rowType.getFieldNames().indexOf(columnName);
+ if (i >= 0) {
+ for (RelCollation collation : table.getStatistic().getCollations()) {
+ final RelFieldCollation fieldCollation =
+ collation.getFieldCollations().get(0);
+ if (fieldCollation.getFieldIndex() == i) {
+ return monotonicity(fieldCollation.direction);
+ }
+ }
+ }
return SqlMonotonicity.NOT_MONOTONIC;
}
+ /** Converts a {@link org.apache.calcite.rel.RelFieldCollation.Direction}
+ * value to a {@link org.apache.calcite.sql.validate.SqlMonotonicity}. */
+ private static SqlMonotonicity
+ monotonicity(RelFieldCollation.Direction direction) {
+ switch (direction) {
+ case ASCENDING:
+ return SqlMonotonicity.INCREASING;
+ case STRICTLY_ASCENDING:
+ return SqlMonotonicity.STRICTLY_INCREASING;
+ case DESCENDING:
+ return SqlMonotonicity.DECREASING;
+ case STRICTLY_DESCENDING:
+ return SqlMonotonicity.STRICTLY_DECREASING;
+ case CLUSTERED:
+ return SqlMonotonicity.MONOTONIC;
+ default:
+ throw new AssertionError("unknown: " + direction);
+ }
+ }
+
public SqlAccessType getAllowedAccess() {
return SqlAccessType.ALL;
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/core/Sort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Sort.java b/core/src/main/java/org/apache/calcite/rel/core/Sort.java
index 06f9299..022479b 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Sort.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Sort.java
@@ -45,7 +45,7 @@ import java.util.List;
public abstract class Sort extends SingleRel {
//~ Instance fields --------------------------------------------------------
- protected final RelCollation collation;
+ public final RelCollation collation;
protected final ImmutableList<RexNode> fieldExps;
public final RexNode offset;
public final RexNode fetch;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/Chi.java b/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
new file mode 100644
index 0000000..36e03c0
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/Chi.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+
+/**
+ * Relational operator that converts a stream to a relation.
+ *
+ * <p>Chi is named for the Greek letter χ and pronounced 'kai'.
+ *
+ * <p>Chi is the inverse of {@link Delta}. For any relation {@code R},
+ * Chi(Delta(R)) == R.
+ */
+public class Chi extends SingleRel {
+ protected Chi(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ super(cluster, traits, input);
+ }
+}
+
+// End Chi.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/Delta.java b/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
new file mode 100644
index 0000000..ce1417c
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/Delta.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.TableScan;
+
+/**
+ * Relational operator that converts a relation to a stream.
+ *
+ * <p>For example, if {@code Orders} is a table, and {@link TableScan}(Orders)
+ * is a relational operator that returns the current contents of the table,
+ * then {@link Delta}(TableScan(Orders)) is a relational operator that returns
+ * all inserts into the table.
+ *
+ * <p>If unrestricted, Delta returns all previous inserts into the table (from
+ * time -∞ to now) and all future inserts into the table (from now
+ * to +∞) and never terminates.
+ */
+public abstract class Delta extends SingleRel {
+ protected Delta(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ super(cluster, traits, input);
+ }
+
+ /** Creates a Delta by parsing serialized output. */
+ protected Delta(RelInput input) {
+ this(input.getCluster(), input.getTraitSet(), input.getInput());
+ }
+}
+
+// End Delta.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java b/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
new file mode 100644
index 0000000..fad29e7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/LogicalChi.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Sub-class of {@link Chi}
+ * not targeted at any particular engine or calling convention.
+ */
+public final class LogicalChi extends Chi {
+ public LogicalChi(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+ super(cluster, traits, input);
+ }
+}
+
+// End LogicalChi.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java b/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
new file mode 100644
index 0000000..05de63b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/LogicalDelta.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.List;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.stream.Delta}
+ * not targeted at any particular engine or calling convention.
+ */
+public final class LogicalDelta extends Delta {
+ /**
+ * Creates a LogicalDelta.
+ *
+ * <p>Use {@link #create} unless you know what you're doing.
+ *
+ * @param cluster Cluster that this relational expression belongs to
+ * @param input Input relational expression
+ */
+ public LogicalDelta(RelOptCluster cluster, RelTraitSet traits,
+ RelNode input) {
+ super(cluster, traits, input);
+ }
+
+ /** Creates a LogicalDelta by parsing serialized output. */
+ public LogicalDelta(RelInput input) {
+ super(input);
+ }
+
+ /** Creates a LogicalDelta. */
+ public static LogicalDelta create(RelNode input) {
+ final RelTraitSet traitSet = input.getTraitSet().replace(Convention.NONE);
+ return new LogicalDelta(input.getCluster(), traitSet, input);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new LogicalDelta(getCluster(), traitSet, sole(inputs));
+ }
+}
+
+// End LogicalDelta.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
new file mode 100644
index 0000000..28b9972
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Rules and relational operators for streaming relational expressions.
+ */
+public class StreamRules {
+ private StreamRules() {}
+
+ public static final ImmutableList<RelOptRule> RULES =
+ ImmutableList.of(
+ new DeltaProjectTransposeRule(),
+ new DeltaFilterTransposeRule(),
+ new DeltaAggregateTransposeRule(),
+ new DeltaSortTransposeRule(),
+ new DeltaUnionTransposeRule(),
+ new DeltaTableScanRule());
+
+ /** Planner rule that pushes a {@link Delta} through a {@link Project}. */
+ public static class DeltaProjectTransposeRule extends RelOptRule {
+ private DeltaProjectTransposeRule() {
+ super(
+ operand(Delta.class,
+ operand(Project.class, any())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ Util.discard(delta);
+ final Project project = call.rel(1);
+ final LogicalDelta newDelta = LogicalDelta.create(project.getInput());
+ final LogicalProject newProject =
+ LogicalProject.create(newDelta, project.getProjects(),
+ project.getRowType().getFieldNames());
+ call.transformTo(newProject);
+ }
+ }
+
+ /** Planner rule that pushes a {@link Delta} through a {@link Filter}. */
+ public static class DeltaFilterTransposeRule extends RelOptRule {
+ private DeltaFilterTransposeRule() {
+ super(
+ operand(Delta.class,
+ operand(Filter.class, any())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ Util.discard(delta);
+ final Filter filter = call.rel(1);
+ final LogicalDelta newDelta = LogicalDelta.create(filter.getInput());
+ final LogicalFilter newFilter =
+ LogicalFilter.create(newDelta, filter.getCondition());
+ call.transformTo(newFilter);
+ }
+ }
+
+ /** Planner rule that pushes a {@link Delta} through an {@link Aggregate}. */
+ public static class DeltaAggregateTransposeRule extends RelOptRule {
+ private DeltaAggregateTransposeRule() {
+ super(
+ operand(Delta.class,
+ operand(Aggregate.class, any())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ Util.discard(delta);
+ final Aggregate aggregate = call.rel(1);
+ final LogicalDelta newDelta =
+ LogicalDelta.create(aggregate.getInput());
+ final LogicalAggregate newAggregate =
+ LogicalAggregate.create(newDelta, aggregate.indicator,
+ aggregate.getGroupSet(), aggregate.groupSets,
+ aggregate.getAggCallList());
+ call.transformTo(newAggregate);
+ }
+ }
+
+ /** Planner rule that pushes a {@link Delta} through an {@link Sort}. */
+ public static class DeltaSortTransposeRule extends RelOptRule {
+ private DeltaSortTransposeRule() {
+ super(
+ operand(Delta.class,
+ operand(Sort.class, any())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ Util.discard(delta);
+ final Sort sort = call.rel(1);
+ final LogicalDelta newDelta =
+ LogicalDelta.create(sort.getInput());
+ final LogicalSort newSort =
+ LogicalSort.create(newDelta, sort.collation, sort.offset, sort.fetch);
+ call.transformTo(newSort);
+ }
+ }
+
+ /** Planner rule that pushes a {@link Delta} through an {@link Union}. */
+ public static class DeltaUnionTransposeRule extends RelOptRule {
+ private DeltaUnionTransposeRule() {
+ super(
+ operand(Delta.class,
+ operand(Union.class, any())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ Util.discard(delta);
+ final Union union = call.rel(1);
+ final List<RelNode> newInputs = Lists.newArrayList();
+ for (RelNode input : union.getInputs()) {
+ final LogicalDelta newDelta =
+ LogicalDelta.create(input);
+ newInputs.add(newDelta);
+ }
+ final LogicalUnion newUnion = LogicalUnion.create(newInputs, union.all);
+ call.transformTo(newUnion);
+ }
+ }
+
+ /** Planner rule that pushes a {@link Delta} into a {@link TableScan} of a
+ * {@link org.apache.calcite.schema.StreamableTable}.
+ *
+ * <p>Very likely, the stream was only represented as a table for uniformity
+ * with the other relations in the system. The Delta disappears and the stream
+ * can be implemented directly. */
+ public static class DeltaTableScanRule extends RelOptRule {
+ private DeltaTableScanRule() {
+ super(
+ operand(Delta.class,
+ operand(TableScan.class, none())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ final TableScan scan = call.rel(1);
+ final RelOptCluster cluster = delta.getCluster();
+ final RelOptTable relOptTable = scan.getTable();
+ final StreamableTable streamableTable =
+ relOptTable.unwrap(StreamableTable.class);
+ if (streamableTable != null) {
+ final Table table1 = streamableTable.stream();
+ final RelOptTable relOptTable2 =
+ RelOptTableImpl.create(relOptTable.getRelOptSchema(),
+ relOptTable.getRowType(), table1);
+ final LogicalTableScan newScan =
+ LogicalTableScan.create(cluster, relOptTable2);
+ call.transformTo(newScan);
+ }
+ }
+ }
+}
+
+// End StreamRules.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/package-info.java b/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
new file mode 100644
index 0000000..12c680a
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/stream/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines relational expressions for streaming.
+ *
+ * <h2>Related packages and classes</h2>
+ * <ul>
+ *
+ * <li>Package <code>
+ * <a href="../core/package-summary.html">org.apache.calcite.rel.core</a></code>
+ * contains core relational expressions
+ *
+ * </ul>
+ */
+@PackageMarker
+package org.apache.calcite.rel.stream;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 842ed7d..245d00a 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -549,6 +549,24 @@ public interface CalciteResource {
@BaseMessage("FilterableTable.scan must not return null")
ExInst<CalciteException> filterableTableScanReturnedNull();
+
+ @BaseMessage("Cannot convert table ''{0}'' to stream")
+ ExInst<SqlValidatorException> cannotConvertToStream(String tableName);
+
+ @BaseMessage("Cannot convert stream ''{0}'' to relation")
+ ExInst<SqlValidatorException> cannotConvertToRelation(String tableName);
+
+ @BaseMessage("Streaming aggregation requires at least one monotonic expression in GROUP BY clause")
+ ExInst<SqlValidatorException> streamMustGroupByMonotonic();
+
+ @BaseMessage("Streaming ORDER BY must start with monotonic expression")
+ ExInst<SqlValidatorException> streamMustOrderByMonotonic();
+
+ @BaseMessage("Set operator cannot combine streaming and non-streaming inputs")
+ ExInst<SqlValidatorException> streamSetOpInconsistentInputs();
+
+ @BaseMessage("Cannot stream VALUES")
+ ExInst<SqlValidatorException> cannotStreamValues();
}
// End CalciteResource.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
index fabdf09..77e6827 100644
--- a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
+++ b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
@@ -687,6 +687,116 @@ public class SqlFunctions {
return bigDecimals[1];
}
+ // FLOOR
+
+ public static double floor(double b0) {
+ return Math.floor(b0);
+ }
+
+ public static BigDecimal floor(BigDecimal b0) {
+ return b0.setScale(0, BigDecimal.ROUND_FLOOR);
+ }
+
+ /** SQL <code>FLOOR</code> operator applied to byte values. */
+ public static byte floor(byte b0, byte b1) {
+ return (byte) floor((int) b0, (int) b1);
+ }
+
+ /** SQL <code>FLOOR</code> operator applied to short values. */
+ public static short floor(short b0, short b1) {
+ return (short) floor((int) b0, (int) b1);
+ }
+
+ /** SQL <code>FLOOR</code> operator applied to int values. */
+ public static int floor(int b0, int b1) {
+ int r = b0 % b1;
+ if (r < 0) {
+ r += b1;
+ }
+ return b0 - r;
+ }
+
+ /** SQL <code>FLOOR</code> operator applied to long values. */
+ public static long floor(long b0, long b1) {
+ long r = b0 % b1;
+ if (r < 0) {
+ r += b1;
+ }
+ return b0 - r;
+ }
+
+ // temporary
+ public static BigDecimal floor(BigDecimal b0, int b1) {
+ return floor(b0, BigDecimal.valueOf(b1));
+ }
+
+ // temporary
+ public static int floor(int b0, BigDecimal b1) {
+ return floor(b0, b1.intValue());
+ }
+
+ public static BigDecimal floor(BigDecimal b0, BigDecimal b1) {
+ final BigDecimal[] bigDecimals = b0.divideAndRemainder(b1);
+ BigDecimal r = bigDecimals[1];
+ if (r.signum() < 0) {
+ r = r.add(b1);
+ }
+ return b0.subtract(r);
+ }
+
+ // CEIL
+
+ public static double ceil(double b0) {
+ return Math.ceil(b0);
+ }
+
+ public static BigDecimal ceil(BigDecimal b0) {
+ return b0.setScale(0, BigDecimal.ROUND_CEILING);
+ }
+
+ /** SQL <code>CEIL</code> operator applied to byte values. */
+ public static byte ceil(byte b0, byte b1) {
+ return floor((byte) (b0 + b1 - 1), b1);
+ }
+
+ /** SQL <code>CEIL</code> operator applied to short values. */
+ public static short ceil(short b0, short b1) {
+ return floor((short) (b0 + b1 - 1), b1);
+ }
+
+ /** SQL <code>CEIL</code> operator applied to int values. */
+ public static int ceil(int b0, int b1) {
+ int r = b0 % b1;
+ if (r > 0) {
+ r -= b1;
+ }
+ return b0 - r;
+ }
+
+ /** SQL <code>CEIL</code> operator applied to long values. */
+ public static long ceil(long b0, long b1) {
+ return floor(b0 + b1 - 1, b1);
+ }
+
+ // temporary
+ public static BigDecimal ceil(BigDecimal b0, int b1) {
+ return ceil(b0, BigDecimal.valueOf(b1));
+ }
+
+ // temporary
+ public static int ceil(int b0, BigDecimal b1) {
+ return ceil(b0, b1.intValue());
+ }
+
+ public static BigDecimal ceil(BigDecimal b0, BigDecimal b1) {
+ final BigDecimal[] bigDecimals = b0.divideAndRemainder(b1);
+ BigDecimal r = bigDecimals[1];
+ if (r.signum() > 0) {
+ r = r.subtract(b1);
+ }
+ return b0.subtract(r);
+ }
+
// ABS
/** SQL <code>ABS</code> operator applied to byte values. */
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/schema/Schema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/Schema.java b/core/src/main/java/org/apache/calcite/schema/Schema.java
index ecda690..065ec5d 100644
--- a/core/src/main/java/org/apache/calcite/schema/Schema.java
+++ b/core/src/main/java/org/apache/calcite/schema/Schema.java
@@ -181,6 +181,9 @@ public interface Schema {
* <p>Used by Apache Phoenix, and others. Must have a single BIGINT column
* called "$seq". */
SEQUENCE,
+
+ /** Stream. */
+ STREAM,
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/StreamableTable.java b/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
new file mode 100644
index 0000000..a9dcdf8
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/schema/StreamableTable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schema;
+
+import org.apache.calcite.rel.stream.Delta;
+
+/**
+ * Table that can be converted to a stream.
+ *
+ * @see Delta
+ */
+public interface StreamableTable extends Table {
+ /** Returns an enumerator over the rows in this Table. Each row is represented
+ * as an array of its column values. */
+ Table stream();
+}
+
+// End StreamableTable.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/SqlKind.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlKind.java b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
index 2f3cf26..5fa715e 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlKind.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
@@ -493,6 +493,16 @@ public enum SqlKind {
CURRENT_VALUE,
/**
+ * The "FLOOR" function
+ */
+ FLOOR,
+
+ /**
+ * The "CEIL" function
+ */
+ CEIL,
+
+ /**
* The "TRIM" function.
*/
TRIM,
@@ -639,7 +649,7 @@ public enum SqlKind {
EnumSet.complementOf(
EnumSet.of(
AS, DESCENDING, CUBE, ROLLUP, GROUPING_SETS, EXTEND,
- SELECT, JOIN, OTHER_FUNCTION, CAST, TRIM,
+ SELECT, JOIN, OTHER_FUNCTION, CAST, TRIM, FLOOR, CEIL,
LITERAL_CHAIN, JDBC_FN, PRECEDING, FOLLOWING, ORDER_BY,
NULLS_FIRST, NULLS_LAST, COLLECTION_TABLE, TABLESAMPLE,
WITH, WITH_ITEM));
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java b/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
index c016289..1ca4158 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSelectKeyword.java
@@ -23,7 +23,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
*/
public enum SqlSelectKeyword implements SqlLiteral.SqlSymbol {
DISTINCT,
- ALL;
+ ALL,
+ STREAM;
/**
* Creates a parse-tree node representing an occurrence of this keyword
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java b/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
index b8ddaf4..a7228af 100644
--- a/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
+++ b/core/src/main/java/org/apache/calcite/sql/advise/SqlAdvisorValidator.java
@@ -28,6 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.OverScope;
import org.apache.calcite.sql.validate.SqlConformance;
+import org.apache.calcite.sql.validate.SqlModality;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -194,6 +195,11 @@ public class SqlAdvisorValidator extends SqlValidatorImpl {
}
}
+ @Override public boolean validateModality(SqlSelect select,
+ SqlModality modality, boolean fail) {
+ return true;
+ }
+
protected boolean shouldAllowOverRelation() {
return true; // no reason not to be lenient
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
deleted file mode 100644
index a984c94..0000000
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlCeilFunction.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.sql.fun;
-
-import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-import org.apache.calcite.sql.validate.SqlValidatorScope;
-
-/**
- * Support for the CEIL/CEILING builtin function.
- */
-public class SqlCeilFunction extends SqlFunction {
- //~ Constructors -----------------------------------------------------------
-
- public SqlCeilFunction() {
- super(
- "CEIL",
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.ARG0,
- null,
- OperandTypes.NUMERIC,
- SqlFunctionCategory.NUMERIC);
- }
-
- //~ Methods ----------------------------------------------------------------
-
- public SqlMonotonicity getMonotonicity(SqlCall call,
- SqlValidatorScope scope) {
- return scope.getMonotonicity(call.operand(0));
- }
-}
-
-// End SqlCeilFunction.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
index a8b8535..385cfb5 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlCollectionTableOperator.java
@@ -20,6 +20,7 @@ import org.apache.calcite.sql.SqlFunctionalOperator;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.validate.SqlModality;
/**
* SqlCollectionTableOperator is the "table function derived table" operator. It
@@ -30,33 +31,19 @@ import org.apache.calcite.sql.type.ReturnTypes;
* {@link SqlStdOperatorTable#EXPLICIT_TABLE} is a prefix operator.
*/
public class SqlCollectionTableOperator extends SqlFunctionalOperator {
- //~ Static fields/initializers ---------------------------------------------
-
- public static final int MODALITY_RELATIONAL = 1;
- public static final int MODALITY_STREAM = 2;
-
- //~ Instance fields --------------------------------------------------------
-
- private final int modality;
+ private final SqlModality modality;
//~ Constructors -----------------------------------------------------------
- public SqlCollectionTableOperator(String name, int modality) {
- super(
- name,
- SqlKind.COLLECTION_TABLE,
- 200,
- true,
- ReturnTypes.ARG0,
- null,
+ public SqlCollectionTableOperator(String name, SqlModality modality) {
+ super(name, SqlKind.COLLECTION_TABLE, 200, true, ReturnTypes.ARG0, null,
OperandTypes.ANY);
-
this.modality = modality;
}
//~ Methods ----------------------------------------------------------------
- public int getModality() {
+ public SqlModality getModality() {
return modality;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
index 7e43262..1e870c3 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlExtractFunction.java
@@ -59,9 +59,9 @@ public class SqlExtractFunction extends SqlFunction {
int leftPrec,
int rightPrec) {
final SqlWriter.Frame frame = writer.startFunCall(getName());
- call.operand(0).unparse(writer, leftPrec, rightPrec);
+ call.operand(0).unparse(writer, 0, 0);
writer.sep("FROM");
- call.operand(1).unparse(writer, leftPrec, rightPrec);
+ call.operand(1).unparse(writer, 0, 0);
writer.endFunCall(frame);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
index bec351c..e9d147f 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlFloorFunction.java
@@ -17,28 +17,33 @@
package org.apache.calcite.sql.fun;
import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.sql.validate.SqlValidatorScope;
+import com.google.common.base.Preconditions;
+
/**
- * Definition of the "FLOOR" builtin SQL function.
+ * Definition of the "FLOOR" and "CEIL" built-in SQL functions.
*/
-public class SqlFloorFunction extends SqlFunction {
+public class SqlFloorFunction extends SqlMonotonicUnaryFunction {
//~ Constructors -----------------------------------------------------------
- public SqlFloorFunction() {
- super(
- "FLOOR",
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.ARG0,
- null,
- OperandTypes.NUMERIC,
+ public SqlFloorFunction(SqlKind kind) {
+ super(kind.name(), kind, ReturnTypes.ARG0_OR_EXACT_NO_SCALE, null,
+ OperandTypes.or(OperandTypes.NUMERIC_OR_INTERVAL,
+ OperandTypes.sequence(
+ "'" + kind + "(<DATE> TO <TIME_UNIT>)'\n"
+ + "'" + kind + "(<TIME> TO <TIME_UNIT>)'\n"
+ + "'" + kind + "(<TIMESTAMP> TO <TIME_UNIT>)'",
+ OperandTypes.DATETIME,
+ OperandTypes.ANY)),
SqlFunctionCategory.NUMERIC);
+ Preconditions.checkArgument(kind == SqlKind.FLOOR || kind == SqlKind.CEIL);
}
//~ Methods ----------------------------------------------------------------
@@ -49,6 +54,19 @@ public class SqlFloorFunction extends SqlFunction {
// Monotonic iff its first argument is, but not strict.
return scope.getMonotonicity(call.operand(0)).unstrict();
}
+
+ @Override public void unparse(SqlWriter writer, SqlCall call, int leftPrec,
+ int rightPrec) {
+ final SqlWriter.Frame frame = writer.startFunCall(getName());
+ if (call.operandCount() == 2) {
+ call.operand(0).unparse(writer, 0, 100);
+ writer.sep("TO");
+ call.operand(1).unparse(writer, 100, 0);
+ } else {
+ call.operand(0).unparse(writer, 0, 0);
+ }
+ writer.endFunCall(frame);
+ }
}
// End SqlFloorFunction.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 9e5644d..ef7133b 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlModality;
import com.google.common.collect.ImmutableList;
@@ -958,9 +959,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
* {@link #EXPLICIT_TABLE} is a prefix operator.
*/
public static final SqlSpecialOperator COLLECTION_TABLE =
- new SqlCollectionTableOperator(
- "TABLE",
- SqlCollectionTableOperator.MODALITY_RELATIONAL);
+ new SqlCollectionTableOperator("TABLE", SqlModality.RELATION);
public static final SqlOverlapsOperator OVERLAPS =
new SqlOverlapsOperator();
@@ -1208,26 +1207,12 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
/**
* The <code>FLOOR</code> function.
*/
- public static final SqlFunction FLOOR =
- new SqlMonotonicUnaryFunction(
- "FLOOR",
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.ARG0_OR_EXACT_NO_SCALE,
- null,
- OperandTypes.NUMERIC_OR_INTERVAL,
- SqlFunctionCategory.NUMERIC);
+ public static final SqlFunction FLOOR = new SqlFloorFunction(SqlKind.FLOOR);
/**
* The <code>CEIL</code> function.
*/
- public static final SqlFunction CEIL =
- new SqlMonotonicUnaryFunction(
- "CEIL",
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.ARG0_OR_EXACT_NO_SCALE,
- null,
- OperandTypes.NUMERIC_OR_INTERVAL,
- SqlFunctionCategory.NUMERIC);
+ public static final SqlFunction CEIL = new SqlFloorFunction(SqlKind.CEIL);
/**
* The <code>USER</code> function.
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java b/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
index e5bb0ce..712c3c4 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/CompositeOperandTypeChecker.java
@@ -18,15 +18,15 @@ package org.apache.calcite.sql.type;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.util.Util;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.AbstractList;
import java.util.List;
+import javax.annotation.Nullable;
/**
* This class allows multiple existing {@link SqlOperandTypeChecker} rules to be
@@ -75,8 +75,7 @@ import java.util.List;
* SqlSingleOperandTypeChecker, and signature generation is not supported. For
* AND composition, only the first rule is used for signature generation.
*/
-public class CompositeOperandTypeChecker
- implements SqlSingleOperandTypeChecker {
+public class CompositeOperandTypeChecker implements SqlOperandTypeChecker {
//~ Enums ------------------------------------------------------------------
/** How operands are composed. */
@@ -86,8 +85,9 @@ public class CompositeOperandTypeChecker
//~ Instance fields --------------------------------------------------------
- private final ImmutableList<SqlSingleOperandTypeChecker> allowedRules;
- private final Composition composition;
+ protected final ImmutableList<? extends SqlOperandTypeChecker> allowedRules;
+ protected final Composition composition;
+ private final String allowedSignatures;
//~ Constructors -----------------------------------------------------------
@@ -97,25 +97,30 @@ public class CompositeOperandTypeChecker
*/
CompositeOperandTypeChecker(
Composition composition,
- ImmutableList<SqlSingleOperandTypeChecker> allowedRules) {
- assert null != allowedRules;
+ ImmutableList<? extends SqlOperandTypeChecker> allowedRules,
+ @Nullable String allowedSignatures) {
+ this.allowedRules = Preconditions.checkNotNull(allowedRules);
+ this.composition = Preconditions.checkNotNull(composition);
+ this.allowedSignatures = allowedSignatures;
assert allowedRules.size() > 1;
- this.allowedRules = allowedRules;
- this.composition = composition;
}
//~ Methods ----------------------------------------------------------------
- public ImmutableList<SqlSingleOperandTypeChecker> getRules() {
+ public ImmutableList<? extends SqlOperandTypeChecker> getRules() {
return allowedRules;
}
public String getAllowedSignatures(SqlOperator op, String opName) {
+ if (allowedSignatures != null) {
+ return allowedSignatures;
+ }
if (composition == Composition.SEQUENCE) {
- throw Util.needToImplement("must override getAllowedSignatures");
+ throw new AssertionError(
+ "specify allowedSignatures or override getAllowedSignatures");
}
StringBuilder ret = new StringBuilder();
- for (Ord<SqlSingleOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+ for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
if (ord.i > 0) {
ret.append(SqlOperator.NL);
}
@@ -213,134 +218,66 @@ public class CompositeOperandTypeChecker
return max;
}
- public boolean checkSingleOperandType(
+ public boolean checkOperandTypes(
SqlCallBinding callBinding,
- SqlNode node,
- int iFormalOperand,
boolean throwOnFailure) {
- assert allowedRules.size() >= 1;
-
- if (composition == Composition.SEQUENCE) {
- return allowedRules.get(iFormalOperand).checkSingleOperandType(
- callBinding, node, 0, throwOnFailure);
- }
-
- int typeErrorCount = 0;
-
- boolean throwOnAndFailure =
- (composition == Composition.AND)
- && throwOnFailure;
-
- for (SqlSingleOperandTypeChecker rule : allowedRules) {
- if (!rule.checkSingleOperandType(
- callBinding,
- node,
- iFormalOperand,
- throwOnAndFailure)) {
- typeErrorCount++;
- }
+ if (check(callBinding)) {
+ return true;
}
-
- boolean ret;
- switch (composition) {
- case AND:
- ret = typeErrorCount == 0;
- break;
- case OR:
- ret = typeErrorCount < allowedRules.size();
- break;
- default:
- // should never come here
- throw Util.unexpected(composition);
+ if (!throwOnFailure) {
+ return false;
}
-
- if (!ret && throwOnFailure) {
- // In the case of a composite OR, we want to throw an error
- // describing in more detail what the problem was, hence doing the
- // loop again.
- for (SqlSingleOperandTypeChecker rule : allowedRules) {
- rule.checkSingleOperandType(
- callBinding,
- node,
- iFormalOperand,
- true);
+ if (composition == Composition.OR) {
+ for (SqlOperandTypeChecker allowedRule : allowedRules) {
+ allowedRule.checkOperandTypes(callBinding, true);
}
-
- // If no exception thrown, just throw a generic validation signature
- // error.
- throw callBinding.newValidationSignatureError();
}
- return ret;
+ // If no exception thrown, just throw a generic validation
+ // signature error.
+ throw callBinding.newValidationSignatureError();
}
- public boolean checkOperandTypes(
- SqlCallBinding callBinding,
- boolean throwOnFailure) {
- int typeErrorCount = 0;
-
- label:
- for (Ord<SqlSingleOperandTypeChecker> ord : Ord.zip(allowedRules)) {
- SqlSingleOperandTypeChecker rule = ord.e;
-
- switch (composition) {
- case SEQUENCE:
- if (ord.i >= callBinding.getOperandCount()) {
- break label;
- }
- if (!rule.checkSingleOperandType(
+ private boolean check(SqlCallBinding callBinding) {
+ switch (composition) {
+ case SEQUENCE:
+ if (callBinding.getOperandCount() != allowedRules.size()) {
+ return false;
+ }
+ for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+ SqlOperandTypeChecker rule = ord.e;
+ if (!((SqlSingleOperandTypeChecker) rule).checkSingleOperandType(
callBinding,
callBinding.getCall().operand(ord.i),
0,
false)) {
- typeErrorCount++;
+ return false;
}
- break;
- default:
+ }
+ return true;
+
+ case AND:
+ for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+ SqlOperandTypeChecker rule = ord.e;
if (!rule.checkOperandTypes(callBinding, false)) {
- typeErrorCount++;
- if (composition == Composition.AND) {
- // Avoid trying other rules in AND if the first one fails.
- break label;
- }
- } else if (composition == Composition.OR) {
- break label; // true OR any == true, just break
+ // Avoid trying other rules in AND if the first one fails.
+ return false;
}
- break;
}
- }
+ return true;
- boolean failed;
- switch (composition) {
- case AND:
- case SEQUENCE:
- failed = typeErrorCount > 0;
- break;
case OR:
- failed = typeErrorCount == allowedRules.size();
- break;
- default:
- throw new AssertionError();
- }
-
- if (failed) {
- if (throwOnFailure) {
- // In the case of a composite OR, we want to throw an error
- // describing in more detail what the problem was, hence doing
- // the loop again.
- if (composition == Composition.OR) {
- for (SqlOperandTypeChecker allowedRule : allowedRules) {
- allowedRule.checkOperandTypes(callBinding, true);
- }
+ for (Ord<SqlOperandTypeChecker> ord : Ord.zip(allowedRules)) {
+ SqlOperandTypeChecker rule = ord.e;
+ if (rule.checkOperandTypes(callBinding, false)) {
+ return true;
}
-
- // If no exception thrown, just throw a generic validation
- // signature error.
- throw callBinding.newValidationSignatureError();
}
return false;
+
+ default:
+ throw new AssertionError();
}
- return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java b/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
new file mode 100644
index 0000000..de8c303
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/type/CompositeSingleOperandTypeChecker.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Allows multiple
+ * {@link org.apache.calcite.sql.type.SqlSingleOperandTypeChecker} rules to be
+ * combined into one rule.
+ */
+public class CompositeSingleOperandTypeChecker
+ extends CompositeOperandTypeChecker
+ implements SqlSingleOperandTypeChecker {
+
+ //~ Constructors -----------------------------------------------------------
+
+ /**
+ * Package private. Use {@link org.apache.calcite.sql.type.OperandTypes#and},
+ * {@link org.apache.calcite.sql.type.OperandTypes#or}.
+ */
+ CompositeSingleOperandTypeChecker(
+ CompositeOperandTypeChecker.Composition composition,
+ ImmutableList<? extends SqlSingleOperandTypeChecker> allowedRules,
+ String allowedSignatures) {
+ super(composition, allowedRules, allowedSignatures);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ @Override public ImmutableList<? extends SqlSingleOperandTypeChecker>
+ getRules() {
+ return (ImmutableList<? extends SqlSingleOperandTypeChecker>) allowedRules;
+ }
+
+ public boolean checkSingleOperandType(
+ SqlCallBinding callBinding,
+ SqlNode node,
+ int iFormalOperand,
+ boolean throwOnFailure) {
+ assert allowedRules.size() >= 1;
+
+ final ImmutableList<? extends SqlSingleOperandTypeChecker> rules =
+ getRules();
+ if (composition == Composition.SEQUENCE) {
+ return rules.get(iFormalOperand).checkSingleOperandType(
+ callBinding, node, 0, throwOnFailure);
+ }
+
+ int typeErrorCount = 0;
+
+ boolean throwOnAndFailure =
+ (composition == Composition.AND)
+ && throwOnFailure;
+
+ for (SqlSingleOperandTypeChecker rule : rules) {
+ if (!rule.checkSingleOperandType(
+ callBinding,
+ node,
+ iFormalOperand,
+ throwOnAndFailure)) {
+ typeErrorCount++;
+ }
+ }
+
+ boolean ret;
+ switch (composition) {
+ case AND:
+ ret = typeErrorCount == 0;
+ break;
+ case OR:
+ ret = typeErrorCount < allowedRules.size();
+ break;
+ default:
+ // should never come here
+ throw Util.unexpected(composition);
+ }
+
+ if (!ret && throwOnFailure) {
+ // In the case of a composite OR, we want to throw an error
+ // describing in more detail what the problem was, hence doing the
+ // loop again.
+ for (SqlSingleOperandTypeChecker rule : rules) {
+ rule.checkSingleOperandType(
+ callBinding,
+ node,
+ iFormalOperand,
+ true);
+ }
+
+ // If no exception thrown, just throw a generic validation signature
+ // error.
+ throw callBinding.newValidationSignatureError();
+ }
+
+ return ret;
+ }
+}
+
+// End CompositeSingleOperandTypeChecker.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ecd8702/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
index c70d403..10bccbd 100644
--- a/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
+++ b/core/src/main/java/org/apache/calcite/sql/type/OperandTypes.java
@@ -72,21 +72,52 @@ public abstract class OperandTypes {
/**
* Creates a checker that passes if any one of the rules passes.
*/
+ public static SqlOperandTypeChecker or(SqlOperandTypeChecker... rules) {
+ return new CompositeOperandTypeChecker(
+ CompositeOperandTypeChecker.Composition.OR,
+ ImmutableList.copyOf(rules), null);
+ }
+
+ /**
+ * Creates a single-operand checker that passes if any one of the rules
+ * passes.
+ */
+ public static SqlOperandTypeChecker and(SqlOperandTypeChecker... rules) {
+ return new CompositeOperandTypeChecker(
+ CompositeOperandTypeChecker.Composition.AND,
+ ImmutableList.copyOf(rules), null);
+ }
+
+ /**
+ * Creates a single-operand checker that passes if any one of the rules
+ * passes.
+ */
public static SqlSingleOperandTypeChecker or(
SqlSingleOperandTypeChecker... rules) {
- return new CompositeOperandTypeChecker(
+ return new CompositeSingleOperandTypeChecker(
CompositeOperandTypeChecker.Composition.OR,
- ImmutableList.copyOf(rules));
+ ImmutableList.copyOf(rules), null);
}
/**
- * Creates a checker that passes if any one of the rules passes.
+ * Creates a single-operand checker that passes if any one of the rules
+ * passes.
*/
public static SqlSingleOperandTypeChecker and(
SqlSingleOperandTypeChecker... rules) {
- return new CompositeOperandTypeChecker(
+ return new CompositeSingleOperandTypeChecker(
CompositeOperandTypeChecker.Composition.AND,
- ImmutableList.copyOf(rules));
+ ImmutableList.copyOf(rules), null);
+ }
+
+ /**
+ * Creates an operand checker from a sequence of single-operand checkers.
+ */
+ public static SqlOperandTypeChecker sequence(String allowedSignatures,
+ SqlSingleOperandTypeChecker... rules) {
+ return new CompositeOperandTypeChecker(
+ CompositeOperandTypeChecker.Composition.SEQUENCE,
+ ImmutableList.copyOf(rules), allowedSignatures);
}
// ----------------------------------------------------------------------
@@ -111,7 +142,7 @@ public abstract class OperandTypes {
public static final SqlOperandTypeChecker ONE_OR_MORE =
variadic(SqlOperandCountRanges.from(1));
- private static SqlOperandTypeChecker variadic(
+ public static SqlOperandTypeChecker variadic(
final SqlOperandCountRange range) {
return new SqlOperandTypeChecker() {
public boolean checkOperandTypes(