You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/22 11:28:53 UTC

[flink] branch master updated: [FLINK-12566][table] Remove row interval type

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a0cf3  [FLINK-12566][table] Remove row interval type
02a0cf3 is described below

commit 02a0cf3d4e26f0217b87b7aaef604a6f39a56977
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 11:43:28 2019 +0200

    [FLINK-12566][table] Remove row interval type
    
    This closes #8497.
---
 .../table/expressions/ApiExpressionUtils.java      |  5 ++-
 .../table/expressions/ValueLiteralExpression.java  |  3 --
 .../flink/table/typeutils/RowIntervalTypeInfo.java | 41 ----------------------
 .../codegen/agg/batch/WindowCodeGenerator.scala    | 20 +++++------
 .../flink/table/plan/util/AggregateUtil.scala      | 33 +++++++++--------
 .../expressions/rules/OverWindowResolverRule.java  |  4 +--
 .../operations/AggregateOperationFactory.java      | 12 +++----
 .../table/expressions/PlannerExpressionUtils.scala |  7 ++--
 .../org/apache/flink/table/expressions/call.scala  | 14 ++++----
 .../apache/flink/table/expressions/literals.scala  |  3 +-
 .../flink/table/expressions/overOffsets.scala      |  7 ++--
 .../table/runtime/aggregate/AggregateUtil.scala    |  7 ++--
 .../table/api/batch/table/GroupWindowTest.scala    |  8 ++---
 .../table/GroupWindowTableAggregateTest.scala      |  8 ++---
 .../table/api/stream/table/GroupWindowTest.scala   | 10 +++---
 15 files changed, 67 insertions(+), 115 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 2bbfaa2..1ef8db3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 
 import java.util.Arrays;
@@ -124,10 +123,10 @@ public final class ApiExpressionUtils {
 
 	public static Expression toRowInterval(Expression e) {
 		final Optional<Expression> intInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.INT_TYPE_INFO)
-			.map((v) -> valueLiteral((long) v, RowIntervalTypeInfo.INTERVAL_ROWS));
+			.map((v) -> valueLiteral((long) v, BasicTypeInfo.LONG_TYPE_INFO));
 
 		final Optional<Expression> longInterval = ExpressionUtils.extractValue(e, BasicTypeInfo.LONG_TYPE_INFO)
-			.map((v) -> valueLiteral(v, RowIntervalTypeInfo.INTERVAL_ROWS));
+			.map((v) -> valueLiteral(v, BasicTypeInfo.LONG_TYPE_INFO));
 
 		if (intInterval.isPresent()) {
 			return intInterval.get();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
index f8138f6..186c90c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 import org.apache.flink.util.Preconditions;
 
@@ -109,8 +108,6 @@ public final class ValueLiteralExpression implements Expression {
 			return value + ".millis";
 		} else if (type.equals(TimeIntervalTypeInfo.INTERVAL_MONTHS)) {
 			return value + ".months";
-		} else if (type.equals(RowIntervalTypeInfo.INTERVAL_ROWS)) {
-			return value + ".rows";
 		} else {
 			return stringifyValue(value);
 		}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java
deleted file mode 100644
index a7cd362..0000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowIntervalTypeInfo.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Type information for row intervals.
- */
-@Internal
-public final class RowIntervalTypeInfo extends InternalTypeInfo<Long> {
-
-	private static final long serialVersionUID = -1306179424364925258L;
-
-	public static final RowIntervalTypeInfo INTERVAL_ROWS = new RowIntervalTypeInfo();
-
-	private RowIntervalTypeInfo() {
-		super(Long.class);
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof RowIntervalTypeInfo;
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
index 121de16..2e42df3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.codegen.agg.batch
 
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.tools.RelBuilder
+import org.apache.commons.math3.util.ArithmeticUtils
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
 import org.apache.flink.table.`type`.{InternalType, InternalTypes, RowType}
 import org.apache.flink.table.api.Types
@@ -30,9 +36,9 @@ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.OperatorCodeGenerator.generateCollect
 import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping, genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, genInitFlatAggregateBuffer}
 import org.apache.flink.table.codegen.agg.batch.WindowCodeGenerator.{asLong, isTimeIntervalLiteral}
-import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ExprCodeGenerator, GenerateUtils, GeneratedExpression}
+import org.apache.flink.table.codegen._
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, JoinedRow}
-import org.apache.flink.table.expressions.ExpressionBuilder.{ifThenElse, lessThan, literal, minus, mod, plus, reinterpretCast, times, typeLiteral}
+import org.apache.flink.table.expressions.ExpressionBuilder._
 import org.apache.flink.table.expressions.{Expression, RexNodeConverter, ValueLiteralExpression}
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
@@ -41,13 +47,7 @@ import org.apache.flink.table.plan.logical.{LogicalWindow, SlidingGroupWindow, T
 import org.apache.flink.table.plan.util.{AggregateInfoList, AggregateUtil}
 import org.apache.flink.table.runtime.util.RowIterator
 import org.apache.flink.table.runtime.window.grouping.{HeapWindowsGrouping, WindowsGrouping}
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-
-import org.apache.calcite.avatica.util.DateTimeUtils
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.RelBuilder
-import org.apache.commons.math3.util.ArithmeticUtils
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 abstract class WindowCodeGenerator(
     relBuilder: RelBuilder,
@@ -752,7 +752,7 @@ object WindowCodeGenerator {
   def asLong(expr: Expression): Long = expr match {
     case literal: ValueLiteralExpression
       if literal.getType == TimeIntervalTypeInfo.INTERVAL_MILLIS ||
-          literal.getType == RowIntervalTypeInfo.INTERVAL_ROWS =>
+          literal.getType == BasicTypeInfo.LONG_TYPE_INFO =>
       literal.getValue.asInstanceOf[java.lang.Long]
     case _ => throw new IllegalArgumentException()
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 9f5d627..3d1798b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -17,7 +17,18 @@
  */
 package org.apache.flink.table.plan.util
 
-import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import java.lang.{Long => JLong}
+import java.time.Duration
+import java.util
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.sql.fun._
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
 import org.apache.flink.table.`type`.InternalTypes._
 import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes, TypeConverters}
 import org.apache.flink.table.api.{TableConfig, TableConfigOptions, TableException}
@@ -26,7 +37,7 @@ import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType
 import org.apache.flink.table.dataview.{DataViewSpec, MapViewSpec}
-import org.apache.flink.table.expressions.{FieldReferenceExpression, ProctimeAttribute, RexNodeConverter, RowtimeAttribute, ValueLiteralExpression, WindowEnd, WindowStart}
+import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.functions.sql.{FlinkSqlOperatorTable, SqlConcatAggFunction, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.functions.utils.AggSqlFunction
@@ -34,19 +45,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.runtime.bundle.trigger.CountBundleTrigger
-import org.apache.flink.table.typeutils.{BaseRowTypeInfo, BinaryStringTypeInfo, DecimalTypeInfo, MapViewTypeInfo, RowIntervalTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
-
-import org.apache.calcite.rel.`type`._
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
-import org.apache.calcite.rex.RexInputRef
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.validate.SqlMonotonicity
-import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
-import org.apache.calcite.tools.RelBuilder
-
-import java.lang.{Long => JLong}
-import java.time.Duration
-import java.util
+import org.apache.flink.table.typeutils._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -714,11 +713,11 @@ object AggregateUtil extends Enumeration {
   }
 
   def isRowIntervalType(intervalType: TypeInformation[_]): Boolean = {
-    intervalType == RowIntervalTypeInfo.INTERVAL_ROWS
+    intervalType == BasicTypeInfo.LONG_TYPE_INFO
   }
 
   def toLong(literalExpr: ValueLiteralExpression): JLong = {
-    if (literalExpr.getType == RowIntervalTypeInfo.INTERVAL_ROWS) {
+    if (literalExpr.getType == BasicTypeInfo.LONG_TYPE_INFO) {
       literalExpr.getValue match {
         case v: JLong => v
         case _ => throw new IllegalArgumentException()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index c3603d7..ca726c0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -19,13 +19,13 @@
 package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.plan.logical.LogicalOverWindow;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -84,7 +84,7 @@ final class OverWindowResolverRule implements ResolverRule {
 		private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
 			return referenceWindow.following().orElseGet(() -> {
 					PlannerExpression preceding = resolutionContext.bridge(referenceWindow.preceding());
-					if (preceding.resultType() instanceof RowIntervalTypeInfo) {
+					if (preceding.resultType() == BasicTypeInfo.LONG_TYPE_INFO) {
 						return new CallExpression(BuiltInFunctionDefinitions.CURRENT_ROW, emptyList());
 					} else {
 						return new CallExpression(BuiltInFunctionDefinitions.CURRENT_RANGE, emptyList());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
index 30eaca5..1e6d84f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
@@ -49,7 +49,6 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils;
 import org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow;
-import org.apache.flink.table.typeutils.RowIntervalTypeInfo;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
 
@@ -69,7 +68,6 @@ import static org.apache.flink.table.expressions.FunctionDefinition.Type.AGGREGA
 import static org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
 import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.SLIDE;
 import static org.apache.flink.table.operations.WindowAggregateTableOperation.ResolvedGroupWindow.WindowType.TUMBLE;
-import static org.apache.flink.table.typeutils.RowIntervalTypeInfo.INTERVAL_ROWS;
 import static org.apache.flink.table.typeutils.TimeIntervalTypeInfo.INTERVAL_MILLIS;
 
 /**
@@ -204,7 +202,7 @@ public class AggregateOperationFactory {
 	 *     <li>The alias is represented with an unresolved reference</li>
 	 *     <li>The time attribute is a single field reference of a {@link TimeIndicatorTypeInfo}(stream),
 	 *     {@link SqlTimeTypeInfo}(batch), or {@link BasicTypeInfo#LONG_TYPE_INFO}(batch) type</li>
-	 *     <li>The size & slide are value literals of either {@link RowIntervalTypeInfo#INTERVAL_ROWS},
+	 *     <li>The size & slide are value literals of either {@link BasicTypeInfo#LONG_TYPE_INFO},
 	 *     or {@link TimeIntervalTypeInfo} type</li>
 	 *     <li>The size & slide are of the same type</li>
 	 *     <li>The gap is a value literal of a {@link TimeIntervalTypeInfo} type</li>
@@ -292,7 +290,7 @@ public class AggregateOperationFactory {
 			"A tumble window expects a size value literal.");
 
 		TypeInformation<?> sizeType = windowSize.getType();
-		if (sizeType != INTERVAL_ROWS && sizeType != INTERVAL_MILLIS) {
+		if (sizeType != LONG_TYPE_INFO && sizeType != INTERVAL_MILLIS) {
 			throw new ValidationException(
 				"Tumbling window expects size literal of type Interval of Milliseconds or Interval of Rows.");
 		}
@@ -316,7 +314,7 @@ public class AggregateOperationFactory {
 
 		TypeInformation<?> windowSizeType = windowSize.getType();
 
-		if (windowSizeType != INTERVAL_ROWS && windowSizeType != INTERVAL_MILLIS) {
+		if (windowSizeType != LONG_TYPE_INFO && windowSizeType != INTERVAL_MILLIS) {
 			throw new ValidationException(
 				"A sliding window expects size literal of type Interval of Milliseconds or Interval of Rows.");
 		}
@@ -353,7 +351,7 @@ public class AggregateOperationFactory {
 	}
 
 	private void validateWindowIntervalType(FieldReferenceExpression timeField, TypeInformation<?> intervalType) {
-		if (isRowTimeIndicator(timeField) && intervalType == INTERVAL_ROWS) {
+		if (isRowTimeIndicator(timeField) && intervalType == LONG_TYPE_INFO) {
 			// unsupported row intervals on event-time
 			throw new ValidationException(
 				"Event-time grouping windows on row intervals in a stream environment " +
@@ -377,7 +375,7 @@ public class AggregateOperationFactory {
 		if (!windowProperties.isEmpty()) {
 			if (window.getType() == TUMBLE || window.getType() == SLIDE) {
 				TypeInformation<?> resultType = window.getSize().map(expressionBridge::bridge).get().resultType();
-				if (resultType == INTERVAL_ROWS) {
+				if (resultType == LONG_TYPE_INFO) {
 					throw new ValidationException(String.format("Window start and Window end cannot be selected " +
 						"for a row-count %s window.", window.getType().toString().toLowerCase()));
 				}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
index 0dfef22..7f7397f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.table.expressions
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 object PlannerExpressionUtils {
 
@@ -30,7 +31,7 @@ object PlannerExpressionUtils {
   }
 
   private[flink] def isRowCountLiteral(expr: PlannerExpression): Boolean = expr match {
-    case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) => true
+    case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) => true
     case _ => false
   }
 
@@ -61,7 +62,7 @@ object PlannerExpressionUtils {
   }
 
   private[flink] def toLong(expr: PlannerExpression): Long = expr match {
-    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
     case _ => throw new IllegalArgumentException()
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index 6a050a8..77eeb8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -26,12 +26,12 @@ import org.apache.calcite.sql._
 import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 import _root_.scala.collection.JavaConverters._
@@ -96,7 +96,7 @@ case class OverCall(
     val partitionKeys = partitionBy.map(_.toRexNode).asJava
 
     // assemble bounds
-    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+    val isPhysical: Boolean = preceding.resultType == BasicTypeInfo.LONG_TYPE_INFO
 
     val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
     val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
@@ -186,9 +186,9 @@ case class OverCall(
     preceding match {
       case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
         ValidationSuccess
-      case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 =>
+      case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
         ValidationSuccess
-      case Literal(_, _: RowIntervalTypeInfo) =>
+      case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
         return ValidationFailure("Preceding row interval must be larger than 0.")
       case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
         ValidationSuccess
@@ -202,9 +202,9 @@ case class OverCall(
     following match {
       case _: CurrentRow | _: CurrentRange | _: UnboundedRow | _: UnboundedRange =>
         ValidationSuccess
-      case Literal(v: Long, _: RowIntervalTypeInfo) if v > 0 =>
+      case Literal(v: Long, BasicTypeInfo.LONG_TYPE_INFO) if v > 0 =>
         ValidationSuccess
-      case Literal(_, _: RowIntervalTypeInfo) =>
+      case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) =>
         return ValidationFailure("Following row interval must be larger than 0.")
       case Literal(v: Long, _: TimeIntervalTypeInfo[_]) if v >= 0 =>
         ValidationSuccess
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
index bf39d99..03ff076 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 object Literal {
   private[flink] val UTC = TimeZone.getTimeZone("UTC")
@@ -60,7 +60,6 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
     case _@SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
     case _@TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
     case _@TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
-    case _@RowIntervalTypeInfo.INTERVAL_ROWS => value.toString + ".rows"
     case _ => s"Literal($value, $resultType)"
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
index 38068b7..ce1966f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/overOffsets.scala
@@ -18,10 +18,11 @@
 
 package org.apache.flink.table.expressions
 
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 case class CurrentRow() extends PlannerExpression {
-  override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
 
   override private[flink] def children = Seq()
 
@@ -37,7 +38,7 @@ case class CurrentRange() extends PlannerExpression {
 }
 
 case class UnboundedRow() extends PlannerExpression {
-  override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
 
   override private[flink] def children = Seq()
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 33795ca..f44b4bd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -28,9 +28,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
 import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
@@ -47,8 +46,8 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, UserDefinedAggregateFunction}
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
@@ -1842,7 +1841,7 @@ object AggregateUtil {
 
   private[flink] def asLong(expr: PlannerExpression): Long = expr match {
     case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => value
-    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) => value
     case _ => throw new IllegalArgumentException()
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
index 64b6a42..d040f82 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala
@@ -48,7 +48,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", "TumblingGroupWindow('w, 'long, 2.rows)"),
+      term("window", "TumblingGroupWindow('w, 'long, 2)"),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -140,7 +140,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "long", "int")
       ),
-      term("window", "TumblingGroupWindow('w, 'long, 2.rows)"),
+      term("window", "TumblingGroupWindow('w, 'long, 2)"),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -230,7 +230,7 @@ class GroupWindowTest extends TableTestBase {
       "DataSetWindowAggregate",
       batchTableNode(0),
       term("groupBy", "string"),
-      term("window", "SlidingGroupWindow('w, 'long, 2.rows, 1.rows)"),
+      term("window", "SlidingGroupWindow('w, 'long, 2, 1)"),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -301,7 +301,7 @@ class GroupWindowTest extends TableTestBase {
         batchTableNode(0),
         term("select", "long", "int")
       ),
-      term("window", "SlidingGroupWindow('w, 'long, 2.rows, 1.rows)"),
+      term("window", "SlidingGroupWindow('w, 'long, 2, 1)"),
       term("select", "COUNT(int) AS TMP_0")
     )
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
index 71f9631..37796c0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
@@ -115,7 +115,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
           term("select", "a", "b", "c", "e")
         ),
         term("groupBy", "c"),
-        term("window", "TumblingGroupWindow('w, 'e, 2.rows)"),
+        term("window", "TumblingGroupWindow('w, 'e, 2)"),
         term("select",  "c", "EmptyTableAggFunc(a, b) AS (f0, f1)")
       )
 
@@ -191,7 +191,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
           term("select", "a", "b", "c", "e")
         ),
         term("groupBy", "c"),
-        term("window", "SlidingGroupWindow('w, 'e, 2.rows, 1.rows)"),
+        term("window", "SlidingGroupWindow('w, 'e, 2, 1)"),
         term("select",  "c", "EmptyTableAggFunc(a, b) AS (f0, f1)")
       )
 
@@ -285,7 +285,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
           streamTableNode(0),
           term("select", "a", "b", "e")
         ),
-        term("window", "TumblingGroupWindow('w, 'e, 2.rows)"),
+        term("window", "TumblingGroupWindow('w, 'e, 2)"),
         term("select",  "EmptyTableAggFunc(a, b) AS (f0, f1)")
       )
 
@@ -355,7 +355,7 @@ class GroupWindowTableAggregateTest extends TableTestBase {
           streamTableNode(0),
           term("select", "a", "b", "e")
         ),
-        term("window", "SlidingGroupWindow('w, 'e, 2.rows, 1.rows)"),
+        term("window", "SlidingGroupWindow('w, 'e, 2, 1)"),
         term("select",  "EmptyTableAggFunc(a, b) AS (f0, f1)")
       )
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index 2289c2c..d1aade2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -107,7 +107,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "int", "string", "proctime")
       ),
       term("groupBy", "string"),
-      term("window", "TumblingGroupWindow('w, 'proctime, 2.rows)"),
+      term("window", "TumblingGroupWindow('w, 'proctime, 2)"),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -201,7 +201,7 @@ class GroupWindowTest extends TableTestBase {
         term("select", "int", "string", "proctime")
       ),
       term("groupBy", "string"),
-      term("window", "SlidingGroupWindow('w, 'proctime, 2.rows, 1.rows)"),
+      term("window", "SlidingGroupWindow('w, 'proctime, 2, 1)"),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -364,7 +364,7 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int", "proctime")
       ),
-      term("window", "TumblingGroupWindow('w, 'proctime, 2.rows)"),
+      term("window", "TumblingGroupWindow('w, 'proctime, 2)"),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -461,7 +461,7 @@ class GroupWindowTest extends TableTestBase {
         streamTableNode(0),
         term("select", "int", "proctime")
       ),
-      term("window", "SlidingGroupWindow('w, 'proctime, 2.rows, 1.rows)"),
+      term("window", "SlidingGroupWindow('w, 'proctime, 2, 1)"),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -595,7 +595,7 @@ class GroupWindowTest extends TableTestBase {
           "DataStreamGroupWindowAggregate",
           streamTableNode(0),
           term("groupBy", "string, int2, int3"),
-          term("window", "SlidingGroupWindow('w, 'proctime, 2.rows, 1.rows)"),
+          term("window", "SlidingGroupWindow('w, 'proctime, 2, 1)"),
           term(
             "select",
             "string",