You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by da...@apache.org on 2020/08/18 08:47:39 UTC
[calcite] 02/02: [CALCITE-4171] Support named parameters for table
window functions
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
commit ebbb7bb88eb5a732ff954790373fe8d62eb5b8ab
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Mon Aug 17 13:44:41 2020 +0800
[CALCITE-4171] Support named parameters for table window functions
[CALCITE-4171] Support named parameters for table window functions
* Changes SqlArgumentAssignmentOperator to allow a non-scalar query as an
operand
* In SqlCallBinding, matches the permuted operand by name with name
matcher
* Refactors SqlWindowTableFunction and its sub-classes to reuse the same
logic
* Does not patch up the SqlWindowTableFunction with DEFAULTs during SQL
validation
---
.../calcite/adapter/enumerable/EnumUtils.java | 1 -
.../apache/calcite/runtime/CalciteResource.java | 4 +
.../org/apache/calcite/sql/SqlCallBinding.java | 45 +++++++--
.../apache/calcite/sql/SqlHopTableFunction.java | 88 ++++++++--------
.../calcite/sql/SqlSessionTableFunction.java | 78 ++++++++------
.../apache/calcite/sql/SqlTumbleTableFunction.java | 82 ++++++++-------
.../apache/calcite/sql/SqlWindowTableFunction.java | 77 +++++++++++++-
.../sql/fun/SqlArgumentAssignmentOperator.java | 4 +
.../calcite/sql/validate/SqlValidatorImpl.java | 8 +-
.../calcite/runtime/CalciteResource.properties | 1 +
.../apache/calcite/test/SqlToRelConverterTest.java | 64 ++++++++++++
.../org/apache/calcite/test/SqlValidatorTest.java | 98 ++++++++++++++++--
.../apache/calcite/test/SqlToRelConverterTest.xml | 112 +++++++++++++++++++++
core/src/test/resources/sql/stream.iq | 61 +++++++++++
site/_docs/reference.md | 63 ++++++++++--
15 files changed, 646 insertions(+), 140 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index 87fd196..ae9455f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -804,7 +804,6 @@ public class EnumUtils {
expressions.add(expression);
}
final Expression wmColExprToLong = EnumUtils.convert(wmColExpr, long.class);
- final Expression shiftExpr = Expressions.constant(1, long.class);
// Find the fixed window for a timestamp given a window size and an offset, and return the
// window start.
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 7987546..3aaec91 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -220,6 +220,10 @@ public interface CalciteResource {
@BaseMessage("Column ''{0}'' is ambiguous")
ExInst<SqlValidatorException> columnAmbiguous(String a0);
+ @BaseMessage("Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''?")
+ ExInst<SqlValidatorException> paramNotFoundInFunctionDidYouMean(String a0,
+ String a1, String a2);
+
@BaseMessage("Operand {0} must be a query")
ExInst<SqlValidatorException> needQueryOp(String a0);
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java b/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
index d944812..ea6dad8 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java
@@ -27,6 +27,7 @@ import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SelectScope;
import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -34,6 +35,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -161,17 +163,42 @@ public class SqlCallBinding extends SqlOperatorBinding {
* formal parameters of the function. */
private List<SqlNode> permutedOperands(final SqlCall call) {
final SqlFunction operator = (SqlFunction) call.getOperator();
- return Lists.transform(operator.getParamNames(), paramName -> {
- for (SqlNode operand2 : call.getOperandList()) {
- final SqlCall call2 = (SqlCall) operand2;
- assert operand2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT;
- final SqlIdentifier id = call2.operand(1);
- if (id.getSimple().equals(paramName)) {
- return call2.operand(0);
+ final List<String> paramNames = operator.getParamNames();
+ final List<SqlNode> permuted = new ArrayList<>();
+ final SqlNameMatcher nameMatcher = validator.getCatalogReader().nameMatcher();
+ for (final String paramName : paramNames) {
+ Pair<String, SqlIdentifier> args = null;
+ for (int j = 0; j < call.getOperandList().size(); j++) {
+ final SqlCall call2 = call.operand(j);
+ assert call2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT;
+ final SqlIdentifier operandID = call2.operand(1);
+ final String operandName = operandID.getSimple();
+ if (nameMatcher.matches(operandName, paramName)) {
+ permuted.add(call2.operand(0));
+ break;
+ } else if (args == null
+ && nameMatcher.isCaseSensitive()
+ && operandName.equalsIgnoreCase(paramName)) {
+ args = Pair.of(paramName, operandID);
+ }
+ // the last operand, there is still no match.
+ if (j == call.getOperandList().size() - 1) {
+ if (args != null) {
+ throw SqlUtil.newContextException(args.right.getParserPosition(),
+ RESOURCE.paramNotFoundInFunctionDidYouMean(args.right.getSimple(),
+ call.getOperator().getName(), args.left));
+ }
+ if (!(operator instanceof SqlWindowTableFunction)) {
+ // Unlike user-defined functions, we do not patch up the operands
+ // with DEFAULT and then convert them to nulls during sql-to-rel conversion.
+ // Thus, there is no need to show the optional operands in the plan or to
+ // decide whether an optional operand is null during code generation.
+ permuted.add(DEFAULT_CALL);
+ }
}
}
- return DEFAULT_CALL;
- });
+ }
+ return permuted;
}
/**
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
index f5936ae..816140b 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -16,60 +16,68 @@
*/
package org.apache.calcite.sql;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeUtil;
-import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
/**
- * SqlHopTableFunction implements an operator for hopping. It allows four parameters:
- * 1. a table;
- * 2. a descriptor to provide a watermarked column name from the input table;
- * 3. an interval parameter to specify the length of window shifting;
- * 4. an interval parameter to specify the length of window size.
+ * SqlHopTableFunction implements an operator for hopping.
+ *
+ * <p>It allows four parameters:
+ *
+ * <ol>
+ * <li>a table</li>
+ * <li>a descriptor to provide a watermarked column name from the input table</li>
+ * <li>an interval parameter to specify the length of window shifting</li>
+ * <li>an interval parameter to specify the length of window size</li>
+ * </ol>
*/
public class SqlHopTableFunction extends SqlWindowTableFunction {
public SqlHopTableFunction() {
- super(SqlKind.HOP.name());
+ super(SqlKind.HOP.name(), OperandTypeCheckerImpl.INSTANCE);
}
- @Override public SqlOperandCountRange getOperandCountRange() {
- return SqlOperandCountRanges.between(4, 5);
+ @Override public List<String> getParamNames() {
+ return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SLIDE, PARAM_SIZE, PARAM_OFFSET);
}
- @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
- boolean throwOnFailure) {
- final SqlNode operand0 = callBinding.operand(0);
- final SqlValidator validator = callBinding.getValidator();
- final RelDataType type = validator.getValidatedNodeType(operand0);
- if (type.getSqlTypeName() != SqlTypeName.ROW) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
- }
- final SqlNode operand1 = callBinding.operand(1);
- if (operand1.getKind() != SqlKind.DESCRIPTOR) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /** Operand type checker for HOP. */
+ private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
+ static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();
+
+ @Override public boolean checkOperandTypes(
+ SqlCallBinding callBinding, boolean throwOnFailure) {
+ if (!validateTableWithFollowingDescriptors(callBinding, 1)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ if (!validateTailingIntervals(callBinding, 2)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ return true;
}
- validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
- final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
- if (!SqlTypeUtil.isInterval(type2)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.between(4, 5);
}
- final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
- if (!SqlTypeUtil.isInterval(type3)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public String getAllowedSignatures(SqlOperator op, String opName) {
+ return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
+ + "datetime interval, datetime interval[, datetime interval])";
}
- if (callBinding.getOperandCount() > 4) {
- final RelDataType type4 = validator.getValidatedNodeType(callBinding.operand(4));
- if (!SqlTypeUtil.isInterval(type4)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
- }
+
+ @Override public Consistency getConsistency() {
+ return Consistency.NONE;
}
- return true;
- }
- @Override public String getAllowedSignatures(String opNameToUse) {
- return getName() + "(TABLE table_name, DESCRIPTOR(col), "
- + "datetime interval, datetime interval[, datetime interval])";
+ @Override public boolean isOptional(int i) {
+ return i == 4;
+ }
}
}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
index fef50cc..61a2ba6 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
@@ -18,54 +18,70 @@ package org.apache.calcite.sql;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.SqlValidator;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
/**
* SqlSessionTableFunction implements an operator for per-key sessionization. It allows
* four parameters:
- * 1. a table.
- * 2. a descriptor to provide a watermarked column name from the input table.
- * 3. a descriptor to provide a column as key, on which sessionization will be applied.
- * 4. an interval parameter to specify a inactive activity gap to break sessions.
+ *
+ * <ol>
+ * <li>table as data source</li>
+ * <li>a descriptor to provide a watermarked column name from the input table</li>
+ * <li>a descriptor to provide a column as key, on which sessionization will be applied</li>
+ * <li>an interval parameter to specify an inactivity gap to break sessions</li>
+ * </ol>
*/
public class SqlSessionTableFunction extends SqlWindowTableFunction {
public SqlSessionTableFunction() {
- super(SqlKind.SESSION.name());
+ super(SqlKind.SESSION.name(), OperandTypeCheckerImpl.INSTANCE);
}
- @Override public SqlOperandCountRange getOperandCountRange() {
- return SqlOperandCountRanges.of(4);
+ @Override public List<String> getParamNames() {
+ return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_KEY, PARAM_SIZE);
}
- @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
- boolean throwOnFailure) {
- final SqlNode operand0 = callBinding.operand(0);
- final SqlValidator validator = callBinding.getValidator();
- final RelDataType type = validator.getValidatedNodeType(operand0);
- if (type.getSqlTypeName() != SqlTypeName.ROW) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /** Operand type checker for SESSION. */
+ private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
+ static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();
+
+ @Override public boolean checkOperandTypes(
+ SqlCallBinding callBinding, boolean throwOnFailure) {
+ final SqlValidator validator = callBinding.getValidator();
+ if (!validateTableWithFollowingDescriptors(callBinding, 2)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
+ if (!SqlTypeUtil.isInterval(type3)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ return true;
}
- final SqlNode operand1 = callBinding.operand(1);
- if (operand1.getKind() != SqlKind.DESCRIPTOR) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(4);
}
- validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
- final SqlNode operand2 = callBinding.operand(2);
- if (operand2.getKind() != SqlKind.DESCRIPTOR) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public String getAllowedSignatures(SqlOperator op, String opName) {
+ return opName + "(TABLE table_name, DESCRIPTOR(timecol), "
+ + "DESCRIPTOR(key), datetime interval)";
}
- validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand2).getOperandList());
- final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
- if (!SqlTypeUtil.isInterval(type3)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public Consistency getConsistency() {
+ return Consistency.NONE;
}
- return true;
- }
- @Override public String getAllowedSignatures(String opNameToUse) {
- return getName() + "(TABLE table_name, DESCRIPTOR(col), "
- + "DESCRIPTOR(col), datetime interval)";
+ @Override public boolean isOptional(int i) {
+ return false;
+ }
}
}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
index 75f746d..f32f13d 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
@@ -16,59 +16,73 @@
*/
package org.apache.calcite.sql;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeUtil;
-import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
/**
* SqlTumbleTableFunction implements an operator for tumbling.
*
* <p>It allows three parameters:
- * 1. a table;
- * 2. a descriptor to provide a watermarked column name from the input table;
- * 3. an interval parameter to specify the length of window size.
+ *
+ * <ol>
+ * <li>a table</li>
+ * <li>a descriptor to provide a watermarked column name from the input table</li>
+ * <li>an interval parameter to specify the length of window size</li>
+ * </ol>
*/
public class SqlTumbleTableFunction extends SqlWindowTableFunction {
public SqlTumbleTableFunction() {
- super(SqlKind.TUMBLE.name());
+ super(SqlKind.TUMBLE.name(), OperandTypeCheckerImpl.INSTANCE);
}
@Override public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.between(3, 4);
}
- @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
- boolean throwOnFailure) {
- // There should only be three operands, and number of operands are checked before
- // this call.
- final SqlNode operand0 = callBinding.operand(0);
- final SqlValidator validator = callBinding.getValidator();
- final RelDataType type = validator.getValidatedNodeType(operand0);
- if (type.getSqlTypeName() != SqlTypeName.ROW) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ @Override public List<String> getParamNames() {
+ return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE, PARAM_OFFSET);
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /** Operand type checker for TUMBLE. */
+ private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker {
+ static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl();
+
+ @Override public boolean checkOperandTypes(
+ SqlCallBinding callBinding, boolean throwOnFailure) {
+ // There should be three or four operands (the last one is optional); the number
+ // of operands is checked before this call.
+ if (!validateTableWithFollowingDescriptors(callBinding, 1)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ if (!validateTailingIntervals(callBinding, 2)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ return true;
}
- final SqlNode operand1 = callBinding.operand(1);
- if (operand1.getKind() != SqlKind.DESCRIPTOR) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(4);
}
- validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
- final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
- if (!SqlTypeUtil.isInterval(type2)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+
+ @Override public String getAllowedSignatures(SqlOperator op, String opName) {
+ return opName + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval"
+ + "[, datetime interval])";
}
- if (callBinding.getOperandCount() > 3) {
- final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
- if (!SqlTypeUtil.isInterval(type3)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
- }
+
+ @Override public Consistency getConsistency() {
+ return Consistency.NONE;
}
- return true;
- }
- @Override public String getAllowedSignatures(String opNameToUse) {
- return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval"
- + "[, datetime interval])";
+ @Override public boolean isOptional(int i) {
+ return i == 3;
+ }
}
}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
index a1670c9..2b4dfd3 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
@@ -19,8 +19,10 @@ package org.apache.calcite.sql;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.SqlNameMatcher;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -34,6 +36,25 @@ import static org.apache.calcite.util.Static.RESOURCE;
*/
public class SqlWindowTableFunction extends SqlFunction
implements SqlTableFunction {
+
+ /** The data source which the table function computes with. */
+ protected static final String PARAM_DATA = "DATA";
+
+ /** The time attribute column. Also known as the event time. */
+ protected static final String PARAM_TIMECOL = "TIMECOL";
+
+ /** The window duration INTERVAL. */
+ protected static final String PARAM_SIZE = "SIZE";
+
+ /** The optional align offset for each window. */
+ protected static final String PARAM_OFFSET = "OFFSET";
+
+ /** The session key(s), only used for SESSION window. */
+ protected static final String PARAM_KEY = "KEY";
+
+ /** The slide interval, only used for HOP window. */
+ protected static final String PARAM_SLIDE = "SLIDE";
+
/**
* Type-inference strategy whereby the row type of a table function call is a
* ROW, which is combined from the row type of operand #0 (which is a TABLE)
@@ -48,16 +69,16 @@ public class SqlWindowTableFunction extends SqlFunction
SqlWindowTableFunction::inferRowType;
/** Creates a window table function with a given name. */
- public SqlWindowTableFunction(String name) {
- super(name, SqlKind.OTHER_FUNCTION, ReturnTypes.CURSOR, null, null,
- SqlFunctionCategory.SYSTEM);
+ public SqlWindowTableFunction(String name, SqlOperandTypeChecker operandTypeChecker) {
+ super(name, SqlKind.OTHER_FUNCTION, ReturnTypes.CURSOR, null,
+ operandTypeChecker, SqlFunctionCategory.SYSTEM);
}
@Override public SqlReturnTypeInference getRowTypeInference() {
return ARG0_TABLE_FUNCTION_WINDOWING;
}
- protected boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
+ protected static boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
boolean throwOnFailure) {
if (throwOnFailure) {
throw callBinding.newValidationSignatureError();
@@ -66,7 +87,53 @@ public class SqlWindowTableFunction extends SqlFunction
}
}
- protected void validateColumnNames(SqlValidator validator,
+ /**
+ * Validate the heading operands are in the form:
+ * (ROW, DESCRIPTOR, DESCRIPTOR ..., other params).
+ *
+ * @param callBinding The call binding
+ * @param descriptors The number of descriptors following the first operand (e.g. the table)
+ *
+ * @return true if validation passes
+ */
+ protected static boolean validateTableWithFollowingDescriptors(
+ SqlCallBinding callBinding, int descriptors) {
+ final SqlNode operand0 = callBinding.operand(0);
+ final SqlValidator validator = callBinding.getValidator();
+ final RelDataType type = validator.getValidatedNodeType(operand0);
+ if (type.getSqlTypeName() != SqlTypeName.ROW) {
+ return false;
+ }
+ for (int i = 1; i < descriptors + 1; i++) {
+ final SqlNode operand = callBinding.operand(i);
+ if (operand.getKind() != SqlKind.DESCRIPTOR) {
+ return false;
+ }
+ validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand).getOperandList());
+ }
+ return true;
+ }
+
+ /**
+ * Validate the operands starting from position {@code startPos} are all INTERVAL.
+ *
+ * @param callBinding The call binding
+ * @param startPos The start position to validate (starting index is 0)
+ *
+ * @return true if validation passes
+ */
+ protected static boolean validateTailingIntervals(SqlCallBinding callBinding, int startPos) {
+ final SqlValidator validator = callBinding.getValidator();
+ for (int i = startPos; i < callBinding.getOperandCount(); i++) {
+ final RelDataType type = validator.getValidatedNodeType(callBinding.operand(i));
+ if (!SqlTypeUtil.isInterval(type)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void validateColumnNames(SqlValidator validator,
List<String> fieldNames, List<SqlNode> columnNames) {
final SqlNameMatcher matcher = validator.getCatalogReader().nameMatcher();
for (SqlNode columnName : columnNames) {
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java
index f303eda..4d5e3f5 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java
@@ -47,4 +47,8 @@ class SqlArgumentAssignmentOperator extends SqlAsOperator {
writer.keyword(getName());
call.operand(0).unparse(writer, getRightPrec(), rightPrec);
}
+
+ @Override public boolean argumentMustBeScalar(int ordinal) {
+ return false;
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 19e0e21..9123f92 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -3318,7 +3318,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
* @param clause Name of clause: "WHERE", "GROUP BY", "ON"
*/
private void validateNoAggs(AggFinder aggFinder, SqlNode node,
- String clause) {
+ String clause) {
final SqlCall agg = aggFinder.findAgg(node);
if (agg == null) {
return;
@@ -3500,7 +3500,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
}
private void checkRollUp(SqlNode grandParent, SqlNode parent,
- SqlNode current, SqlValidatorScope scope, String optionalClause) {
+ SqlNode current, SqlValidatorScope scope, String optionalClause) {
current = stripAs(current);
if (current instanceof SqlCall && !(current instanceof SqlSelect)) {
// Validate OVER separately
@@ -3525,7 +3525,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
}
private void checkRollUp(SqlNode grandParent, SqlNode parent,
- SqlNode current, SqlValidatorScope scope) {
+ SqlNode current, SqlValidatorScope scope) {
checkRollUp(grandParent, parent, current, scope, null);
}
@@ -3568,7 +3568,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
// Returns true iff the given column is valid inside the given aggCall.
private boolean isRolledUpColumnAllowedInAgg(SqlIdentifier identifier, SqlValidatorScope scope,
- SqlCall aggCall, SqlNode parent) {
+ SqlCall aggCall, SqlNode parent) {
Pair<String, String> pair = findTableColumnPair(identifier, scope);
if (pair == null) {
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 3a42da3..5a91208 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -77,6 +77,7 @@ ColumnNotFoundDidYouMean=Column ''{0}'' not found in any table; did you mean ''{
ColumnNotFoundInTable=Column ''{0}'' not found in table ''{1}''
ColumnNotFoundInTableDidYouMean=Column ''{0}'' not found in table ''{1}''; did you mean ''{2}''?
ColumnAmbiguous=Column ''{0}'' is ambiguous
+ParamNotFoundInFunctionDidYouMean = Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''?
NeedQueryOp=Operand {0} must be a query
NeedSameTypeParameter=Parameters must be of the same type
CanNotApplyOp2Type=Cannot apply ''{0}'' to arguments of type {1}. Supported form(s): {2}
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 4100557..44124a6 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -1829,6 +1829,26 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
sql(sql).ok();
}
+ @Test void testTableFunctionTumbleWithParamNames() {
+ final String sql = "select *\n"
+ + "from table(\n"
+ + "tumble(\n"
+ + " DATA => table Shipments,\n"
+ + " TIMECOL => descriptor(rowtime),\n"
+ + " SIZE => INTERVAL '1' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test void testTableFunctionTumbleWithParamReordered() {
+ final String sql = "select *\n"
+ + "from table(\n"
+ + "tumble(\n"
+ + " DATA => table Shipments,\n"
+ + " SIZE => INTERVAL '1' MINUTE,\n"
+ + " TIMECOL => descriptor(rowtime)))";
+ sql(sql).ok();
+ }
+
@Test void testTableFunctionTumbleWithInnerJoin() {
final String sql = "select *\n"
+ "from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE)) a\n"
@@ -1858,6 +1878,28 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
sql(sql).ok();
}
+ @Test void testTableFunctionHopWithParamNames() {
+ final String sql = "select *\n"
+ + "from table(\n"
+ + "hop(\n"
+ + " DATA => table Shipments,\n"
+ + " TIMECOL => descriptor(rowtime),\n"
+ + " SLIDE => INTERVAL '1' MINUTE,\n"
+ + " SIZE => INTERVAL '2' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test void testTableFunctionHopWithParamReordered() {
+ final String sql = "select *\n"
+ + "from table(\n"
+ + "hop(\n"
+ + " DATA => table Shipments,\n"
+ + " SLIDE => INTERVAL '1' MINUTE,\n"
+ + " TIMECOL => descriptor(rowtime),\n"
+ + " SIZE => INTERVAL '2' MINUTE))";
+ sql(sql).ok();
+ }
+
@Test void testTableFunctionSession() {
final String sql = "select *\n"
+ "from table(session(table Shipments, descriptor(rowtime), "
@@ -1865,6 +1907,28 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
sql(sql).ok();
}
+ @Test void testTableFunctionSessionWithParamNames() {
+ final String sql = "select *\n"
+ + "from table(\n"
+ + "session(\n"
+ + " DATA => table Shipments,\n"
+ + " TIMECOL => descriptor(rowtime),\n"
+ + " KEY => descriptor(orderId),\n"
+ + " SIZE => INTERVAL '10' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test void testTableFunctionSessionWithParamReordered() {
+ final String sql = "select *\n"
+ + "from table(\n"
+ + "session(\n"
+ + " DATA => table Shipments,\n"
+ + " KEY => descriptor(orderId),\n"
+ + " TIMECOL => descriptor(rowtime),\n"
+ + " SIZE => INTERVAL '10' MINUTE))";
+ sql(sql).ok();
+ }
+
@Test void testTableFunctionTumbleWithSubQueryParam() {
final String sql = "select *\n"
+ "from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))";
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 32cc678..bb6a82f 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10287,13 +10287,41 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
}
@Test public void testTumbleTableFunction() {
- sql("select * from table(\n"
- + "^tumble(table orders, descriptor(rowtime))^)")
- .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
sql("select rowtime, productid, orderid, 'window_start', 'window_end' from table(\n"
+ "tumble(table orders, descriptor(rowtime), interval '2' hour))").ok();
sql("select rowtime, productid, orderid, 'window_start', 'window_end' from table(\n"
+ "tumble(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour))").ok();
+ // test named params.
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "tumble(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "size => interval '2' hour))").ok();
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "tumble(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "size => interval '2' hour,\n"
+ + "\"OFFSET\" => interval '1' hour))").ok();
+ // negative tests.
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "tumble(\n"
+ + "^\"data\"^ => table orders,\n"
+ + "TIMECOL => descriptor(rowtime),\n"
+ + "SIZE => interval '2' hour))")
+ .fails("Param 'data' not found in function 'TUMBLE'; did you mean 'DATA'\\?");
+ sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n"
+ + "from table(\n"
+ + "^tumble(\n"
+ + "data => table orders,\n"
+ + "SIZE => interval '2' hour)^)")
+ .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
+ sql("select * from table(\n"
+ + "^tumble(table orders, descriptor(rowtime))^)")
+ .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
sql("select * from table(\n"
+ "^tumble(table orders, descriptor(rowtime), 'test')^)")
.fails("Cannot apply 'TUMBLE' to arguments of type 'TUMBLE\\(<RECORDTYPE\\"
@@ -10326,6 +10354,34 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
sql("select * from table(\n"
+ "hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour, "
+ "interval '20' minute))").ok();
+ // test named params.
+ sql("select * from table(\n"
+ + "hop(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "slide => interval '2' hour,\n"
+ + "size => interval '1' hour))").ok();
+ sql("select * from table(\n"
+ + "hop(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "slide => interval '2' hour,\n"
+ + "size => interval '1' hour,\n"
+ + "\"OFFSET\" => interval '20' minute))").ok();
+ // negative tests.
+ sql("select * from table(\n"
+ + "hop(\n"
+ + "^\"data\"^ => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "slide => interval '2' hour,\n"
+ + "size => interval '1' hour))")
+ .fails("Param 'data' not found in function 'HOP'; did you mean 'DATA'\\?");
+ sql("select * from table(\n"
+ + "^hop(\n"
+ + "data => table orders,\n"
+ + "slide => interval '2' hour,\n"
+ + "size => interval '1' hour)^)")
+ .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments");
sql("select * from table(\n"
+ "^hop(table orders, descriptor(rowtime), interval '2' hour)^)")
.fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments");
@@ -10334,25 +10390,25 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
+ "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
- + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+ + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
sql("select * from table(\n"
+ "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
- + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+ + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
sql("select * from table(\n"
+ "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)")
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
- + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+ + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
sql("select * from table(\n"
+ "^hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour, 'test')^)")
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
+ "<INTERVAL HOUR>, <CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, "
- + "DESCRIPTOR\\(col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
+ + "DESCRIPTOR\\(timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)");
sql("select * from table(\n"
+ "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))")
.fails("Object 'TABLER_NOT_EXIST' not found");
@@ -10362,6 +10418,28 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
sql("select * from table(\n"
+ "session(table orders, descriptor(rowtime), descriptor(productid), interval '1' hour))")
.ok();
+ // test named params.
+ sql("select * from table(\n"
+ + "session(\n"
+ + "data => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "key => descriptor(productid),\n"
+ + "size => interval '1' hour))")
+ .ok();
+ // negative tests.
+ sql("select * from table(\n"
+ + "session(\n"
+ + "^\"data\"^ => table orders,\n"
+ + "timecol => descriptor(rowtime),\n"
+ + "key => descriptor(productid),\n"
+ + "size => interval '1' hour))")
+ .fails("Param 'data' not found in function 'SESSION'; did you mean 'DATA'\\?");
+ sql("select * from table(\n"
+ + "^session(\n"
+ + "data => table orders,\n"
+ + "key => descriptor(productid),\n"
+ + "size => interval '1' hour)^)")
+ .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments");
sql("select * from table(\n"
+ "^session(table orders, descriptor(rowtime), interval '2' hour)^)")
.fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments");
@@ -10370,19 +10448,19 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
.fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <COLUMN_LIST>, "
+ "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
- + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+ + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
sql("select * from table(\n"
+ "^session(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
.fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
- + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+ + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
sql("select * from table(\n"
+ "^session(table orders, 'test', descriptor(productid), interval '2' hour)^)")
.fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <COLUMN_LIST>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
- + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+ + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)");
sql("select * from table(\n"
+ "session(TABLE ^tabler_not_exist^, descriptor(rowtime), descriptor(productid), interval '1' hour))")
.fails("Object 'TABLER_NOT_EXIST' not found");
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 2758df7..69c57e2 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -5019,6 +5019,42 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTableFunctionTumbleWithParamNames">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(
+tumble(
+ DATA => table Shipments,
+ TIMECOL => descriptor(rowtime),
+ SIZE => INTERVAL '1' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTableFunctionTumbleWithParamReordered">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(
+tumble(
+ DATA => table Shipments,
+ SIZE => INTERVAL '1' MINUTE,
+ TIMECOL => descriptor(rowtime)))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableFunctionTumbleWithInnerJoin">
<Resource name="sql">
<![CDATA[select *
@@ -5067,6 +5103,44 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTableFunctionHopWithParamNames">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(
+hop(
+ DATA => table Shipments,
+ TIMECOL => descriptor(rowtime),
+ SLIDE => INTERVAL '1' MINUTE,
+ SIZE => INTERVAL '2' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTableFunctionHopWithParamReordered">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(
+hop(
+ DATA => table Shipments,
+ SLIDE => INTERVAL '1' MINUTE,
+ TIMECOL => descriptor(rowtime),
+ SIZE => INTERVAL '2' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableFunctionHopWithOffset">
<Resource name="sql">
<![CDATA[select *
@@ -5095,6 +5169,44 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTableFunctionSessionWithParamNames">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(
+session(
+ DATA => table Shipments,
+ TIMECOL => descriptor(rowtime),
+ KEY => descriptor(orderId),
+ SIZE => INTERVAL '10' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTableFunctionSessionWithParamReordered">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(
+session(
+ DATA => table Shipments,
+ KEY => descriptor(orderId),
+ TIMECOL => descriptor(rowtime),
+ SIZE => INTERVAL '10' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableFunctionTumbleWithSubQueryParam">
<Resource name="sql">
<![CDATA[select *
diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq
index 724eabd..365994e 100644
--- a/core/src/test/resources/sql/stream.iq
+++ b/core/src/test/resources/sql/stream.iq
@@ -17,6 +17,24 @@
#
!use orinoco
!set outputformat mysql
+SELECT * FROM TABLE(
+ TUMBLE(
+ DATA => TABLE ORDERS,
+ TIMECOL => DESCRIPTOR(ROWTIME),
+ SIZE => INTERVAL '1' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:16:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:24:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:24:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 10:59:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:10:00 | 2015-02-15 11:11:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
+
SELECT * FROM TABLE(TUMBLE(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '1' MINUTE));
+---------------------+----+---------+-------+---------------------+---------------------+
| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
@@ -78,6 +96,30 @@ SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE,
!ok
+SELECT * FROM TABLE(
+ HOP(
+ DATA => TABLE ORDERS,
+ TIMECOL => DESCRIPTOR(ROWTIME),
+ SLIDE => INTERVAL '5' MINUTE,
+ SIZE => INTERVAL '10' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 |
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(10 rows)
+
+!ok
+
SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE, INTERVAL '2' MINUTE));
+---------------------+----+---------+-------+---------------------+---------------------+
| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
@@ -130,6 +172,25 @@ SELECT * FROM TABLE(SESSION(TABLE ORDERS, DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUC
!ok
+SELECT * FROM TABLE(
+ SESSION(
+ DATA => TABLE ORDERS,
+ TIMECOL => DESCRIPTOR(ROWTIME),
+ KEY => DESCRIPTOR(PRODUCT),
+ SIZE => INTERVAL '20' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
+
SELECT * FROM TABLE(SESSION((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE));
+---------------------+----+---------+-------+---------------------+---------------------+
| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index 3bf384f..d5928aa 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -1876,10 +1876,24 @@ is named as "fixed windowing".
| Operator syntax | Description
|:-------------------- |:-----------
-| TUMBLE(table, DESCRIPTOR(datetime), interval [, offset ]) | Indicates a tumbling window of *interval* for *datetime*, optionally aligned at offset.
+| TUMBLE(data, DESCRIPTOR(timecol), size [, offset ]) | Indicates a tumbling window of *size* interval for *timecol*, optionally aligned at *offset*.
Here is an example:
-`SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))`,
+```SQL
+SELECT * FROM TABLE(
+ TUMBLE(
+ TABLE orders,
+ DESCRIPTOR(rowtime),
+ INTERVAL '1' MINUTE));
+
+-- or with the named params
+-- note: the DATA param must be the first argument
+SELECT * FROM TABLE(
+ TUMBLE(
+ DATA => TABLE orders,
+ TIMECOL => DESCRIPTOR(rowtime),
+ SIZE => INTERVAL '1' MINUTE));
+```
will apply tumbling with 1 minute window size on rows from table orders. rowtime is the
watermarked column of table orders that tells data completeness.
@@ -1890,10 +1904,26 @@ on a timestamp column. Windows assigned could have overlapping so hopping someti
| Operator syntax | Description
|:-------------------- |:-----------
-| HOP(table, DESCRIPTOR(datetime), slide, size [, offset ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide* and optionally aligned at offset.
+| HOP(data, DESCRIPTOR(timecol), slide, size [, offset ]) | Indicates a hopping window for *timecol*, covering rows within the interval of *size*, shifting every *slide* and optionally aligned at *offset*.
Here is an example:
-`SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`,
+```SQL
+SELECT * FROM TABLE(
+ HOP(
+ TABLE orders,
+ DESCRIPTOR(rowtime),
+ INTERVAL '2' MINUTE,
+ INTERVAL '5' MINUTE));
+
+-- or with the named params
+-- note: the DATA param must be the first argument
+SELECT * FROM TABLE(
+ HOP(
+ DATA => TABLE orders,
+ TIMECOL => DESCRIPTOR(rowtime),
+ SLIDE => INTERVAL '2' MINUTE,
+ SIZE => INTERVAL '5' MINUTE));
+```
will apply hopping with 5-minute interval size on rows from table orders and shifting every 2 minutes. rowtime is the
watermarked column of table orders that tells data completeness.
@@ -1904,13 +1934,34 @@ of rows are less than *interval*. Session window is applied per *key*.
| Operator syntax | Description
|:-------------------- |:-----------
-| session(table, DESCRIPTOR(datetime), DESCRIPTOR(key), interval) | Indicates a session window of *interval* for *datetime*. Session window is applied per *key*.
+| session(data, DESCRIPTOR(timecol), DESCRIPTOR(key), size) | Indicates a session window of *size* interval for *timecol*. Session window is applied per *key*.
Here is an example:
-`SELECT * FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(rowtime), DESCRIPTOR(product), INTERVAL '20' MINUTE))`,
+```SQL
+SELECT * FROM TABLE(
+ SESSION(
+ TABLE orders,
+ DESCRIPTOR(rowtime),
+ DESCRIPTOR(product),
+ INTERVAL '20' MINUTE));
+
+-- or with the named params
+-- note: the DATA param must be the first argument
+SELECT * FROM TABLE(
+ SESSION(
+ DATA => TABLE orders,
+ TIMECOL => DESCRIPTOR(rowtime),
+ KEY => DESCRIPTOR(product),
+ SIZE => INTERVAL '20' MINUTE));
+```
will apply session with 20-minute inactive gap on rows from table orders. rowtime is the
watermarked column of table orders that tells data completeness. Session is applied per product.
+**Note**: The `Tumble`, `Hop` and `Session` window table functions assign
+each row in the original table to a window. The output table has all
+the same columns as the original table plus two additional columns `window_start`
+and `window_end`, which represent the start and end of the window interval, respectively.
+
### Grouped window functions
**warning**: grouped window functions are deprecated.