You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 11:19:34 UTC

[flink] 04/05: [FLINK-13107][table-planner-blink] Fix Bug to check whether OverCall is RowMode or RangeMode.

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6e5954e8a03ad5d440447a57098976b0250f4f72
Author: beyond1920 <be...@126.com>
AuthorDate: Mon Jul 8 12:10:13 2019 +0800

    [FLINK-13107][table-planner-blink] Fix Bug to check whether OverCall is RowMode or RangeMode.
---
 .../flink/table/expressions/RexNodeConverter.java  |   7 +-
 .../org/apache/flink/table/expressions/call.scala  |  50 ---
 .../runtime/stream/table/OverWindowITCase.scala    | 439 +++++++++++++++++++++
 3 files changed, 444 insertions(+), 52 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index c311328..f6121a5 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.expressions;
 
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.calcite.FlinkRelBuilder;
@@ -45,10 +44,12 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.util.Preconditions;
 
 import com.google.common.collect.ImmutableList;
@@ -529,7 +530,9 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 				.collect(Collectors.toList());
 		// assemble bounds
 		Expression preceding = args.get(2);
-		boolean isPhysical = ((ResolvedExpression) preceding).getOutputDataType().equals(DataTypes.BIGINT());
+		boolean isPhysical = LogicalTypeChecks.hasRoot(
+				fromDataTypeToLogicalType(((ResolvedExpression) preceding).getOutputDataType()),
+				LogicalTypeRoot.BIGINT);
 		Expression following = args.get(3);
 		RexWindowBound lowerBound = createBound(preceding, SqlKind.PRECEDING);
 		RexWindowBound upperBound = createBound(following, SqlKind.FOLLOWING);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
index 9c5a212..406571f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -18,17 +18,6 @@
 package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-
-import org.apache.calcite.rex.RexWindowBound._
-import org.apache.calcite.rex.{RexNode, RexWindowBound}
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`.OrdinalReturnTypeInference
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.tools.RelBuilder
-
-import java.util
-import org.apache.flink.table.api._
-import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
@@ -78,45 +67,6 @@ case class OverCall(
     s"PRECEDING $preceding " +
     s"FOLLOWING $following)"
 
-  private def createBound(
-    relBuilder: RelBuilder,
-    bound: PlannerExpression,
-    sqlKind: SqlKind): RexWindowBound = {
-
-    bound match {
-      case _: UnboundedRow | _: UnboundedRange =>
-        val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
-        create(unbounded, null)
-      case _: CurrentRow | _: CurrentRange =>
-        val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
-        create(currentRow, null)
-      case b: Literal =>
-        val returnType = relBuilder
-          .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-          .createFieldTypeFromLogicalType(fromTypeInfoToLogicalType(Types.DECIMAL))
-
-        val sqlOperator = new SqlPostfixOperator(
-          sqlKind.name,
-          sqlKind,
-          2,
-          new OrdinalReturnTypeInference(0),
-          null,
-          null)
-
-        val operands: Array[SqlNode] = new Array[SqlNode](1)
-        operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
-
-        val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
-
-        val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
-        expressions.add(relBuilder.literal(b.value))
-
-        val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
-
-        create(node, rexNode)
-    }
-  }
-
   override private[flink] def children: Seq[PlannerExpression] =
     Seq(agg) ++ Seq(orderBy) ++ partitionBy ++ Seq(preceding) ++ Seq(following)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
new file mode 100644
index 0000000..75bcd56
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{DataTypes, Over}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithRetractAndReset, WeightedAvg}
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeProcessOperator
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink}
+import org.apache.flink.table.util.CountAggFunction
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
+
+  @Test
+  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+    val data = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (3L, 3, "Hello"),
+      (4L, 4, "Hello"),
+      (5L, 5, "Hello"),
+      (6L, 6, "Hello"),
+      (7L, 7, "Hello World"),
+      (8L, 8, "Hello World"),
+      (8L, 8, "Hello World"),
+      (20L, 20, "Hello World"),
+      (20L, 20, null.asInstanceOf[String]))
+
+    val stream = failingDataSource(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+    val countDist = new CountDistinct
+
+    val windowedTable = table//.select('a, 'b, 'c, proctime() as 'proctime)
+      .window(
+      Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+      .select('c,
+        countFun('b) over 'w as 'mycount,
+        weightAvgFun('a, 'b) over 'w as 'wAvg,
+        countDist('a) over 'w as 'countDist)
+      .select('c, 'mycount, 'wAvg, 'countDist)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello World,1,7,1", "Hello World,2,7,2", "Hello World,3,7,2", "Hello World,4,13,3",
+      "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6",
+      "null,1,20,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testOverWindowWithConstant(): Unit = {
+
+    val data = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (3L, 3, "Hello"),
+      (4L, 4, "Hello"),
+      (5L, 5, "Hello"),
+      (6L, 6, "Hello"),
+      (7L, 7, "Hello World"),
+      (8L, 8, "Hello World"),
+      (8L, 8, "Hello World"),
+      (20L, 20, "Hello World"))
+
+    val stream = failingDataSource(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime)
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(
+        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+      .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello World,12", "Hello World,9", "Hello World,9", "Hello World,9", "Hello,3",
+      "Hello,3", "Hello,4", "Hello,4", "Hello,5", "Hello,5")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+    val plusOne = new JavaFunc0
+    val countDist = new CountDistinct
+
+    val windowedTable = table
+      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
+        CURRENT_RANGE as 'w)
+      .select(
+        'a, 'b, 'c,
+        'b.sum over 'w,
+        "SUM:".toExpr + ('b.sum over 'w),
+        countFun('b) over 'w,
+        (countFun('b) over 'w) + 1,
+        plusOne(countFun('b) over 'w),
+        array('b.avg over 'w, 'b.cast(DataTypes.DOUBLE()).max over 'w),
+        'b.avg over 'w,
+        'b.max over 'w,
+        'b.min over 'w,
+        ('b.min over 'w).abs(),
+        weightAvgFun('b, 'a) over 'w,
+        countDist('c) over 'w as 'countDist)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hello,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
+      "1,2,Hello,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
+      "1,3,Hello world,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
+      "1,1,Hi,7,SUM:7,4,5,5,[1.75, 3.0],1.75,3,1,1,1,3",
+      "2,1,Hello,1,SUM:1,1,2,2,[1.0, 1.0],1.0,1,1,1,1,1",
+      "2,2,Hello world,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
+      "2,3,Hello world,6,SUM:6,3,4,4,[2.0, 3.0],2.0,3,1,1,2,2",
+      "1,4,Hello world,11,SUM:11,5,6,6,[2.2, 4.0],2.2,4,1,1,2,3",
+      "1,5,Hello world,29,SUM:29,8,9,9,[3.625, 7.0],3.625,7,1,1,3,3",
+      "1,6,Hello world,29,SUM:29,8,9,9,[3.625, 7.0],3.625,7,1,1,3,3",
+      "1,7,Hello world,29,SUM:29,8,9,9,[3.625, 7.0],3.625,7,1,1,3,3",
+      "2,4,Hello world,15,SUM:15,5,6,6,[3.0, 5.0],3.0,5,1,1,3,2",
+      "2,5,Hello world,15,SUM:15,5,6,6,[3.0, 5.0],3.0,5,1,1,3,2"
+    )
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+
+    val data = List(
+      (1, 1L, 0, "Hallo", 1L),
+      (2, 2L, 1, "Hallo Welt", 2L),
+      (2, 3L, 2, "Hallo Welt wie", 1L),
+      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+      (3, 5L, 4, "ABC", 2L),
+      (3, 6L, 5, "BCD", 3L),
+      (4, 7L, 6, "CDE", 2L),
+      (4, 8L, 7, "DEF", 1L),
+      (4, 9L, 8, "EFG", 1L),
+      (4, 10L, 9, "FGH", 2L),
+      (5, 11L, 10, "GHI", 1L),
+      (5, 12L, 11, "HIJ", 3L),
+      (5, 13L, 12, "IJK", 3L),
+      (5, 14L, 13, "JKL", 2L),
+      (5, 15L, 14, "KLM", 2L))
+
+    val countDist = new CountDistinctWithRetractAndReset
+    val stream = failingDataSource(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+
+    val windowedTable = table.select('a, 'b, 'c, 'd, 'e, 'proctime)
+      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
+      .select('a, 'c.sum over 'w, 'c.min over 'w, countDist('e) over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0,1",
+      "2,1,1,1",
+      "2,3,1,2",
+      "3,3,3,1",
+      "3,7,3,1",
+      "3,12,3,2",
+      "4,6,6,1",
+      "4,13,6,2",
+      "4,21,6,2",
+      "4,30,6,2",
+      "5,10,10,1",
+      "5,21,10,2",
+      "5,33,10,2",
+      "5,46,10,3",
+      "5,60,10,3")
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testProcTimeBoundedPartitionedRowsOverWithJavaAPI(): Unit = {
+
+    val data = List(
+      (1, 1L, 0, "Hallo", 1L),
+      (2, 2L, 1, "Hallo Welt", 2L),
+      (2, 3L, 2, "Hallo Welt wie", 1L),
+      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+      (3, 5L, 4, "ABC", 2L),
+      (3, 6L, 5, "BCD", 3L),
+      (4, 7L, 6, "CDE", 2L),
+      (4, 8L, 7, "DEF", 1L),
+      (4, 9L, 8, "EFG", 1L),
+      (4, 10L, 9, "FGH", 2L),
+      (5, 11L, 10, "GHI", 1L),
+      (5, 12L, 11, "HIJ", 3L),
+      (5, 13L, 12, "IJK", 3L),
+      (5, 14L, 13, "JKL", 2L),
+      (5, 15L, 14, "KLM", 2L))
+
+    val countDist = new CountDistinctWithRetractAndReset
+    val stream = failingDataSource(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime)
+
+    val windowedTable = table.select("a, b, c, d, e, proctime")
+      .window(Over
+        .partitionBy("a")
+        .orderBy("proctime")
+        .preceding("4.rows")
+        .following("CURRENT_ROW")
+        .as("w"))
+      .select('a, 'c.sum over 'w, 'c.min over 'w, countDist('e) over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0,1",
+      "2,1,1,1",
+      "2,3,1,2",
+      "3,3,3,1",
+      "3,7,3,1",
+      "3,12,3,2",
+      "4,6,6,1",
+      "4,13,6,2",
+      "4,21,6,2",
+      "4,30,6,2",
+      "5,10,10,1",
+      "5,21,10,2",
+      "5,33,10,2",
+      "5,46,10,3",
+      "5,60,10,3")
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRowOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((3L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((3L, (3L, 3, "Hello"))),
+      Left((4L, (4L, 4, "Hello"))),
+      Left((5L, (5L, 5, "Hello"))),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L),
+      Left((8L, (8L, 8, "Hello World"))),
+      Left((7L, (7L, 7, "Hello World"))),
+      Right(20L))
+
+    val countDist = new CountDistinctWithRetractAndReset
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, 'a, 'a.count over 'w, 'a.sum over 'w, countDist('a) over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello,1,1,1,1", "Hello,1,2,2,1", "Hello,1,3,3,1",
+      "Hello,2,3,4,2", "Hello,2,3,5,2", "Hello,2,3,6,1",
+      "Hello,3,3,7,2", "Hello,4,3,9,3", "Hello,5,3,12,3",
+      "Hello,6,3,15,3",
+      "Hello World,7,1,7,1", "Hello World,7,2,14,1", "Hello World,7,3,21,1",
+      "Hello World,7,3,21,1", "Hello World,8,3,22,2", "Hello World,20,3,35,3")
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6500L, (6L, 65, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((9500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 9, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))),
+      Right(19000L))
+
+    val countDist = new CountDistinctWithRetractAndReset
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    val windowedTable = table
+      .window(
+        Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
+      .select('c, 'b, 'a.count over 'w, 'a.sum over 'w, countDist('a) over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello,1,1,1,1", "Hello,15,2,2,1", "Hello,16,3,3,1",
+      "Hello,2,6,9,2", "Hello,3,6,9,2", "Hello,2,6,9,2",
+      "Hello,3,4,9,2",
+      "Hello,4,2,7,2",
+      "Hello,5,2,9,2",
+      "Hello,6,2,11,2", "Hello,65,2,12,1",
+      "Hello,9,2,12,1", "Hello,9,2,12,1", "Hello,18,3,18,1",
+      "Hello World,7,1,7,1", "Hello World,17,3,21,1",
+      "Hello World,77,3,21,1", "Hello World,18,1,7,1",
+      "Hello World,8,2,15,2",
+      "Hello World,20,1,20,1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testOverAggWithDiv(): Unit = {
+    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
+      Left((2L, (2L, 2, "Hello"))),
+      Left((1L, (1L, 1, "Hello"))),
+      Left((1L, (7L, 7, "Hello World"))),
+      Right(2L),
+      Left((6L, (6L, 6, "Hello"))),
+      Left((20L, (20L, 20, "Hello World"))),
+      Right(6L))
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Long, Int, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c, 'a, 'a.count over 'w, ('a / 'a).sum over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Hello World,20,2,2.0", "Hello World,7,1,1.0", "Hello,1,1,1.0",
+      "Hello,2,2,2.0", "Hello,6,3,3.0")
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+}