You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/22 11:28:53 UTC
[flink] branch master updated: [FLINK-12566][table] Remove row interval type
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 02a0cf3 [FLINK-12566][table] Remove row interval type
02a0cf3 is described below
commit 02a0cf3d4e26f0217b87b7aaef604a6f39a56977
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 11:43:28 2019 +0200
[FLINK-12566][table] Remove row interval type
This closes #8497.
---
.../table/expressions/ApiExpressionUtils.java | 5 ++-
.../table/expressions/ValueLiteralExpression.java | 3 --
.../flink/table/typeutils/RowIntervalTypeInfo.java | 41 ----------------------
.../codegen/agg/batch/WindowCodeGenerator.scala | 20 +++++------
.../flink/table/plan/util/AggregateUtil.scala | 33 +++++++++--------
.../expressions/rules/OverWindowResolverRule.java | 4 +--
.../operations/AggregateOperationFactory.java | 12 +++----
.../table/expressions/PlannerExpressionUtils.scala | 7 ++--
.../org/apache/flink/table/expressions/call.scala | 14 ++++----
.../apache/flink/table/expressions/literals.scala | 3 +-
.../flink/table/expressions/overOffsets.scala | 7 ++--
.../table/runtime/aggregate/AggregateUtil.scala | 7 ++--
.../table/api/batch/table/GroupWindowTest.scala | 8 ++---
.../table/GroupWindowTableAggregateTest.scala | 8 ++---
.../table/api/stream/table/GroupWindowTest.scala | 10 +++---
15 files changed, 67 insertions(+), 115 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 2bbfaa2..1ef8db3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
import java.util.Arrays;
@@ -124,10 +123,10 @@ public final class ApiExpressionUtils {
public static Expression toRowInterval(Expression e) {
final Optional<Expression> intInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
- .map((v) -> valueLiteral((long) v, RowIntervalTypeInfo.INTERVAL_ROWS));
+ .map((v) -> valueLiteral((long) v, BasicTypeInfo.LONG_TYPE_INFO));
final Optional<Expression> longInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.LONG_TYPE_INFO)
- .map((v) -> valueLiteral(v, RowIntervalTypeInfo.INTERVAL_ROWS));
+ .map((v) -> valueLiteral(v, BasicTypeInfo.LONG_TYPE_INFO));
if (intInterval.isPresent()) {
return intInterval.get();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
index f8138f6..186c90c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
import org.apache.flink.util.Preconditions;
@@ -109,8 +108,6 @@ public final class ValueLiteralExpression implements Expression {
return value + ".millis";
} else if (type.equals(TimeIntervalTypeInfo.INTERVAL_MONTHS)) {
return value + ".months";
- } else if (type.equals(RowIntervalTypeInfo.INTERVAL_ROWS)) {
- return value + ".rows";
} else {
return stringifyValue(value);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java
deleted file mode 100644
index a7cd362..0000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Type information for row intervals.
- */
-@Internal
-public final class RowIntervalTypeInfo extends InternalTypeInfo<Long> {
-
- private static final long serialVersionUID = -1306179424364925258L;
-
- public static final RowIntervalTypeInfo INTERVAL_ROWS = new RowIntervalTypeInfo();
-
- private RowIntervalTypeInfo() {
- super(Long.class);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof RowIntervalTypeInfo;
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
index 121de16..2e42df3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
@@ -18,6 +18,12 @@
package org.apache.flink.table.codegen.agg.batch
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.tools.RelBuilder
+import org.apache.commons.math3.util.ArithmeticUtils
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
import org.apache.flink.table.`type`.{InternalType, InternalTypes, RowType}
import org.apache.flink.table.api.Types
@@ -30,9 +36,9 @@ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.codegen.OperatorCodeGenerator.generateCollect
import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping, genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, genInitFlatAggregateBuffer}
import org.apache.flink.table.codegen.agg.batch.WindowCodeGenerator.{asLong, isTimeIntervalLiteral}
-import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ExprCodeGenerator, GenerateUtils, GeneratedExpression}
+import org.apache.flink.table.codegen._
import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, JoinedRow}
-import org.apache.flink.table.expressions.ExpressionBuilder.{ifThenElse, lessThan, literal, minus, mod, plus, reinterpretCast, times, typeLiteral}
+import org.apache.flink.table.expressions.ExpressionBuilder._
import org.apache.flink.table.expressions.{Expression, RexNodeConverter, ValueLiteralExpression}
import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
@@ -41,13 +47,7 @@ import org.apache.flink.table.plan.logical.{LogicalWindow, SlidingGroupWindow, T
import org.apache.flink.table.plan.util.{AggregateInfoList, AggregateUtil}
import org.apache.flink.table.runtime.util.RowIterator
import org.apache.flink.table.runtime.window.grouping.{HeapWindowsGrouping, WindowsGrouping}
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-
-import org.apache.calcite.avatica.util.DateTimeUtils
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
-import org.apache.commons.math3.util.ArithmeticUtils
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
abstract class WindowCodeGenerator(
relBuilder: RelBuilder,
@@ -752,7 +752,7 @@ object WindowCodeGenerator {
def asLong(expr: Expression): Long = expr match {
case literal: ValueLiteralExpression
if literal.getType == TimeIntervalTypeInfo.INTERVAL_MILLIS ||
- literal.getType == RowIntervalTypeInfo.INTERVAL_ROWS =>
+ literal.getType == BasicTypeInfo.LONG_TYPE_INFO =>
literal.getValue.asInstanceOf[java.lang.Long]
case _ => throw new IllegalArgumentException()
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 9f5d627..3d1798b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -17,7 +17,18 @@
*/
package org.apache.flink.table.plan.util
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import java.lang.{Long => JLong}
+import java.time.Duration
+import java.util
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.sql.fun._
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
import org.apache.flink.table.`type`.InternalTypes._
import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes, TypeConverters}
import org.apache.flink.table.api.{TableConfig, TableConfigOptions, TableException}
@@ -26,7 +37,7 @@ import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType
import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec}
-import org.apache.flink.table.expressions.{FieldReferenceExpression, ProctimeAttribute, RexNodeConverter, RowtimeAttribute, ValueLiteralExpression, WindowEnd, WindowStart}
+import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
import org.apache.flink.table.functions.sql.{FlinkSqlOperatorTable, SqlConcatAggFunction, SqlFirstLastValueAggFunction}
import org.apache.flink.table.functions.utils.AggSqlFunction
@@ -34,19 +45,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
import org.apache.flink.table.plan.`trait`.RelModifiedMonotonicity
import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
-import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryStringTypeInfo, DecimalTypeInfo, MapViewTypeInfo, RowIntervalTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
-
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
-import org.apache.calcite.rex.RexInputRef
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.validate.SqlMonotonicity
-import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
-import org.apache.calcite.tools.RelBuilder
-
-import java.lang.{Long => JLong}
-import java.time.Duration
-import java.util
+import org.apache.flink.table.typeutils._
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -714,11 +713,11 @@ object AggregateUtil extends Enumeration {
}
def isRowIntervalType(intervalType: TypeInformation[_]): Boolean = {
- intervalType == RowIntervalTypeInfo.INTERVAL_ROWS
+ intervalType == BasicTypeInfo.LONG_TYPE_INFO
}
def toLong(literalExpr: ValueLiteralExpression): JLong = {
- if (literalExpr.getType == RowIntervalTypeInfo.INTERVAL_ROWS) {
+ if (literalExpr.getType == BasicTypeInfo.LONG_TYPE_INFO) {
literalExpr.getValue match {
case v: JLong => v
case _ => throw new IllegalArgumentException()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index c3603d7..ca726c0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -19,13 +19,13 @@
package org.apache.flink.table.expressions.rules;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.PlannerExpression;
import org.apache.flink.table.plan.logical.LogicalOverWindow;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
import java.util.ArrayList;
import java.util.List;
@@ -84,7 +84,7 @@ final class OverWindowResolverRule implements ResolverRule {
private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
return referenceWindow.following().orElseGet(() -> {
PlannerExpression preceding = resolutionContext.bridge(referenceWindow.preceding());
- if (preceding.resultType() instanceof RowIntervalTypeInfo) {
+ if (preceding.resultType() == BasicTypeInfo.LONG_TYPE_INFO) {
return new CallExpression(BuiltInFunctionDefinitions.CURRENT_ROW, emptyList());
} else {
return new CallExpression(BuiltInFunctionDefinitions.CURRENT_RANGE, emptyList());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
index 30eaca5..1e6d84f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
@@ -49,7 +49,6 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils;
import org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
@@ -69,7 +68,6 @@ import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGA
import static org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.SLIDE;
import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.TUMBLE;
-import static org.apache.flink.table.typeutils.RowIntervalTypeInfo.INTERVAL_ROWS;
import static org.apache.flink.table.typeutils.TimeIntervalTypeInfo.INTERVAL_MILLIS;
/**
@@ -204,7 +202,7 @@ public class AggregateOperationFactory {
* <li>The alias is represented with an unresolved reference</li>
* <li>The time attribute is a single field reference of a {@link TimeIndicatorTypeInfo}(stream),
* {@link SqlTimeTypeInfo}(batch), or {@link BasicTypeInfo#LONG_TYPE_INFO}(batch) type</li>
- * <li>The size & slide are value literals of either {@link RowIntervalTypeInfo#INTERVAL_ROWS},
+ * <li>The size & slide are value literals of either {@link BasicTypeInfo#LONG_TYPE_INFO},
* or {@link TimeIntervalTypeInfo} type</li>
* <li>The size & slide are of the same type</li>
* <li>The gap is a value literal of a {@link TimeIntervalTypeInfo} type</li>
@@ -292,7 +290,7 @@ public class AggregateOperationFactory {
"A tumble window expects a size value literal.");
TypeInformation<?> sizeType = windowSize.getType();
- if (sizeType != INTERVAL_ROWS && sizeType != INTERVAL_MILLIS) {
+ if (sizeType != LONG_TYPE_INFO && sizeType != INTERVAL_MILLIS) {
throw new ValidationException(
"Tumbling window expects size literal of type Interval of Milliseconds or Interval of Rows.");
}
@@ -316,7 +314,7 @@ public class AggregateOperationFactory {
TypeInformation<?> windowSizeType = windowSize.getType();
- if (windowSizeType != INTERVAL_ROWS && windowSizeType != INTERVAL_MILLIS) {
+ if (windowSizeType != LONG_TYPE_INFO && windowSizeType != INTERVAL_MILLIS) {
throw new ValidationException(
"A sliding window expects size literal of type Interval of Milliseconds or Interval of Rows.");
}
@@ -353,7 +351,7 @@ public class AggregateOperationFactory {
}
private void validateWindowIntervalType(FieldReferenceExpression timeField, TypeInformation<?> intervalType) {
- if (isRowTimeIndicator(timeField) && intervalType == INTERVAL_ROWS) {
+ if (isRowTimeIndicator(timeField) && intervalType == LONG_TYPE_INFO) {
// unsupported row intervals on event-time
throw new ValidationException(
"Event-time grouping windows on row intervals in a stream environment " +
@@ -377,7 +375,7 @@ public class AggregateOperationFactory {
if (!windowProperties.isEmpty()) {
if (window.getType() == TUMBLE || window.getType() == SLIDE) {
TypeInformation<?> resultType = window.getSize().map(expressionBridge::bridge).get().resultType();
- if (resultType == INTERVAL_ROWS) {
+ if (resultType == LONG_TYPE_INFO) {
throw new ValidationException(String.format("Window start and Window end cannot be selected " +
"for a row-count %s window.", window.getType().toString().toLowerCase()));
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
index 0dfef22..7f7397f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -18,9 +18,10 @@
package org.apache.flink.table.expressions
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
object PlannerExpressionUtils {
@@ -30,7 +31,7 @@ object PlannerExpressionUtils {
}
private[flink] def isRowCountLiteral(expr: PlannerExpression): Boolean = expr match {
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) => true
+ case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) => true
case _ => false
}
@@ -61,7 +62,7 @@ object PlannerExpressionUtils {
}
private[flink] def toLong(expr: PlannerExpression): Long = expr match {
- case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+ case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
case _ => throw new IllegalArgumentException()
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index 6a050a8..77eeb8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -26,12 +26,12 @@ import org.apache.calcite.sql._
import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
import _root_.scala.collection.JavaConverters._
@@ -96,7 +96,7 @@ case class OverCall(
val partitionKeys = partitionBy.map(_.toRexNode).asJava
// assemble bounds
- val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+ val isPhysical: Boolean = preceding.resultType == BasicTypeInfo.LONG_TYPE_INFO
val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
@@ -186,9 +186,9 @@ case class OverCall(
preceding match {
case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
ValidationSuccess
- case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 =>
+ case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
ValidationSuccess
- case Literal(_, _: RowIntervalTypeInfo) =>
+ case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
return ValidationFailure("Preceding row interval must be larger than 0.")
case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
ValidationSuccess
@@ -202,9 +202,9 @@ case class OverCall(
following match {
case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
ValidationSuccess
- case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 =>
+ case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
ValidationSuccess
- case Literal(_, _: RowIntervalTypeInfo) =>
+ case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
return ValidationFailure("Following row interval must be larger than 0.")
case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
ValidationSuccess
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
index bf39d99..03ff076 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.util.{DateString, TimeString, TimestampString}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
object Literal {
private[flink] val UTC = TimeZone.getTimeZone("UTC")
@@ -60,7 +60,6 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
case _@SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
case _@TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
case _@TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
- case _@RowIntervalTypeInfo.INTERVAL_ROWS => value.toString + ".rows"
case _ => s"Literal($value, $resultType)"
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
index 38068b7..ce1966f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
@@ -18,10 +18,11 @@
package org.apache.flink.table.expressions
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
case class CurrentRow() extends PlannerExpression {
- override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+ override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
override private[flink] def children = Seq()
@@ -37,7 +38,7 @@ case class CurrentRange() extends PlannerExpression {
}
case class UnboundedRow() extends PlannerExpression {
- override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+ override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
override private[flink] def children = Seq()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 33795ca..f44b4bd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -28,9 +28,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun._
import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
@@ -47,8 +46,8 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, UserDefinedAggregateFunction}
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
@@ -1842,7 +1841,7 @@ object AggregateUtil {
private[flink] def asLong(expr: PlannerExpression): Long = expr match {
case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => value
- case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+ case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
case _ => throw new IllegalArgumentException()
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index 64b6a42..d040f82 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -48,7 +48,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", "TumblingGroupWindow('w, 'long, 2.rows)"),
+ term("window", "TumblingGroupWindow('w, 'long, 2)"),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -140,7 +140,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("select", "long", "int")
),
- term("window", "TumblingGroupWindow('w, 'long, 2.rows)"),
+ term("window", "TumblingGroupWindow('w, 'long, 2)"),
term("select", "COUNT(int) AS TMP_0")
)
@@ -230,7 +230,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", "SlidingGroupWindow('w, 'long, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'long, 2, 1)"),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -301,7 +301,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("select", "long", "int")
),
- term("window", "SlidingGroupWindow('w, 'long, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'long, 2, 1)"),
term("select", "COUNT(int) AS TMP_0")
)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
index 71f9631..37796c0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
@@ -115,7 +115,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
term("select", "a", "b", "c", "e")
),
term("groupBy", "c"),
- term("window", "TumblingGroupWindow('w, 'e, 2.rows)"),
+ term("window", "TumblingGroupWindow('w, 'e, 2)"),
term("select", "c", "EmptyTableAggFunc(a, b) AS (f0, f1)")
)
@@ -191,7 +191,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
term("select", "a", "b", "c", "e")
),
term("groupBy", "c"),
- term("window", "SlidingGroupWindow('w, 'e, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'e, 2, 1)"),
term("select", "c", "EmptyTableAggFunc(a, b) AS (f0, f1)")
)
@@ -285,7 +285,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
streamTableNode(0),
term("select", "a", "b", "e")
),
- term("window", "TumblingGroupWindow('w, 'e, 2.rows)"),
+ term("window", "TumblingGroupWindow('w, 'e, 2)"),
term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)")
)
@@ -355,7 +355,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
streamTableNode(0),
term("select", "a", "b", "e")
),
- term("window", "SlidingGroupWindow('w, 'e, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'e, 2, 1)"),
term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)")
)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index 2289c2c..d1aade2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -107,7 +107,7 @@ class GroupWindowTest extends TableTestBase {
term("select", "int", "string", "proctime")
),
term("groupBy", "string"),
- term("window", "TumblingGroupWindow('w, 'proctime, 2.rows)"),
+ term("window", "TumblingGroupWindow('w, 'proctime, 2)"),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -201,7 +201,7 @@ class GroupWindowTest extends TableTestBase {
term("select", "int", "string", "proctime")
),
term("groupBy", "string"),
- term("window", "SlidingGroupWindow('w, 'proctime, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'proctime, 2, 1)"),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -364,7 +364,7 @@ class GroupWindowTest extends TableTestBase {
streamTableNode(0),
term("select", "int", "proctime")
),
- term("window", "TumblingGroupWindow('w, 'proctime, 2.rows)"),
+ term("window", "TumblingGroupWindow('w, 'proctime, 2)"),
term("select", "COUNT(int) AS TMP_0")
)
@@ -461,7 +461,7 @@ class GroupWindowTest extends TableTestBase {
streamTableNode(0),
term("select", "int", "proctime")
),
- term("window", "SlidingGroupWindow('w, 'proctime, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'proctime, 2, 1)"),
term("select", "COUNT(int) AS TMP_0")
)
@@ -595,7 +595,7 @@ class GroupWindowTest extends TableTestBase {
"DataStreamGroupWindowAggregate",
streamTableNode(0),
term("groupBy", "string, int2, int3"),
- term("window", "SlidingGroupWindow('w, 'proctime, 2.rows, 1.rows)"),
+ term("window", "SlidingGroupWindow('w, 'proctime, 2, 1)"),
term(
"select",
"string",