You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:38:20 UTC

[flink] 02/03: [FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95275e2ea0ca1bfd35e4638d52649074190a83e0
Author: beyond1920 <be...@126.com>
AuthorDate: Thu Aug 1 17:38:19 2019 +0800

    [FLINK-13529][table-planner-blink] Remove the second parameter of FIRST_VALUE and LAST_VALUE
    
    According to ANSI SQL, FIRST_VALUE and LAST_VALUE are ordered set functions which require a WITHIN GROUP clause to specify an order instead of passing the order field as a parameter.
    
    This closes #9316
---
 .../sql/SqlFirstLastValueAggFunction.java          | 34 +++++++----
 .../rules/logical/SplitAggregateRuleTest.scala     | 14 -----
 .../stream/sql/agg/DistinctAggregateTest.scala     | 15 -----
 .../runtime/stream/sql/AggregateITCase.scala       | 71 ----------------------
 .../runtime/stream/sql/OverWindowITCase.scala      | 37 +++++------
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 22 -------
 6 files changed, 43 insertions(+), 150 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
index e4b8a11..305f3e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlFirstLastValueAggFunction.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.functions.sql;
 
+import org.apache.flink.util.Preconditions;
+
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -27,8 +29,8 @@ import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Optionality;
 
 import java.util.List;
 
@@ -36,30 +38,40 @@ import java.util.List;
  * <code>FIRST_VALUE</code> and <code>LAST_VALUE</code> aggregate functions
  * return the first or the last value in a list of values that are input to the
  * function.
+ *
+ * <p>NOTE: The difference between this and {@link org.apache.calcite.sql.fun.SqlFirstLastValueAggFunction}
+ * is that this can be used without over clause.
  */
 public class SqlFirstLastValueAggFunction extends SqlAggFunction {
 
-	public SqlFirstLastValueAggFunction(SqlKind sqlKind) {
-		super(sqlKind.name(),
+	public SqlFirstLastValueAggFunction(SqlKind kind) {
+		super(
+				kind.name(),
 				null,
-				sqlKind,
+				kind,
 				ReturnTypes.ARG0_NULLABLE_IF_EMPTY,
 				null,
-				OperandTypes.or(OperandTypes.ANY, OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY)),
+				OperandTypes.ANY,
 				SqlFunctionCategory.NUMERIC,
 				false,
-				false);
+				false,
+				Optionality.FORBIDDEN);
+		Preconditions.checkArgument(kind == SqlKind.FIRST_VALUE
+				|| kind == SqlKind.LAST_VALUE);
 	}
 
-	@Override
+	//~ Methods ----------------------------------------------------------------
+
+	@SuppressWarnings("deprecation")
 	public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
 		return ImmutableList.of(
-				typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true),
-				typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true));
+				typeFactory.createTypeWithNullability(
+						typeFactory.createSqlType(SqlTypeName.ANY), true));
 	}
 
-	@Override
+	@SuppressWarnings("deprecation")
 	public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-		return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true);
+		return typeFactory.createTypeWithNullability(
+				typeFactory.createSqlType(SqlTypeName.ANY), true);
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
index 5d58dfb..2680482 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRuleTest.scala
@@ -75,25 +75,11 @@ class SplitAggregateRuleTest extends TableTestBase {
   }
 
   @Test
-  def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
-    // FIRST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleLastValueWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
-  def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
-    // LAST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index 8e36305..5cbbab4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -81,27 +81,12 @@ class DistinctAggregateTest(
   }
 
   @Test
-  def testSingleFirstValueWithOrderWithDistinctAgg(): Unit = {
-    // FIRST_VALUE is not mergeable, so the final plan does not contain local agg
-    // FIRST_VALUE with order is not splittable,
-    // so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, FIRST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleLastValueWithDistinctAgg(): Unit = {
     // LAST_VALUE is not mergeable, so the final plan does not contain local agg
     util.verifyPlan("SELECT a, LAST_VALUE(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
 
   @Test
-  def testSingleLastValueWithOrderWithDistinctAgg(): Unit = {
-    // LAST_VALUE is not mergeable, so the final plan does not contain local agg
-    // LAST_VALUE with order is not splittable, so SplitAggregateRule can not be applied to the plan
-    util.verifyPlan("SELECT a, LAST_VALUE(c, b), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
-  }
-
-  @Test
   def testSingleConcatAggWithDistinctAgg(): Unit = {
     util.verifyPlan("SELECT a, LISTAGG(c), COUNT(DISTINCT b) FROM MyTable GROUP BY a")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 41561a3..75178e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -472,76 +471,6 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Test
-  def testFirstLastWithOrder(): Unit = {
-    // set all operator parallelism to 1 to make sure the processed input element is in order
-    env.setParallelism(1)
-    val data = new mutable.MutableList[(Long, String, String, Int, Long, String)]
-    data.+=((2L, "u1", "i1", 0, 0L, "b1"))
-    data.+=((-1L, "u1", "i1", 1, 1L, "b1"))
-    data.+=((3L, "u2", "i1", 1, 1L, "b1"))
-    data.+=((4L, "u2", null, 0, 0L, "b1"))
-
-    val t = failingDataSource(data).toTable(tEnv, 'o, 'u, 'i, 'v, 's, 'b)
-    tEnv.registerTable("T", t)
-    val t1 = tEnv.sqlQuery(
-      """
-        |SELECT first_value(u, lo) as f, last_value(u, lo) as l
-        |FROM (
-        |  SELECT b, u, i, last_value(o) as lo, last_value(v, o) as lv,
-        |    first_value(o) as fo, first_value(v, o) as fv
-        |  FROM T
-        |  GROUP BY u, i, b)
-        |GROUP BY i
-      """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List(
-      "u1,u2",
-      "u2,u2")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
-  def testFirstValueWithInputContainingNull(): Unit = {
-    val data = List(
-      Row.of("blond", null, Long.box(23L)),
-      Row.of("slim", null, Long.box(21L)),
-      Row.of("slim", null, Long.box(17L)),
-      Row.of("blond", null, Long.box(19L))
-    )
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO) // tpe is automatically
-
-    val t = failingDataSource(data).toTable(tEnv, 't, 'name, 'age)
-    tEnv.registerTable("T", t)
-
-    /* use sql grammar to generate null input for firstValue,
-     * since fromCollection will throw exception when serializing null as Long
-     */
-    val t1 = tEnv.sqlQuery(
-      """
-        |SELECT t,
-        |first_value(name, age) as c,
-        |last_value(name, age) as d
-        |FROM T
-        |GROUP BY t
-      """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("slim,null,null", "blond,null,null")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
   /** test unbounded groupBy (without window) **/
   @Test
   def testUnboundedGroupBy(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
index fddb45f..0c11794 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
@@ -339,10 +339,6 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
       "  c, b, " +
       "  LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  first_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
-      "  last_value(a, a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
-      "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
       "  SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
@@ -354,19 +350,26 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
     env.execute()
 
     val expected = List(
-      "Hello,1,0,1,1,1,1", "Hello,15,0,1,1,2,2", "Hello,16,0,1,1,3,3",
-      "Hello,2,0,1,2,6,9", "Hello,3,0,1,2,6,9", "Hello,2,0,1,2,6,9",
-      "Hello,3,0,2,3,4,9",
-      "Hello,4,0,3,4,2,7",
-      "Hello,5,1,4,5,2,9",
-      "Hello,6,2,5,6,2,11", "Hello,65,2,6,6,2,12",
-      "Hello,9,2,6,6,2,12", "Hello,9,2,6,6,2,12", "Hello,18,3,6,6,3,18",
-      "Hello World,17,3,7,7,3,21",
-      "Hello World,7,1,7,7,1,7",
-      "Hello World,77,3,7,7,3,21",
-      "Hello World,18,1,7,7,1,7",
-      "Hello World,8,2,7,8,2,15",
-      "Hello World,20,1,20,20,1,20")
+      "Hello,1,0,1,1",
+      "Hello,15,0,2,2",
+      "Hello,16,0,3,3",
+      "Hello,2,0,6,9",
+      "Hello,3,0,6,9",
+      "Hello,2,0,6,9",
+      "Hello,3,0,4,9",
+      "Hello,4,0,2,7",
+      "Hello,5,1,2,9",
+      "Hello,6,2,2,11",
+      "Hello,65,2,2,12",
+      "Hello,9,2,2,12",
+      "Hello,9,2,2,12",
+      "Hello,18,3,3,18",
+      "Hello World,17,3,3,21",
+      "Hello World,7,1,1,7",
+      "Hello World,77,3,3,21",
+      "Hello World,18,1,1,7",
+      "Hello World,8,2,2,15",
+      "Hello World,20,1,1,20")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 66bbfcc..6f7df33 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -195,28 +195,6 @@ class SplitAggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Test
-  def testFirstValueLastValueWithRetraction(): Unit = {
-    val t1 = tEnv.sqlQuery(
-      s"""
-         |SELECT
-         |  b, FIRST_VALUE(c, a), LAST_VALUE(c, a), COUNT(DISTINCT c)
-         |FROM(
-         |  SELECT
-         |    a, COUNT(DISTINCT b) as b, MAX(b) as c
-         |  FROM T
-         |  GROUP BY a
-         |) GROUP BY b
-       """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("2,2,6,2", "4,5,5,1", "1,5,5,1")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
   @Ignore("[FLINK-12088]: JOIN is not supported")
   @Test
   def testAggWithJoin(): Unit = {