You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/29 15:40:46 UTC

[flink] 04/06: [hotfix][table] Remove deprecated AggregateFunction.requiresOver()

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9aed3a0c2e7e2eed68bec52028ec95b8a17860ea
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 27 16:25:43 2020 +0200

    [hotfix][table] Remove deprecated AggregateFunction.requiresOver()
---
 .../flink/table/functions/AggregateFunction.java   | 27 -------------------
 .../runtime/utils/JavaUserDefinedAggFunctions.java | 11 +++++---
 .../utils/JavaUserDefinedScalarFunctions.java      | 31 ----------------------
 .../planner/plan/stream/table/OverWindowTest.xml   | 18 ++++++-------
 .../sql/validation/OverWindowValidationTest.scala  |  2 +-
 .../plan/batch/sql/agg/OverAggregateTest.scala     |  2 +-
 .../validation/OverWindowValidationTest.scala      |  2 +-
 .../plan/stream/sql/agg/OverAggregateTest.scala    |  2 +-
 .../validation/OverWindowValidationTest.scala      |  2 +-
 .../runtime/utils/JavaUserDefinedAggFunctions.java | 10 +++----
 10 files changed, 24 insertions(+), 83 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
index 13c2745..8abd44c3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -23,10 +23,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.inference.TypeInference;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * Base class for user-defined aggregates.
  *
@@ -128,20 +124,6 @@ public abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunct
 	 */
 	public abstract T getValue(ACC accumulator);
 
-	/**
-	 * Returns <code>true</code> if this {@link AggregateFunction} can only be applied in an
-	 * OVER window.
-	 *
-	 * @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
-	 *         <code>false</code> otherwise.
-	 *
-	 * @deprecated Use {@link #getRequirements()} instead.
-	 */
-	@Deprecated
-	public boolean requiresOver() {
-		return false;
-	}
-
 	@Override
 	public final FunctionKind getKind() {
 		return FunctionKind.AGGREGATE;
@@ -151,13 +133,4 @@ public abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunct
 	public TypeInference getTypeInference(DataTypeFactory typeFactory) {
 		throw new TableException("Aggregate functions are not updated to the new type system yet.");
 	}
-
-	@Override
-	public Set<FunctionRequirement> getRequirements() {
-		final HashSet<FunctionRequirement> requirements = new HashSet<>();
-		if (requiresOver()) {
-			requirements.add(FunctionRequirement.OVER_WINDOW_ONLY);
-		}
-		return Collections.unmodifiableSet(requirements);
-	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedAggFunctions.java
index de8a573..3d74f98 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedAggFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedAggFunctions.java
@@ -23,21 +23,24 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionRequirement;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Test aggregator functions.
  */
 public class JavaUserDefinedAggFunctions {
 	/**
-	 * Accumulator for test requiresOver.
+	 * Accumulator for test {@link FunctionRequirement#OVER_WINDOW_ONLY}.
  	 */
 	public static class Accumulator0 extends Tuple2<Long, Integer>{}
 
 	/**
-	 * Test for requiresOver.
+	 * Test for {@link FunctionRequirement#OVER_WINDOW_ONLY}.
 	 */
 	public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
 		@Override
@@ -55,8 +58,8 @@ public class JavaUserDefinedAggFunctions {
 		}
 
 		@Override
-		public boolean requiresOver() {
-			return true;
+		public Set<FunctionRequirement> getRequirements() {
+			return Collections.singleton(FunctionRequirement.OVER_WINDOW_ONLY);
 		}
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index 1c86fc4..e1a0640 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -20,10 +20,8 @@ package org.apache.flink.table.planner.runtime.utils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.python.PythonEnv;
@@ -43,35 +41,6 @@ import static org.junit.Assert.fail;
 public class JavaUserDefinedScalarFunctions {
 
 	/**
-	 * Accumulator for test requiresOver.
-	 */
-	public static class AccumulatorOver extends Tuple2<Long, Integer> {}
-
-	/**
-	 * Test for requiresOver.
-	 */
-	public static class OverAgg0 extends AggregateFunction<Long, AccumulatorOver> {
-		@Override
-		public AccumulatorOver createAccumulator() {
-			return new AccumulatorOver();
-		}
-
-		@Override
-		public Long getValue(AccumulatorOver accumulator) {
-			return 1L;
-		}
-
-		//Overloaded accumulate method
-		public void accumulate(AccumulatorOver accumulator, long iValue, int iWeight) {
-		}
-
-		@Override
-		public boolean requiresOver() {
-			return true;
-		}
-	}
-
-	/**
 	 * Increment input.
 	 */
 	public static class JavaFunc0 extends ScalarFunction {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
index 6a903ff..844e238 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
@@ -53,7 +53,7 @@ Calc(select=[c, w0$o0 AS _c1])
   <TestCase name="testProcTimeBoundedPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], myAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'myAvg')])
+LogicalProject(a=[$0], myAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $0 ORDER BY $3 NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'myAvg')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -70,7 +70,7 @@ Calc(select=[a, w0$o0 AS myAvg])
   <TestCase name="testProcTimeBoundedPartitionedRowsOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(c=[$2], _c1=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')])
+LogicalProject(c=[$2], _c1=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -121,7 +121,7 @@ Calc(select=[c, w0$o0 AS _c1])
   <TestCase name="testProcTimeUnboundedPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c2')], _c3=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c3')])
+LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c2')], _c3=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c3')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -138,7 +138,7 @@ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3])
   <TestCase name="testProcTimeUnboundedPartitionedRowsOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(c=[$2], _c1=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], _c2=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c2')])
+LogicalProject(c=[$2], _c1=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], _c2=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $2 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c2')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -172,7 +172,7 @@ Calc(select=[a, w0$o0 AS _c1])
   <TestCase name="testRowTimeBoundedPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], _c1=[AS(AVG($2) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
+LogicalProject(a=[$0], _c1=[AS(AVG($2) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -206,7 +206,7 @@ Calc(select=[c, w0$o0 AS _c1])
   <TestCase name="testRowTimeBoundedPartitionedRowsOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(c=[$2], _c1=[AS(COUNT($1) OVER (PARTITION BY $1 ORDER BY $4 NULLS FIRST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $1 ORDER BY $4 NULLS FIRST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
+LogicalProject(c=[$2], _c1=[AS(COUNT($1) OVER (PARTITION BY $1 ORDER BY $4 NULLS FIRST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $1 ORDER BY $4 NULLS FIRST ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -257,7 +257,7 @@ Calc(select=[c, w0$o0 AS _c1])
   <TestCase name="testRowTimeUnboundedPartitionedRowsOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(c=[$2], _c1=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
+LogicalProject(c=[$2], _c1=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c1')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -274,7 +274,7 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS wAvg])
   <TestCase name="testRowTimeUnboundedPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c2')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$d1aa8f47e869d02a75edf3bb0ed00059($2, $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
+LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'_c2')], wAvg=[AS(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$4060051ba256b2d83fccd580b20a09be($2, $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wAvg')])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -291,7 +291,7 @@ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS wAvg])
   <TestCase name="testScalarFunctionsOnOverWindow">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(d=[AS(org$apache$flink$table$planner$expressions$utils$Func1$$879c8537562dbe74f3349fa0e6502755(AS(SUM($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wsum')), _UTF-16LE'd')], _c1=[AS(EXP(COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), _UTF-16LE'_c1')], _c2=[AS(+(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetrac [...]
+LogicalProject(d=[AS(org$apache$flink$table$planner$expressions$utils$Func1$$879c8537562dbe74f3349fa0e6502755(AS(SUM($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wsum')), _UTF-16LE'd')], _c1=[AS(EXP(COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), _UTF-16LE'_c1')], _c2=[AS(+(org$apache$flink$table$planner$plan$utils$JavaUserDefinedAggFunctions$WeightedAvgWithRetrac [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
index fc68e5a..3da22d6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.stream.sql.validation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.planner.utils.TableTestBase
 import org.apache.flink.types.Row
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala
index 6e10685..b5632de 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.batch.sql.agg
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.junit.Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/OverWindowValidationTest.scala
index a2a6741..374c1ac 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/OverWindowValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/OverWindowValidationTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.batch.table.validation
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{Tumble, ValidationException, _}
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.junit._
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
index 89a3316..d241fa2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.stream.sql.agg
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
 
 import org.junit.Assert.assertEquals
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/OverWindowValidationTest.scala
index 6905cb0..b9f8995 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/OverWindowValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/OverWindowValidationTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.OverAgg0
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0
 import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase, TableTestUtil}
 
 import org.apache.calcite.rel.RelNode
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
index d22fc18..1f059a9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.dataview.ListView;
 import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionRequirement;
 
 import java.util.Iterator;
 import java.util.Map;
@@ -32,12 +33,12 @@ import java.util.Map;
  */
 public class JavaUserDefinedAggFunctions {
 	/**
-	 * Accumulator for test requiresOver.
+	 * Accumulator for test {@link FunctionRequirement#OVER_WINDOW_ONLY}.
  	 */
 	public static class Accumulator0 extends Tuple2<Long, Integer>{}
 
 	/**
-	 * Test for requiresOver.
+	 * Test for {@link FunctionRequirement#OVER_WINDOW_ONLY}.
 	 */
 	public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
 		@Override
@@ -53,11 +54,6 @@ public class JavaUserDefinedAggFunctions {
 		//Overloaded accumulate method
 		public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
 		}
-
-		@Override
-		public boolean requiresOver() {
-			return true;
-		}
 	}
 
 	/**