You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by fe...@apache.org on 2020/05/09 12:23:56 UTC
[calcite] 02/02: [CALCITE-3780] SESSION Table function (Rui Wang)
This is an automated email from the ASF dual-hosted git repository.
fengzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
commit 890eb61efcccc486e2192110cefe4cac5aa6f150
Author: amaliujia <am...@163.com>
AuthorDate: Sat Feb 8 22:35:35 2020 -0800
[CALCITE-3780] SESSION Table function (Rui Wang)
---
core/src/main/codegen/templates/Parser.jj | 24 ++-
.../calcite/adapter/enumerable/EnumUtils.java | 201 ++++++++++++++++++++-
.../calcite/adapter/enumerable/RexImpTable.java | 53 ++++--
.../apache/calcite/sql/SqlHopTableFunction.java | 4 +-
...eFunction.java => SqlSessionTableFunction.java} | 24 +--
.../calcite/sql/fun/SqlStdOperatorTable.java | 38 ++--
.../apache/calcite/sql2rel/AuxiliaryConverter.java | 2 +-
.../org/apache/calcite/util/BuiltInMethod.java | 4 +-
.../apache/calcite/test/SqlToRelConverterTest.java | 39 +++-
.../org/apache/calcite/test/SqlValidatorTest.java | 36 +++-
.../apache/calcite/test/SqlToRelConverterTest.xml | 72 +++++++-
core/src/test/resources/sql/stream.iq | 28 +++
.../apache/calcite/linq4j/EnumerableDefaults.java | 32 ----
site/_docs/reference.md | 29 ++-
14 files changed, 480 insertions(+), 106 deletions(-)
diff --git a/core/src/main/codegen/templates/Parser.jj b/core/src/main/codegen/templates/Parser.jj
index b0c1395..18c3242 100644
--- a/core/src/main/codegen/templates/Parser.jj
+++ b/core/src/main/codegen/templates/Parser.jj
@@ -6050,19 +6050,31 @@ SqlCall GroupByWindowingCall():
{
final Span s;
final List<SqlNode> args;
+ final SqlOperator op;
}
{
(
- <TUMBLE> { s = span(); }
- args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
- return SqlStdOperatorTable.TUMBLE.createCall(s.end(this), args);
+ <TUMBLE>
+ {
+ s = span();
+ op = SqlStdOperatorTable.TUMBLE_OLD;
+ }
+ |
+ <HOP>
+ {
+ s = span();
+ op = SqlStdOperatorTable.HOP_OLD;
}
|
- <HOP> { s = span(); }
- args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
- return SqlStdOperatorTable.HOP.createCall(s.end(this), args);
+ <SESSION>
+ {
+ s = span();
+ op = SqlStdOperatorTable.SESSION_OLD;
}
)
+ args = UnquantifiedFunctionParameterList(ExprContext.ACCEPT_SUB_QUERY) {
+ return op.createCall(s.end(this), args);
+ }
}
SqlCall MatchRecognizeFunctionCall() :
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index b50a832..77779a4 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -45,6 +45,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.runtime.SortedMultiMap;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Pair;
@@ -59,8 +60,10 @@ import java.math.BigDecimal;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
/**
* Utilities for generating programs in the Enumerable (functional)
@@ -748,8 +751,12 @@ public class EnumUtils {
return Expressions.lambda(Predicate2.class, builder.toBlock(), left_, right_);
}
- /** Generates a window selector which appends attribute of the window based on
- * the parameters. */
+ /**
+ * Generates a window selector which appends attribute of the window based on
+ * the parameters.
+ *
+ * Note that it only works for batch scenarios, i.e. all data is known and there is no late data.
+ */
static Expression tumblingWindowSelector(
PhysType inputPhysType,
PhysType outputPhysType,
@@ -802,6 +809,147 @@ public class EnumUtils {
}
/**
+ * Creates an enumerable implementation that applies sessionization to elements from the input
+ * enumerator based on a specified key. Elements are windowed into sessions separated by
+ * periods with no input for at least the duration specified by the gap parameter.
+ */
+ public static Enumerable<Object[]> sessionize(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ return new AbstractEnumerable<Object[]>() {
+ @Override public Enumerator<Object[]> enumerator() {
+ return new SessionizationEnumerator(inputEnumerator,
+ indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+ }
+ };
+ }
+
+ private static class SessionizationEnumerator implements Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final int indexOfKeyColumn;
+ private final long gap;
+ private LinkedList<Object[]> list;
+ private boolean initialized;
+
+ /**
+ * Note that it only works for batch scenarios, i.e. all data is known and there is no
+ * late data.
+ *
+ * @param inputEnumerator the enumerator to provide an array of objects as input
+ * @param indexOfWatermarkedColumn the index of timestamp column upon which a watermark is built
+ * @param indexOfKeyColumn the index of column that acts as grouping key
+ * @param gap gap parameter
+ */
+ SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.indexOfKeyColumn = indexOfKeyColumn;
+ this.gap = gap;
+ list = new LinkedList<>();
+ initialized = false;
+ }
+
+ @Override public Object[] current() {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return list.pollFirst();
+ }
+
+ @Override public boolean moveNext() {
+ return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+ }
+
+ @Override public void reset() {
+ list.clear();
+ inputEnumerator.reset();
+ initialized = false;
+ }
+
+ @Override public void close() {
+ list.clear();
+ inputEnumerator.close();
+ initialized = false;
+ }
+
+ private void initialize() {
+ List<Object[]> elements = new ArrayList<>();
+ // initialize() is called after inputEnumerator.moveNext() has returned true,
+ // so the current element must be taken first.
+ elements.add(inputEnumerator.current());
+ // sessionization needs to see all data.
+ while (inputEnumerator.moveNext()) {
+ elements.add(inputEnumerator.current());
+ }
+
+ Map<Object, SortedMultiMap<Pair<Long, Long>, Object[]>> sessionKeyMap = new HashMap<>();
+ for (Object[] element : elements) {
+ sessionKeyMap.putIfAbsent(element[indexOfKeyColumn], new SortedMultiMap<>());
+ Pair initWindow = computeInitWindow(
+ SqlFunctions.toLong(element[indexOfWatermarkedColumn]), gap);
+ sessionKeyMap.get(element[indexOfKeyColumn]).putMulti(initWindow, element);
+ }
+
+ // merge per key session windows if there is any overlap between windows.
+ for (Map.Entry<Object, SortedMultiMap<Pair<Long, Long>, Object[]>> perKeyEntry
+ : sessionKeyMap.entrySet()) {
+ Map<Pair<Long, Long>, List<Object[]>> finalWindowElementsMap = new HashMap<>();
+ Pair<Long, Long> currentWindow = null;
+ List<Object[]> tempElementList = new ArrayList<>();
+ for (Map.Entry<Pair<Long, Long>, List<Object[]>> sessionEntry
+ : perKeyEntry.getValue().entrySet()) {
+ // check whether the next window can be merged.
+ if (currentWindow == null || !isOverlapped(currentWindow, sessionEntry.getKey())) {
+ // cannot merge window as there is no overlap
+ if (currentWindow != null) {
+ finalWindowElementsMap.put(currentWindow, new ArrayList<>(tempElementList));
+ }
+
+ currentWindow = sessionEntry.getKey();
+ tempElementList.clear();
+ tempElementList.addAll(sessionEntry.getValue());
+ } else {
+ // merge windows.
+ currentWindow = mergeWindows(currentWindow, sessionEntry.getKey());
+ // merge elements in windows.
+ tempElementList.addAll(sessionEntry.getValue());
+ }
+ }
+
+ if (!tempElementList.isEmpty()) {
+ finalWindowElementsMap.put(currentWindow, new ArrayList<>(tempElementList));
+ }
+
+ // construct final results from finalWindowElementsMap.
+ for (Map.Entry<Pair<Long, Long>, List<Object[]>> finalWindowElementsEntry
+ : finalWindowElementsMap.entrySet()) {
+ for (Object[] element : finalWindowElementsEntry.getValue()) {
+ Object[] curWithWindow = new Object[element.length + 2];
+ System.arraycopy(element, 0, curWithWindow, 0, element.length);
+ curWithWindow[element.length] = finalWindowElementsEntry.getKey().left;
+ curWithWindow[element.length + 1] = finalWindowElementsEntry.getKey().right;
+ list.offer(curWithWindow);
+ }
+ }
+ }
+ }
+
+ private boolean isOverlapped(Pair<Long, Long> a, Pair<Long, Long> b) {
+ return !(b.left >= a.right);
+ }
+
+ private Pair<Long, Long> mergeWindows(Pair<Long, Long> a, Pair<Long, Long> b) {
+ return new Pair<>(a.left <= b.left ? a.left : b.left, a.right >= b.right ? a.right : b.right);
+ }
+
+ private Pair<Long, Long> computeInitWindow(long ts, long gap) {
+ return new Pair<>(ts, ts + gap);
+ }
+ }
+
+ /**
* Create enumerable implementation that applies hopping on each element from the input
* enumerator and produces at least one element for each input element.
*/
@@ -822,11 +970,19 @@ public class EnumUtils {
private final long intervalSize;
private LinkedList<Object[]> list;
+ /**
+ * Note that it only works for batch scenarios, i.e. all data is known and there is no late data.
+ *
+ * @param inputEnumerator the enumerator to provide an array of objects as input
+ * @param indexOfWatermarkedColumn the index of timestamp column upon which a watermark is built
+ * @param slide sliding size
+ * @param intervalSize window size
+ */
HopEnumerator(Enumerator<Object[]> inputEnumerator,
- int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+ int indexOfWatermarkedColumn, long slide, long intervalSize) {
this.inputEnumerator = inputEnumerator;
this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
- this.emitFrequency = emitFrequency;
+ this.emitFrequency = slide;
this.intervalSize = intervalSize;
list = new LinkedList<>();
}
@@ -850,10 +1006,7 @@ public class EnumUtils {
}
public boolean moveNext() {
- if (list.size() > 0) {
- return true;
- }
- return inputEnumerator.moveNext();
+ return list.size() > 0 || inputEnumerator.moveNext();
}
public void reset() {
@@ -879,4 +1032,36 @@ public class EnumUtils {
}
return ret;
}
+
+ /**
+ * Apply tumbling per row from the enumerable input.
+ */
+ public static <TSource, TResult> Enumerable<TResult> tumbling(
+ Enumerable<TSource> inputEnumerable,
+ Function1<TSource, TResult> outSelector) {
+ return new AbstractEnumerable<TResult>() {
+ // Applies tumbling on each element from the input enumerator and produces
+ // exactly one element for each input element.
+ @Override public Enumerator<TResult> enumerator() {
+ return new Enumerator<TResult>() {
+ Enumerator<TSource> inputs = inputEnumerable.enumerator();
+
+ public TResult current() {
+ return outSelector.apply(inputs.current());
+ }
+
+ public boolean moveNext() {
+ return inputs.moveNext();
+ }
+
+ public void reset() {
+ inputs.reset();
+ }
+
+ public void close() {
+ }
+ };
+ }
+ };
+ }
}
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
index 77a366f..230e9de 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java
@@ -184,7 +184,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.GROUPING_ID;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.HOP_TVF;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.HOP;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INITCAP;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.INTERSECTION;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.IS_A_SET;
@@ -267,6 +267,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.REPLACE;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.ROUND;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.ROW;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.ROW_NUMBER;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SESSION;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SESSION_USER;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SIGN;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SIMILAR_TO;
@@ -283,7 +284,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.SYSTEM_USER;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TAN;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TRIM;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TRUNCATE;
-import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TUMBLE_TVF;
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.TUMBLE;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.UNARY_MINUS;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.UNARY_PLUS;
import static org.apache.calcite.sql.fun.SqlStdOperatorTable.UPPER;
@@ -673,8 +674,9 @@ public class RexImpTable {
matchMap.put(CLASSIFIER, ClassifierImplementor::new);
matchMap.put(LAST, LastImplementor::new);
map.put(PREV, new PrevImplementor());
- tvfImplementorMap.put(TUMBLE_TVF, TumbleImplementor::new);
- tvfImplementorMap.put(HOP_TVF, HopImplementor::new);
+ tvfImplementorMap.put(TUMBLE, TumbleImplementor::new);
+ tvfImplementorMap.put(HOP, HopImplementor::new);
+ tvfImplementorMap.put(SESSION, SessionImplementor::new);
}
private <T> Supplier<T> constructorSupplier(Class<T> klass) {
@@ -3239,23 +3241,50 @@ public class RexImpTable {
private static class HopImplementor implements TableFunctionCallImplementor {
@Override public Expression implement(RexToLixTranslator translator,
Expression inputEnumerable, RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
- Expression intervalExpression = translator.translate(call.getOperands().get(2));
- Expression intervalExpression2 = translator.translate(call.getOperands().get(3));
- RexCall descriptor = (RexCall) call.getOperands().get(1);
+ Expression slidingInterval = translator.translate(call.getOperands().get(1));
+ Expression windowSize = translator.translate(call.getOperands().get(2));
+ RexCall descriptor = (RexCall) call.getOperands().get(0);
List<Expression> translatedOperands = new ArrayList<>();
Expression wmColIndexExpr =
Expressions.constant(((RexInputRef) descriptor.getOperands().get(0)).getIndex());
translatedOperands.add(wmColIndexExpr);
- translatedOperands.add(intervalExpression);
- translatedOperands.add(intervalExpression2);
+ translatedOperands.add(slidingInterval);
+ translatedOperands.add(windowSize);
return Expressions.call(
BuiltInMethod.HOPPING.method,
Expressions.list(
Expressions.call(inputEnumerable, BuiltInMethod.ENUMERABLE_ENUMERATOR.method),
- wmColIndexExpr,
- intervalExpression,
- intervalExpression2));
+ translatedOperands.get(0),
+ translatedOperands.get(1),
+ translatedOperands.get(2)));
+ }
+ }
+
+ /** Implements per-key sessionization. */
+ private static class SessionImplementor implements TableFunctionCallImplementor {
+ @Override public Expression implement(RexToLixTranslator translator,
+ Expression inputEnumerable, RexCall call, PhysType inputPhysType, PhysType outputPhysType) {
+ RexCall timestampDescriptor = (RexCall) call.getOperands().get(0);
+ RexCall keyDescriptor = (RexCall) call.getOperands().get(1);
+ Expression gapInterval = translator.translate(call.getOperands().get(2));
+
+ List<Expression> translatedOperands = new ArrayList<>();
+ Expression wmColIndexExpr =
+ Expressions.constant(((RexInputRef) timestampDescriptor.getOperands().get(0)).getIndex());
+ Expression keyColIndexExpr =
+ Expressions.constant(((RexInputRef) keyDescriptor.getOperands().get(0)).getIndex());
+ translatedOperands.add(wmColIndexExpr);
+ translatedOperands.add(keyColIndexExpr);
+ translatedOperands.add(gapInterval);
+
+ return Expressions.call(
+ BuiltInMethod.SESSIONIZATION.method,
+ Expressions.list(
+ Expressions.call(inputEnumerable, BuiltInMethod.ENUMERABLE_ENUMERATOR.method),
+ translatedOperands.get(0),
+ translatedOperands.get(1),
+ translatedOperands.get(2)));
}
}
}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
index d32b909..43b2b91 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
@@ -40,8 +40,6 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
@Override public boolean checkOperandTypes(SqlCallBinding callBinding,
boolean throwOnFailure) {
- // There should only be three operands, and number of operands are checked before
- // this call.
final SqlNode operand0 = callBinding.operand(0);
final SqlValidator validator = callBinding.getValidator();
final RelDataType type = validator.getValidatedNodeType(operand0);
@@ -65,7 +63,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
}
@Override public String getAllowedSignatures(String opNameToUse) {
- return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), "
+ return getName() + "(TABLE table_name, DESCRIPTOR(col), "
+ "datetime interval, datetime interval)";
}
}
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
similarity index 76%
copy from core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
copy to core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
index d32b909..fef50cc 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java
@@ -23,15 +23,16 @@ import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.SqlValidator;
/**
- * SqlHopTableFunction implements an operator for hopping. It allows four parameters:
+ * SqlSessionTableFunction implements an operator for per-key sessionization. It allows
+ * four parameters:
* 1. a table.
* 2. a descriptor to provide a watermarked column name from the input table.
- * 3. an interval parameter to specify the length of window shifting.
- * 4. an interval parameter to specify the length of window size.
+ * 3. a descriptor to provide a column as key, on which sessionization will be applied.
+ * 4. an interval parameter to specify an inactivity gap that separates sessions.
*/
-public class SqlHopTableFunction extends SqlWindowTableFunction {
- public SqlHopTableFunction() {
- super(SqlKind.HOP.name());
+public class SqlSessionTableFunction extends SqlWindowTableFunction {
+ public SqlSessionTableFunction() {
+ super(SqlKind.SESSION.name());
}
@Override public SqlOperandCountRange getOperandCountRange() {
@@ -40,8 +41,6 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
@Override public boolean checkOperandTypes(SqlCallBinding callBinding,
boolean throwOnFailure) {
- // There should only be three operands, and number of operands are checked before
- // this call.
final SqlNode operand0 = callBinding.operand(0);
final SqlValidator validator = callBinding.getValidator();
final RelDataType type = validator.getValidatedNodeType(operand0);
@@ -53,10 +52,11 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList());
- final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2));
- if (!SqlTypeUtil.isInterval(type2)) {
+ final SqlNode operand2 = callBinding.operand(2);
+ if (operand2.getKind() != SqlKind.DESCRIPTOR) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
}
+ validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand2).getOperandList());
final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3));
if (!SqlTypeUtil.isInterval(type3)) {
return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure);
@@ -65,7 +65,7 @@ public class SqlHopTableFunction extends SqlWindowTableFunction {
}
@Override public String getAllowedSignatures(String opNameToUse) {
- return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), "
- + "datetime interval, datetime interval)";
+ return getName() + "(TABLE table_name, DESCRIPTOR(col), "
+ + "DESCRIPTOR(col), datetime interval)";
}
}
diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
index 526df03..e026b95 100644
--- a/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
+++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java
@@ -45,6 +45,7 @@ import org.apache.calcite.sql.SqlPrefixOperator;
import org.apache.calcite.sql.SqlProcedureCallOperator;
import org.apache.calcite.sql.SqlRankFunction;
import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSessionTableFunction;
import org.apache.calcite.sql.SqlSetOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
@@ -2294,11 +2295,14 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
/** DESCRIPTOR(column_name, ...). */
public static final SqlOperator DESCRIPTOR = new SqlDescriptorOperator();
- /** TUMBLE as a table-value function. */
- public static final SqlFunction TUMBLE_TVF = new SqlTumbleTableFunction();
+ /** TUMBLE as a table function. */
+ public static final SqlFunction TUMBLE = new SqlTumbleTableFunction();
- /** HOP as a table-value function. */
- public static final SqlFunction HOP_TVF = new SqlHopTableFunction();
+ /** HOP as a table function. */
+ public static final SqlFunction HOP = new SqlHopTableFunction();
+
+ /** SESSION as a table function. */
+ public static final SqlFunction SESSION = new SqlSessionTableFunction();
/** The {@code TUMBLE} group function.
*
@@ -2313,7 +2317,7 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
* this TUMBLE group function, and in fact all group functions. See
* [CALCITE-3340] for details.
*/
- public static final SqlGroupedWindowFunction TUMBLE =
+ public static final SqlGroupedWindowFunction TUMBLE_OLD =
new SqlGroupedWindowFunction("$TUMBLE", SqlKind.TUMBLE,
null, ReturnTypes.ARG0, null,
OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
@@ -2327,15 +2331,15 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
/** The {@code TUMBLE_START} auxiliary function of
* the {@code TUMBLE} group function. */
public static final SqlGroupedWindowFunction TUMBLE_START =
- TUMBLE.auxiliary(SqlKind.TUMBLE_START);
+ TUMBLE_OLD.auxiliary(SqlKind.TUMBLE_START);
/** The {@code TUMBLE_END} auxiliary function of
* the {@code TUMBLE} group function. */
public static final SqlGroupedWindowFunction TUMBLE_END =
- TUMBLE.auxiliary(SqlKind.TUMBLE_END);
+ TUMBLE_OLD.auxiliary(SqlKind.TUMBLE_END);
/** The {@code HOP} group function. */
- public static final SqlGroupedWindowFunction HOP =
+ public static final SqlGroupedWindowFunction HOP_OLD =
new SqlGroupedWindowFunction("$HOP", SqlKind.HOP, null,
ReturnTypes.ARG0, null,
OperandTypes.or(OperandTypes.DATETIME_INTERVAL_INTERVAL,
@@ -2349,16 +2353,16 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
/** The {@code HOP_START} auxiliary function of
* the {@code HOP} group function. */
public static final SqlGroupedWindowFunction HOP_START =
- HOP.auxiliary(SqlKind.HOP_START);
+ HOP_OLD.auxiliary(SqlKind.HOP_START);
/** The {@code HOP_END} auxiliary function of
* the {@code HOP} group function. */
public static final SqlGroupedWindowFunction HOP_END =
- HOP.auxiliary(SqlKind.HOP_END);
+ HOP_OLD.auxiliary(SqlKind.HOP_END);
/** The {@code SESSION} group function. */
- public static final SqlGroupedWindowFunction SESSION =
- new SqlGroupedWindowFunction(SqlKind.SESSION.name(), SqlKind.SESSION,
+ public static final SqlGroupedWindowFunction SESSION_OLD =
+ new SqlGroupedWindowFunction("$SESSION", SqlKind.SESSION,
null, ReturnTypes.ARG0, null,
OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
OperandTypes.DATETIME_INTERVAL_TIME),
@@ -2371,12 +2375,12 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
/** The {@code SESSION_START} auxiliary function of
* the {@code SESSION} group function. */
public static final SqlGroupedWindowFunction SESSION_START =
- SESSION.auxiliary(SqlKind.SESSION_START);
+ SESSION_OLD.auxiliary(SqlKind.SESSION_START);
/** The {@code SESSION_END} auxiliary function of
* the {@code SESSION} group function. */
public static final SqlGroupedWindowFunction SESSION_END =
- SESSION.auxiliary(SqlKind.SESSION_END);
+ SESSION_OLD.auxiliary(SqlKind.SESSION_END);
/** {@code |} operator to create alternate patterns
* within {@code MATCH_RECOGNIZE}.
@@ -2495,13 +2499,13 @@ public class SqlStdOperatorTable extends ReflectiveSqlOperatorTable {
switch (kind) {
case TUMBLE_START:
case TUMBLE_END:
- return TUMBLE;
+ return TUMBLE_OLD;
case HOP_START:
case HOP_END:
- return HOP;
+ return HOP_OLD;
case SESSION_START:
case SESSION_END:
- return SESSION;
+ return SESSION_OLD;
default:
return null;
}
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
index a69405c..c0bc1a1 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java
@@ -25,7 +25,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
/** Converts an expression for a group window function (e.g. TUMBLE)
* into an expression for an auxiliary group function (e.g. TUMBLE_START).
*
- * @see SqlStdOperatorTable#TUMBLE
+ * @see SqlStdOperatorTable#TUMBLE_OLD
*/
public interface AuxiliaryConverter {
/** Converts an expression.
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index fc60dcc..f0b8f81 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -592,8 +592,10 @@ public enum BuiltInMethod {
"resultSelector", Function2.class),
AGG_LAMBDA_FACTORY_ACC_SINGLE_GROUP_RESULT_SELECTOR(AggregateLambdaFactory.class,
"singleGroupResultSelector", Function1.class),
- TUMBLING(EnumerableDefaults.class, "tumbling", Enumerable.class, Function1.class),
+ TUMBLING(EnumUtils.class, "tumbling", Enumerable.class, Function1.class),
HOPPING(EnumUtils.class, "hopping", Enumerator.class, int.class, long.class,
+ long.class),
+ SESSIONIZATION(EnumUtils.class, "sessionize", Enumerator.class, int.class, int.class,
long.class);
public final Method method;
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 7f8dc7e..3144d1e 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -1805,18 +1805,53 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
sql(sql).ok();
}
- @Test void testTableValuedFunctionTumble() {
+ @Test void testTableFunctionTumble() {
final String sql = "select *\n"
+ "from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE))";
sql(sql).ok();
}
- @Test void testTableValuedFunctionTumbleWithSubQueryParam() {
+ @Test public void testTableFunctionHop() {
+ final String sql = "select *\n"
+ + "from table(hop(table Shipments, descriptor(rowtime), "
+ + "INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test public void testTableFunctionSession() {
+ final String sql = "select *\n"
+ + "from table(session(table Shipments, descriptor(rowtime), "
+ + "descriptor(orderId), INTERVAL '10' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test void testTableFunctionTumbleWithSubQueryParam() {
final String sql = "select *\n"
+ "from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))";
sql(sql).ok();
}
+ @Test public void testTableFunctionHopWithSubQueryParam() {
+ final String sql = "select *\n"
+ + "from table(hop((select * from Shipments), descriptor(rowtime), "
+ + "INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test public void testTableFunctionSessionWithSubQueryParam() {
+ final String sql = "select *\n"
+ + "from table(session((select * from Shipments), descriptor(rowtime), "
+ + "descriptor(orderId), INTERVAL '10' MINUTE))";
+ sql(sql).ok();
+ }
+
+ @Test public void testTableFunctionSessionCompoundSessionKey() {
+ final String sql = "select *\n"
+ + "from table(session(table Orders, descriptor(rowtime), "
+ + "descriptor(orderId, productId), INTERVAL '10' MINUTE))";
+ sql(sql).ok();
+ }
+
@Test void testNotNotIn() {
final String sql = "select * from EMP where not (ename not in ('Fred') )";
sql(sql).ok();
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 241898e..f199035 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -10248,24 +10248,54 @@ class SqlValidatorTest extends SqlValidatorTestCase {
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, "
+ "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
- + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+ + "col\\), datetime interval, datetime interval\\)");
sql("select * from table(\n"
+ "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
- + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+ + "col\\), datetime interval, datetime interval\\)");
sql("select * from table(\n"
+ "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)")
.fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) "
+ "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, "
+ "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\("
- + "col1, col2 \\.\\.\\.\\), datetime interval, datetime interval\\)");
+ + "col\\), datetime interval, datetime interval\\)");
sql("select * from table(\n"
+ "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))")
.fails("Object 'TABLER_NOT_EXIST' not found");
}
+ @Test public void testSessionTableFunction() {
+ sql("select * from table(\n"
+ + "session(table orders, descriptor(rowtime), descriptor(productid), interval '1' hour))")
+ .ok();
+ sql("select * from table(\n"
+ + "^session(table orders, descriptor(rowtime), interval '2' hour)^)")
+ .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments");
+ sql("select * from table(\n"
+ + "^session(table orders, descriptor(rowtime), descriptor(productid), 'test')^)")
+ .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <COLUMN_LIST>, "
+ + "<CHAR\\(4\\)>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
+ + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+ sql("select * from table(\n"
+ + "^session(table orders, descriptor(rowtime), 'test', interval '2' hour)^)")
+ .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, "
+ + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
+ + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+ sql("select * from table(\n"
+ + "^session(table orders, 'test', descriptor(productid), interval '2' hour)^)")
+ .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\("
+ + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <COLUMN_LIST>, "
+ + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\("
+ + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)");
+ sql("select * from table(\n"
+ + "session(TABLE ^tabler_not_exist^, descriptor(rowtime), descriptor(productid), interval '1' hour))")
+ .fails("Object 'TABLER_NOT_EXIST' not found");
+ }
+
@Test public void testStreamTumble() {
// TUMBLE
sql("select stream tumble_end(rowtime, interval '2' hour) as rowtime\n"
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index c19e32a..fb92f25 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -3576,7 +3576,7 @@ group by session(rowtime, interval '1' hour)]]>
LogicalDelta
LogicalProject(ROWTIME=[$0], EXPR$1=[$0], C=[$1])
LogicalAggregate(group=[{0}], C=[COUNT()])
- LogicalProject($f0=[SESSION($0, 3600000:INTERVAL HOUR)])
+ LogicalProject($f0=[$SESSION($0, 3600000:INTERVAL HOUR)])
LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
]]>
</Resource>
@@ -4931,6 +4931,34 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTableFunctionHop">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(hop(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTableFunctionSession">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(session(table Shipments, descriptor(rowtime), descriptor(orderId), INTERVAL '10' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTableFunctionTumbleWithSubQueryParam">
<Resource name="sql">
<![CDATA[select *
@@ -4945,6 +4973,48 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTableFunctionHopWithSubQueryParam">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(hop((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE, INTERVAL '2' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTableFunctionSessionWithSubQueryParam">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(session((select * from Shipments), descriptor(rowtime), descriptor(orderId), INTERVAL '10' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
+ LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ORDERID=[$0], ROWTIME=[$1])
+ LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTableFunctionSessionCompoundSessionKey">
+ <Resource name="sql">
+ <![CDATA[select *
+from table(session(table Orders, descriptor(rowtime), descriptor(orderId, productId), INTERVAL '10' MINUTE))]]>
+ </Resource>
+ <Resource name="plan">
+ <![CDATA[
+LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2], window_start=[$3], window_end=[$4])
+ LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($0), DESCRIPTOR($2, $1), 600000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP(0) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
+ LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2])
+ LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTumbleTableRowtimeNotFirstColumn">
<Resource name="sql">
<![CDATA[select stream
diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq
index 15f5832..ba208a7 100644
--- a/core/src/test/resources/sql/stream.iq
+++ b/core/src/test/resources/sql/stream.iq
@@ -82,3 +82,31 @@ SELECT * FROM TABLE(HOP((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), INTERVAL '5
(10 rows)
!ok
+
+SELECT * FROM TABLE(SESSION(TABLE ORDERS, DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
+
+SELECT * FROM TABLE(SESSION((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE));
++---------------------+----+---------+-------+---------------------+---------------------+
+| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end |
++---------------------+----+---------+-------+---------------------+---------------------+
+| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 |
+| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 |
+| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 |
+| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
+| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 |
++---------------------+----+---------+-------+---------------------+---------------------+
+(5 rows)
+
+!ok
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
index ec029ae..bd9b2ab 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
@@ -4256,36 +4256,4 @@ public abstract class EnumerableDefaults {
}
};
}
-
- /**
- * Apply tumbling per row from the enumerable input.
- */
- public static <TSource, TResult> Enumerable<TResult> tumbling(
- Enumerable<TSource> inputEnumerable,
- Function1<TSource, TResult> outSelector) {
- return new AbstractEnumerable<TResult>() {
- // Applies tumbling on each element from the input enumerator and produces
- // exactly one element for each input element.
- @Override public Enumerator<TResult> enumerator() {
- return new Enumerator<TResult>() {
- Enumerator<TSource> inputs = inputEnumerable.enumerator();
-
- public TResult current() {
- return outSelector.apply(inputs.current());
- }
-
- public boolean moveNext() {
- return inputs.moveNext();
- }
-
- public void reset() {
- inputs.reset();
- }
-
- public void close() {
- }
- };
- }
- };
- }
}
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index f8aa8ec..7fd2930 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -1866,8 +1866,8 @@ Not implemented:
|:-------------------- |:-----------
| DESCRIPTOR(name [, name ]*) | DESCRIPTOR appears as an argument in a function to indicate a list of names. The interpretation of names is left to the function.
-### Table-valued functions.
-Table-valued functions occur in the `FROM` clause.
+### Table functions.
+Table functions occur in the `FROM` clause.
#### TUMBLE
In streaming queries, TUMBLE assigns a window for each row of a relation based on a timestamp column. An assigned window
@@ -1876,7 +1876,7 @@ is named as "fixed windowing".
| Operator syntax | Description
|:-------------------- |:-----------
-| TUMBLE(table, DESCRIPTOR(column_name), interval [, time ]) | Indicates a tumbling window of *interval* for *datetime*, optionally aligned at *time*. Tumbling is applied on table in which there is a watermarked column specified by descriptor.
+| TUMBLE(table, DESCRIPTOR(datetime), interval) | Indicates a tumbling window of *interval* for *datetime*.
Here is an example:
`SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))`,
@@ -1884,20 +1884,33 @@ will apply tumbling with 1 minute window size on rows from table orders. rowtime
watermarked column of table orders that tells data completeness.
#### HOP
-In streaming queries, HOP assigns windows that cover rows within the interval of *size*, shifting every *slide*,
-and optionally aligned at *time* based on a timestamp column. Windows assigned could have overlapping so hopping
-sometime is named as "sliding windowing".
+In streaming queries, HOP assigns windows that cover rows within the interval of *size*, shifting every *slide*, based
+on a timestamp column. Assigned windows can overlap, so hopping is sometimes called "sliding windowing".
| Operator syntax | Description
|:-------------------- |:-----------
-| HOP(table, DESCRIPTOR(column_name), slide, size, [, time ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide*, and optionally aligned at *time*. Hopping is applied on table in which there is a watermarked column specified by descriptor.
+| HOP(table, DESCRIPTOR(datetime), slide, size) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide*.
Here is an example:
`SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`,
-will apply hopping with 5-minute interval size on rows from table orders, shifting every 2 minutes. rowtime is the
+will apply hopping with a 5-minute window size on rows from table orders, shifting every 2 minutes. rowtime is the
watermarked column of table orders that tells data completeness.
+#### SESSION
+In streaming queries, SESSION assigns windows that cover rows based on *datetime*. Within a session window, the gap
+between consecutive rows is less than *interval*. Session windows are applied per *key*.
+
+
+| Operator syntax | Description
+|:-------------------- |:-----------
+| SESSION(table, DESCRIPTOR(datetime), DESCRIPTOR(key), interval) | Indicates a session window of *interval* for *datetime*. Session window is applied per *key*.
+
+Here is an example:
+`SELECT * FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(rowtime), DESCRIPTOR(product), INTERVAL '20' MINUTE))`,
+will apply session windowing with a 20-minute inactivity gap on rows from table orders. rowtime is the
+watermarked column of table orders that tells data completeness. Sessions are applied per product.
+
### Grouped window functions
**warning**: grouped window functions are deprecated.