You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by fe...@apache.org on 2020/05/09 12:23:55 UTC
[calcite] 01/02: [CALCITE-3737] HOP Table Function (Rui Wang)
This is an automated email from the ASF dual-hosted git repository.
fengzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
commit 40e588de5f999034e5030b12cdbc90f4073808fe
Author: amaliujia <am...@163.com>
AuthorDate: Mon Dec 23 23:48:25 2019 -0800
[CALCITE-3737] HOP Table Function (Rui Wang)
---
core/src/main/codegen/config.fmpp | 1 +
core/src/main/codegen/templates/Parser.jj | 16 +++-
.../calcite/adapter/enumerable/EnumUtils.java | 85 +++++++++++++++++++++-
.../enumerable/EnumerableTableFunctionScan.java | 6 +-
.../calcite/adapter/enumerable/RexImpTable.java | 36 +++++++--
.../adapter/enumerable/RexToLixTranslator.java | 8 +-
...ntor.java => TableFunctionCallImplementor.java} | 2 +-
.../apache/calcite/sql/SqlHopTableFunction.java | 71 ++++++++++++++++++
.../apache/calcite/sql/SqlTumbleTableFunction.java | 65 +++++++++++++++++
.../apache/calcite/sql/SqlWindowTableFunction.java | 49 +++----------
.../calcite/sql/fun/SqlStdOperatorTable.java | 10 ++-
.../org/apache/calcite/util/BuiltInMethod.java | 5 +-
.../org/apache/calcite/test/SqlValidatorTest.java | 35 ++++++++-
.../apache/calcite/test/SqlToRelConverterTest.xml | 6 +-
core/src/test/resources/sql/stream.iq | 38 ++++++++++
site/_docs/reference.md | 17 +++++
16 files changed, 384 insertions(+), 66 deletions(-)
diff --git a/core/src/main/codegen/config.fmpp b/core/src/main/codegen/config.fmpp
index eb9adff..ecf6dcf 100644
--- a/core/src/main/codegen/config.fmpp
+++ b/core/src/main/codegen/config.fmpp
@@ -145,6 +145,7 @@ data: {
"GOTO"
"GRANTED"
"HIERARCHY"
+ "HOP"
"HOURS"
"IGNORE"
"IMMEDIATE"
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index 3ea3a5a..b0c1395 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -6052,10 +6052,17 @@ SqlCall GroupByWindowingCall():
final List<SqlNode> args;
}
{
- <TUMBLE> { s = span(); }
- args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
- return SqlStdOperatorTable.TUMBLE.createCall(s.end(this), args);
- }
+ (
+ <TUMBLE> { s = span(); }
+ args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+ return SqlStdOperatorTable.TUMBLE.createCall(s.end(this), args);
+ }
+ |
+ <HOP> { s = span(); }
+ args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+ return SqlStdOperatorTable.HOP.createCall(s.end(this), args);
+ }
+ )
}
SqlCall MatchRecognizeFunctionCall() :
@@ -6956,6 +6963,7 @@ SqlPostfixOperator PostfixRowOperator() :
| < HAVING: "HAVING" >
| < HIERARCHY: "HIERARCHY" >
| < HOLD: "HOLD" >
+| < HOP: "HOP" >
| < HOUR: "HOUR" >
| < HOURS: "HOURS" >
| < IDENTITY: "IDENTITY" >
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index 6fca91d..b50a832 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -17,6 +17,9 @@
package org.apache.calcite.adapter.enumerable;
import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.JoinType;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.linq4j.function.Function1;
@@ -56,6 +59,7 @@ import java.math.BigDecimal;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
/**
@@ -746,7 +750,7 @@ public class EnumUtils {
/** Generates a window selector which appends attribute of the window based on
* the parameters. */
- static Expression windowSelector(
+ static Expression tumblingWindowSelector(
PhysType inputPhysType,
PhysType outputPhysType,
Expression wmColExpr,
@@ -796,4 +800,83 @@ public class EnumUtils {
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+ * Creates an enumerable implementation that applies hopping to each element of the input
+ * enumerator and produces at least one element for each input element.
+ */
+ public static Enumerable<Object[]> hopping(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override public Enumerator<Object[]> enumerator() {
+ return new HopEnumerator(inputEnumerator,
+ indexOfWatermarkedColumn, emitFrequency, intervalSize);
+ }
+ };
+ }
+
+ private static class HopEnumerator implements Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final long emitFrequency;
+ private final long intervalSize;
+ private LinkedList<Object[]> list;
+
+ HopEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.emitFrequency = emitFrequency;
+ this.intervalSize = intervalSize;
+ list = new LinkedList<>();
+ }
+
+ public Object[] current() {
+ if (list.size() > 0) {
+ return takeOne();
+ } else {
+ Object[] current = inputEnumerator.current();
+ List<Pair> windows = hopWindows(SqlFunctions.toLong(current[indexOfWatermarkedColumn]),
+ emitFrequency, intervalSize);
+ for (Pair window : windows) {
+ Object[] curWithWindow = new Object[current.length + 2];
+ System.arraycopy(current, 0, curWithWindow, 0, current.length);
+ curWithWindow[current.length] = window.left;
+ curWithWindow[current.length + 1] = window.right;
+ list.offer(curWithWindow);
+ }
+ return takeOne();
+ }
+ }
+
+ public boolean moveNext() {
+ if (list.size() > 0) {
+ return true;
+ }
+ return inputEnumerator.moveNext();
+ }
+
+ public void reset() {
+ inputEnumerator.reset();
+ list.clear();
+ }
+
+ public void close() {
+ }
+
+ private Object[] takeOne() {
+ return list.pollFirst();
+ }
+ }
+
+ private static List<Pair> hopWindows(long tsMillis, long periodMillis, long sizeMillis) {
+ ArrayList<Pair> ret = new ArrayList<>(Math.toIntExact(sizeMillis / periodMillis));
+ long lastStart = tsMillis - ((tsMillis + periodMillis) % periodMillis);
+ for (long start = lastStart;
+ start > tsMillis - sizeMillis;
+ start -= periodMillis) {
+ ret.add(new Pair(start, start + sizeMillis));
+ }
+ return ret;
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java
index 677b1dd..bd3c7ff 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableTableFunctionScan.java
@@ -69,7 +69,7 @@ public class EnumerableTableFunctionScan extends TableFunctionScan
if (isImplementorDefined((RexCall) getCall())) {
return tvfImplementorBasedImplement(implementor, pref);
} else {
- return defaultTableValuedFunctionImplement(implementor, pref);
+ return defaultTableFunctionImplement(implementor, pref);
}
}
@@ -100,7 +100,7 @@ public class EnumerableTableFunctionScan extends TableFunctionScan
return QueryableTable.class.isAssignableFrom(method.getReturnType());
}
- private Result defaultTableValuedFunctionImplement(
+ private Result defaultTableFunctionImplement(
EnumerableRelImplementor implementor, Prefer pref) {
BlockBuilder bb = new BlockBuilder();
// Non-array user-specified types are not supported yet
@@ -142,7 +142,7 @@ public class EnumerableTableFunctionScan extends TableFunctionScan
SqlConformanceEnum.DEFAULT);
builder.add(
- RexToLixTranslator.translateTableValuedFunction(
+ RexToLixTranslator.translateTableFunction(
typeFactory,
conformance,
builder,
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index e8c5995..77a366f 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -184,6 +184,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING_ID;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.HOP_TVF;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INITCAP;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INTERSECTION;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_A_SET;
@@ -312,7 +313,7 @@ public class RexImpTable {
new HashMap<>();
private final Map<SqlMatchFunction, Supplier<? extends MatchImplementor>> matchMap =
new HashMap<>();
- private final Map<SqlOperator, Supplier<? extends TableValuedFunctionCallImplementor>>
+ private final Map<SqlOperator, Supplier<? extends TableFunctionCallImplementor>>
tvfImplementorMap = new HashMap<>();
RexImpTable() {
@@ -673,6 +674,7 @@ public class RexImpTable {
matchMap.put(LAST, LastImplementor::new);
map.put(PREV, new PrevImplementor());
tvfImplementorMap.put(TUMBLE_TVF, TumbleImplementor::new);
+ tvfImplementorMap.put(HOP_TVF, HopImplementor::new);
}
private <T> Supplier<T> constructorSupplier(Class<T> klass) {
@@ -969,8 +971,8 @@ public class RexImpTable {
}
}
- public TableValuedFunctionCallImplementor get(final SqlWindowTableFunction operator) {
- final Supplier<? extends TableValuedFunctionCallImplementor> supplier =
+ public TableFunctionCallImplementor get(final SqlWindowTableFunction operator) {
+ final Supplier<? extends TableFunctionCallImplementor> supplier =
tvfImplementorMap.get(operator);
if (supplier != null) {
return supplier.get();
@@ -3205,7 +3207,7 @@ public class RexImpTable {
}
/** Implements tumbling. */
- private static class TumbleImplementor implements TableValuedFunctionCallImplementor {
+ private static class TumbleImplementor implements TableFunctionCallImplementor {
@Override public Expression implement(RexToLixTranslator translator,
Expression inputEnumerable,
RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
@@ -3225,11 +3227,35 @@ public class RexImpTable {
return Expressions.call(
BuiltInMethod.TUMBLING.method,
inputEnumerable,
- EnumUtils.windowSelector(
+ EnumUtils.tumblingWindowSelector(
inputPhysType,
outputPhysType,
translatedOperands.get(0),
translatedOperands.get(1)));
}
}
+
+ /** Implements hopping. */
+ private static class HopImplementor implements TableFunctionCallImplementor {
+ @Override public Expression implement(RexToLixTranslator translator,
+ Expression inputEnumerable, RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
+ Expression intervalExpression = translator.translate(call.getOperands().get(2));
+ Expression intervalExpression2 = translator.translate(call.getOperands().get(3));
+ RexCall descriptor = (RexCall) call.getOperands().get(1);
+ List<Expression> translatedOperands = new ArrayList<>();
+ Expression wmColIndexExpr =
+ Expressions.constant(((RexInputRef) descriptor.getOperands().get(0)).getIndex());
+ translatedOperands.add(wmColIndexExpr);
+ translatedOperands.add(intervalExpression);
+ translatedOperands.add(intervalExpression2);
+
+ return Expressions.call(
+ BuiltInMethod.HOPPING.method,
+ Expressions.list(
+ Expressions.call(inputEnumerable, BuiltInMethod.ENUMERABLE_ENUMERATOR.method),
+ wmColIndexExpr,
+ intervalExpression,
+ intervalExpression2));
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
index 7bb3123..934f982 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
@@ -168,14 +168,14 @@ public class RexToLixTranslator {
.translateList(program.getProjectList(), storageTypes);
}
- public static Expression translateTableValuedFunction(JavaTypeFactory typeFactory,
+ public static Expression translateTableFunction(JavaTypeFactory typeFactory,
SqlConformance conformance, BlockBuilder blockBuilder,
Expression root, RexCall rexCall, Expression inputEnumerable,
PhysType inputPhysType, PhysType outputPhysType) {
return new RexToLixTranslator(null, typeFactory, root, null,
blockBuilder, Collections.emptyMap(), new RexBuilder(typeFactory), conformance,
null, null)
- .translateTableValuedFunction(rexCall, inputEnumerable, inputPhysType, outputPhysType);
+ .translateTableFunction(rexCall, inputEnumerable, inputPhysType, outputPhysType);
}
/** Creates a translator for translating aggregate functions. */
@@ -946,10 +946,10 @@ public class RexToLixTranslator {
return list;
}
- private Expression translateTableValuedFunction(RexCall rexCall, Expression inputEnumerable,
+ private Expression translateTableFunction(RexCall rexCall, Expression inputEnumerable,
PhysType inputPhysType, PhysType outputPhysType) {
assert rexCall.getOperator() instanceof SqlWindowTableFunction;
- TableValuedFunctionCallImplementor implementor =
+ TableFunctionCallImplementor implementor =
RexImpTable.INSTANCE.get((SqlWindowTableFunction) rexCall.getOperator());
if (implementor == null) {
throw Util.needToImplement("implementor of " + rexCall.getOperator().getName());
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/TableValuedFunctionCallImplementor.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/TableFunctionCallImplementor.java
similarity index 96%
rename from core/src/main/java/org/apache/calcite/adapter/enumerable/TableValuedFunctionCallImplementor.java
rename to core/src/main/java/org/apache/calcite/adapter/enumerable/TableFunctionCallImplementor.java
index 5a2d6a4..5492f06 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/TableValuedFunctionCallImplementor.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/TableFunctionCallImplementor.java
@@ -24,7 +24,7 @@ import org.apache.calcite.rex.RexCall;
* Implements a table-valued function call.
*/
@Experimental
-public interface TableValuedFunctionCallImplementor {
+public interface TableFunctionCallImplementor {
/**
* Implements a table-valued function call.
*
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
new file mode 100644
index 0000000..d32b909
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+/**
+ * SqlHopTableFunction implements an operator for hopping. It takes four parameters:
+ * 1. a table.
+ * 2. a descriptor to provide a watermarked column name from the input table.
+ * 3. an interval parameter to specify how far the window shifts on each hop.
+ * 4. an interval parameter to specify the window size.
+ */
+public class SqlHopTableFunction extends SqlWindowTableFunction {
+ public SqlHopTableFunction() {
+ super(SqlKind.HOP.name());
+ }
+
+ @Override public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(4);
+ }
+
+ @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
+ boolean throwOnFailure) {
+ // There should only be four operands, and the number of operands is checked before
+ // this call.
+ final SqlNode operand0 = callBinding.operand(0);
+ final SqlValidator validator = callBinding.getValidator();
+ final RelDataType type = validator.getValidatedNodeType(operand0);
+ if (type.getSqlTypeName() != SqlTypeName.ROW) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ final SqlNode operand1 = callBinding.operand(1);
+ if (operand1.getKind() != SqlKind.DESCRIPTOR) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
+ final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
+ if (!SqlTypeUtil.isInterval(type2)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
+ if (!SqlTypeUtil.isInterval(type3)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ return true;
+ }
+
+ @Override public String getAllowedSignatures(String opNameToUse) {
+ return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), "
+ + "datetime interval, datetime interval)";
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
new file mode 100644
index 0000000..e3a5001
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+/**
+ * SqlTumbleTableFunction implements an operator for tumbling. It takes three parameters:
+ * 1. a table.
+ * 2. a descriptor to provide a watermarked column name from the input table.
+ * 3. an interval parameter to specify the window size.
+ */
+public class SqlTumbleTableFunction extends SqlWindowTableFunction {
+ public SqlTumbleTableFunction() {
+ super(SqlKind.TUMBLE.name());
+ }
+
+ @Override public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(3);
+ }
+
+ @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
+ boolean throwOnFailure) {
+ // There should only be three operands, and the number of operands is checked before
+ // this call.
+ final SqlNode operand0 = callBinding.operand(0);
+ final SqlValidator validator = callBinding.getValidator();
+ final RelDataType type = validator.getValidatedNodeType(operand0);
+ if (type.getSqlTypeName() != SqlTypeName.ROW) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ final SqlNode operand1 = callBinding.operand(1);
+ if (operand1.getKind() != SqlKind.DESCRIPTOR) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
+ final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
+ if (!SqlTypeUtil.isInterval(type2)) {
+ return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ }
+ return true;
+ }
+
+ @Override public String getAllowedSignatures(String opNameToUse) {
+ return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval)";
+ }
+}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
index f72a260..e86551b 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java
@@ -20,10 +20,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.SqlValidator;
import java.util.ArrayList;
@@ -44,28 +42,21 @@ public class SqlWindowTableFunction extends SqlFunction {
SqlFunctionCategory.SYSTEM);
}
- @Override public SqlOperandCountRange getOperandCountRange() {
- return SqlOperandCountRanges.of(3);
- }
-
- @Override public boolean checkOperandTypes(SqlCallBinding callBinding,
+ protected boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
boolean throwOnFailure) {
- // There should only be three operands, and number of operands are checked before
- // this call.
- final SqlNode operand0 = callBinding.operand(0);
- final SqlValidator validator = callBinding.getValidator();
- final RelDataType type = validator.getValidatedNodeType(operand0);
- if (type.getSqlTypeName() != SqlTypeName.ROW) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
- }
- final SqlNode operand1 = callBinding.operand(1);
- if (operand1.getKind() != SqlKind.DESCRIPTOR) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
+ if (throwOnFailure) {
+ throw callBinding.newValidationSignatureError();
+ } else {
+ return false;
}
- for (SqlNode descOperand: ((SqlCall) operand1).getOperandList()) {
+ }
+
+ protected void validateColumnNames(SqlValidator validator,
+ List<String> fieldNames, List<SqlNode> unvalidatedColumnNames) {
+ for (SqlNode descOperand: unvalidatedColumnNames) {
final String colName = ((SqlIdentifier) descOperand).getSimple();
boolean matches = false;
- for (String field : type.getFieldNames()) {
+ for (String field : fieldNames) {
if (validator.getCatalogReader().nameMatcher().matches(field, colName)) {
matches = true;
break;
@@ -76,24 +67,6 @@ public class SqlWindowTableFunction extends SqlFunction {
RESOURCE.unknownIdentifier(colName));
}
}
- final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
- if (!SqlTypeUtil.isInterval(type2)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
- }
- return true;
- }
-
- private boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding,
- boolean throwOnFailure) {
- if (throwOnFailure) {
- throw callBinding.newValidationSignatureError();
- } else {
- return false;
- }
- }
-
- @Override public String getAllowedSignatures(String opNameToUse) {
- return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval)";
}
/**
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 23fc9c3..526df03 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -27,6 +27,7 @@ import org.apache.calcite.sql.SqlFilterOperator;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlGroupedWindowFunction;
+import org.apache.calcite.sql.SqlHopTableFunction;
import org.apache.calcite.sql.SqlInternalOperator;
import org.apache.calcite.sql.SqlJsonConstructorNullClause;
import org.apache.calcite.sql.SqlKind;
@@ -47,11 +48,11 @@ import org.apache.calcite.sql.SqlSampleSpec;
import org.apache.calcite.sql.SqlSetOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlTumbleTableFunction;
import org.apache.calcite.sql.SqlUnnestOperator;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlValuesOperator;
import org.apache.calcite.sql.SqlWindow;
-import org.apache.calcite.sql.SqlWindowTableFunction;
import org.apache.calcite.sql.SqlWithinGroupOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.type.InferTypes;
@@ -2294,7 +2295,10 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlOperator DESCRIPTOR = new SqlDescriptorOperator();
/** TUMBLE as a table-value function. */
- public static final SqlFunction TUMBLE_TVF = new SqlWindowTableFunction(SqlKind.TUMBLE.name());
+ public static final SqlFunction TUMBLE_TVF = new SqlTumbleTableFunction();
+
+ /** HOP as a table-value function. */
+ public static final SqlFunction HOP_TVF = new SqlHopTableFunction();
/** The {@code TUMBLE} group function.
*
@@ -2332,7 +2336,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
/** The {@code HOP} group function. */
public static final SqlGroupedWindowFunction HOP =
- new SqlGroupedWindowFunction(SqlKind.HOP.name(), SqlKind.HOP, null,
+ new SqlGroupedWindowFunction("$HOP", SqlKind.HOP, null,
ReturnTypes.ARG0, null,
OperandTypes.or(OperandTypes.DATETIME_INTERVAL_INTERVAL,
OperandTypes.DATETIME_INTERVAL_INTERVAL_TIME),
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 8bfdee5..fc60dcc 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.AggregateLambdaFactory;
import org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory;
import org.apache.calcite.adapter.enumerable.BasicLazyAccumulator;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
import org.apache.calcite.adapter.enumerable.LazyAggregateLambdaFactory;
import org.apache.calcite.adapter.enumerable.MatchUtils;
import org.apache.calcite.adapter.enumerable.SourceSorter;
@@ -591,7 +592,9 @@ public enum BuiltInMethod {
"resultSelector", Function2.class),
AGG_LAMBDA_FACTORY_ACC_SINGLE_GROUP_RESULT_SELECTOR(AggregateLambdaFactory.class,
"singleGroupResultSelector", Function1.class),
- TUMBLING(EnumerableDefaults.class, "tumbling", Enumerable.class, Function1.class);
+ TUMBLING(EnumerableDefaults.class, "tumbling", Enumerable.class, Function1.class),
+ HOPPING(EnumUtils.class, "hopping", Enumerator.class, int.class, long.class,
+ long.class);
public final Method method;
public final Constructor constructor;
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index f361759..241898e 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10214,7 +10214,7 @@ class SqlValidatorTest extends SqlValidatorTestCase {
.fails("Unknown identifier 'COLUMN_NOT_EXIST'");
}
- @Test void testTumbleTableValuedFunction() {
+ @Test public void testTumbleTableFunction() {
sql("select * from table(\n"
+ "^tumble(table orders, descriptor(rowtime), interval '2' hour, 'test')^)")
.fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments");
@@ -10237,7 +10237,36 @@ class SqlValidatorTest extends SqlValidatorTestCase {
.fails("Object 'TABLER_NOT_EXIST' not found");
}
- @Test void testStreamTumble() {
+ @Test public void testHopTableFunction() {
+ sql("select * from table(\n"
+ + "hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour))").ok();
+ sql("select * from table(\n"
+ + "^hop(table orders, descriptor(rowtime), interval '2' hour)^)")
+ .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments");
+ sql("select * from table(\n"
+ + "^hop(table orders, descriptor(rowtime), interval '2' hour, 'test')^)")
+ .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
+ + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
+ + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+ sql("select * from table(\n"
+ + "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
+ .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+ + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
+ + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+ sql("select * from table(\n"
+ + "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)")
+ .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, "
+ + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
+ + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+ sql("select * from table(\n"
+ + "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))")
+ .fails("Object 'TABLER_NOT_EXIST' not found");
+ }
+
+ @Test public void testStreamTumble() {
// TUMBLE
sql("select stream tumble_end(rowtime, interval '2' hour) as rowtime\n"
+ "from orders\n"
@@ -10297,7 +10326,7 @@ class SqlValidatorTest extends SqlValidatorTestCase {
+ "from orders\n"
+ "group by hop(rowtime, interval '1' hour, interval '3' hour)")
.fails("Call to auxiliary group function 'HOP_START' must have "
- + "matching call to group function 'HOP' in GROUP BY clause");
+ + "matching call to group function '\\$HOP' in GROUP BY clause");
// HOP with align
sql("select stream\n"
+ " hop_start(rowtime, interval '1' hour, interval '3' hour,\n"
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 880eefe..c19e32a 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -3537,7 +3537,7 @@ group by hop(rowtime, interval '1' hour, interval '3' hour)]]>
LogicalDelta
LogicalProject(ROWTIME=[$0], C=[$1])
LogicalAggregate(group=[{0}], C=[COUNT()])
- LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 10800000:INTERVAL HOUR)])
+ LogicalProject($f0=[$HOP($0, 3600000:INTERVAL HOUR, 10800000:INTERVAL HOUR)])
LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
]]>
</Resource>
@@ -4917,7 +4917,7 @@ LogicalDelta
]]>
</Resource>
</TestCase>
- <TestCase name="testTableValuedFunctionTumble">
+ <TestCase name="testTableFunctionTumble">
<Resource name="sql">
<![CDATA[select *
from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE))]]>
@@ -4931,7 +4931,7 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
]]>
</Resource>
</TestCase>
- <TestCase name="testTableValuedFunctionTumbleWithSubQueryParam">
+ <TestCase name="testTableFunctionTumbleWithSubQueryParam">
<Resource name="sql">
<![CDATA[select *
from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))]]>
diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq
index 18ac66e..15f5832 100644
--- a/core/src/test/resources/sql/stream.iq
+++ b/core/src/test/resources/sql/stream.iq
@@ -44,3 +44,41 @@ SELECT * FROM TABLE(TUMBLE((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), INTERVAL
(5 rows)
!ok
+
+SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 |
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(10 rows)
+
+!ok
+
+SELECT * FROM TABLE(HOP((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 |
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(10 rows)
+
+!ok
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index a223aa7..f8aa8ec 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -580,6 +580,7 @@ GRANTED,
**HAVING**,
HIERARCHY,
**HOLD**,
+HOP,
**HOUR**,
HOURS,
**IDENTITY**,
@@ -1882,7 +1883,23 @@ Here is an example:
will apply tumbling with 1 minute window size on rows from table orders. rowtime is the
watermarked column of table orders that tells data completeness.
+#### HOP
+In streaming queries, HOP assigns windows that cover rows within the interval of *size*, shifting every *slide*,
+and optionally aligned at *time* based on a timestamp column. The assigned windows can overlap, so hopping
+is sometimes called "sliding windowing".
+
+
+| Operator syntax | Description
+|:-------------------- |:-----------
+| HOP(table, DESCRIPTOR(column_name), slide, size [, time ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide*, and optionally aligned at *time*. Hopping is applied on the table, which has a watermarked column specified by the descriptor.
+
+Here is an example:
+`SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`,
+will apply hopping with a window size of 5 minutes on rows from table orders, shifting every 2 minutes. `rowtime` is the
+watermarked column of table orders that indicates data completeness.
+
### Grouped window functions
+**warning**: grouped window functions are deprecated.
Grouped window functions occur in the `GROUP BY` clause and define a key value
that represents a window containing several rows.