You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:38:20 UTC
[flink] 02/03: [FLINK-13529][table-planner-blink] Remove the second
parameter of FIRST_VALUE and LAST_VALUE
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 95275e2ea0ca1bfd35e4638d52649074190a83e0
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Aug 1 17:38:19 2019 +0800
[FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE
According to ANSI SQL, FIRST_VALUE and LAST_VALUE are ordered-set functions which require the WITHIN GROUP clause to specify an order, instead of passing the order field as a parameter.
This closes #9316
---
.../sql/SqlFirstLastValueAggFunction.java | 34 +++++++----
.../rules/logical/SplitAggregateRuleTest.scala | 14 -----
.../stream/sql/agg/DistinctAggregateTest.scala | 15 -----
.../runtime/stream/sql/AggregateITCase.scala | 71 ----------------------
.../runtime/stream/sql/OverWindowITCase.scala | 37 +++++------
.../runtime/stream/sql/SplitAggregateITCase.scala | 22 -------
6 files changed, 43 insertions(+), 150 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
index e4b8a11..305f3e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.planner.functions.sql;
+import org.apache.flink.util.Preconditions;
+
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.type.RelDataType;
@@ -27,8 +29,8 @@ import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Optionality;
import java.util.List;
@@ -36,30 +38,40 @@ import java.util.List;
* <code>FIRST_VALUE</code> and <code>LAST_VALUE</code> aggregate functions
* return the first or the last value in a list of values that are input to the
* function.
+ *
+ * <p>NOTE: The difference between this and {@link org.apache.calcite.sql.fun.SqlFirstLastValueAggFunction}
+ * is that this can be used without over clause.
*/
public class SqlFirstLastValueAggFunction extends SqlAggFunction {
- public SqlFirstLastValueAggFunction(SqlKind sqlKind) {
- super(sqlKind.name(),
+ public SqlFirstLastValueAggFunction(SqlKind kind) {
+ super(
+ kind.name(),
null,
- sqlKind,
+ kind,
ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
null,
- OperandTypes.or(OperandTypes.ANY, OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY)),
+ OperandTypes.ANY,
SqlFunctionCategory.NUMERIC,
false,
- false);
+ false,
+ Optionality.FORBIDDEN);
+ Preconditions.checkArgument(kind == SqlKind.FIRST_VALUE
+ || kind == SqlKind.LAST_VALUE);
}
- @Override
+ //~ Methods ----------------------------------------------------------------
+
+ @SuppressWarnings("deprecation")
public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
return ImmutableList.of(
- typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true),
- typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true));
+ typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.ANY), true));
}
- @Override
+ @SuppressWarnings("deprecation")
public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
- return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true);
+ return typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.ANY), true);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index 5d58dfb..2680482 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -75,25 +75,11 @@ class SplitAggregateRuleTest extends TableTestBase {
}
@Test
- def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
- // FIRST_VALUE with order is not splittable,
- // so SplitAggregateRule can not be applied to the plan
- util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
- }
-
- @Test
def testSingleLastValueWithDistinctAgg(): Unit = {
util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
}
@Test
- def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
- // LAST_VALUE with order is not splittable,
- // so SplitAggregateRule can not be applied to the plan
- util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
- }
-
- @Test
def testSingleConcatAggWithDistinctAgg(): Unit = {
util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 8e36305..5cbbab4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -81,27 +81,12 @@ class DistinctAggregateTest(
}
@Test
- def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
- // FIRST_VALUE is not mergeable, so the final plan does not contain local agg
- // FIRST_VALUE with order is not splittable,
- // so SplitAggregateRule can not be applied to the plan
- util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
- }
-
- @Test
def testSingleLastValueWithDistinctAgg(): Unit = {
// LAST_VALUE is not mergeable, so the final plan does not contain local agg
util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
}
@Test
- def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
- // LAST_VALUE is not mergeable, so the final plan does not contain local agg
- // LAST_VALUE with order is not splittable, so SplitAggregateRule can not be applied to the plan
- util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
- }
-
- @Test
def testSingleConcatAggWithDistinctAgg(): Unit = {
util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 41561a3..75178e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -17,7 +17,6 @@
*/
package org.apache.flink.table.planner.runtime.stream.sql
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
@@ -472,76 +471,6 @@ class AggregateITCase(
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
- @Test
- def testFirstLastWithOrder(): Unit = {
- // set all operator parallelism to 1 to make sure the processed input element is in order
- env.setParallelism(1)
- val data = new mutable.MutableList[(Long, String, String, Int, Long, String)]
- data.+=((2L, "u1", "i1", 0, 0L, "b1"))
- data.+=((-1L, "u1", "i1", 1, 1L, "b1"))
- data.+=((3L, "u2", "i1", 1, 1L, "b1"))
- data.+=((4L, "u2", null, 0, 0L, "b1"))
-
- val t = failingDataSource(data).toTable(tEnv, 'o, 'u, 'i, 'v, 's, 'b)
- tEnv.registerTable("T", t)
- val t1 = tEnv.sqlQuery(
- """
- |SELECT first_value(u, lo) as f, last_value(u, lo) as l
- |FROM (
- | SELECT b, u, i, last_value(o) as lo, last_value(v, o) as lv,
- | first_value(o) as fo, first_value(v, o) as fv
- | FROM T
- | GROUP BY u, i, b)
- |GROUP BY i
- """.stripMargin)
-
- val sink = new TestingRetractSink
- t1.toRetractStream[Row].addSink(sink)
- env.execute()
-
- val expected = List(
- "u1,u2",
- "u2,u2")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
- }
-
- @Test
- def testFirstValueWithInputContainingNull(): Unit = {
- val data = List(
- Row.of("blond", null, Long.box(23L)),
- Row.of("slim", null, Long.box(21L)),
- Row.of("slim", null, Long.box(17L)),
- Row.of("blond", null, Long.box(19L))
- )
-
- implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO) // tpe is automatically
-
- val t = failingDataSource(data).toTable(tEnv, 't, 'name, 'age)
- tEnv.registerTable("T", t)
-
- /* use sql grammar to generate null input for firstValue,
- * since fromCollection will throw exception when serializing null as Long
- */
- val t1 = tEnv.sqlQuery(
- """
- |SELECT t,
- |first_value(name, age) as c,
- |last_value(name, age) as d
- |FROM T
- |GROUP BY t
- """.stripMargin)
-
- val sink = new TestingRetractSink
- t1.toRetractStream[Row].addSink(sink)
- env.execute()
-
- val expected = List("slim,null,null", "blond,null,null")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
- }
-
/** test unbounded groupBy (without window) **/
@Test
def testUnboundedGroupBy(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index fddb45f..0c11794 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -339,10 +339,6 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
" c, b, " +
" LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
" BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
- " first_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
- " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
- " last_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
- " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
" COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
" BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
" SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
@@ -354,19 +350,26 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
env.execute()
val expected = List(
- "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3",
- "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9",
- "Hello,3,0,2,3,4,9",
- "Hello,4,0,3,4,2,7",
- "Hello,5,1,4,5,2,9",
- "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12",
- "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18",
- "Hello World,17,3,7,7,3,21",
- "Hello World,7,1,7,7,1,7",
- "Hello World,77,3,7,7,3,21",
- "Hello World,18,1,7,7,1,7",
- "Hello World,8,2,7,8,2,15",
- "Hello World,20,1,20,20,1,20")
+ "Hello,1,0,1,1",
+ "Hello,15,0,2,2",
+ "Hello,16,0,3,3",
+ "Hello,2,0,6,9",
+ "Hello,3,0,6,9",
+ "Hello,2,0,6,9",
+ "Hello,3,0,4,9",
+ "Hello,4,0,2,7",
+ "Hello,5,1,2,9",
+ "Hello,6,2,2,11",
+ "Hello,65,2,2,12",
+ "Hello,9,2,2,12",
+ "Hello,9,2,2,12",
+ "Hello,18,3,3,18",
+ "Hello World,17,3,3,21",
+ "Hello World,7,1,1,7",
+ "Hello World,77,3,3,21",
+ "Hello World,18,1,1,7",
+ "Hello World,8,2,2,15",
+ "Hello World,20,1,1,20")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 66bbfcc..6f7df33 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -195,28 +195,6 @@ class SplitAggregateITCase(
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
- @Test
- def testFirstValueLastValueWithRetraction(): Unit = {
- val t1 = tEnv.sqlQuery(
- s"""
- |SELECT
- | b, FIRST_VALUE(c, a), LAST_VALUE(c, a), COUNT(DISTINCT c)
- |FROM(
- | SELECT
- | a, COUNT(DISTINCT b) as b, MAX(b) as c
- | FROM T
- | GROUP BY a
- |) GROUP BY b
- """.stripMargin)
-
- val sink = new TestingRetractSink
- t1.toRetractStream[Row].addSink(sink)
- env.execute()
-
- val expected = List("2,2,6,2", "4,5,5,1", "1,5,5,1")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
- }
-
@Ignore("[FLINK-12088]: JOIN is not supported")
@Test
def testAggWithJoin(): Unit = {