You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:38:21 UTC

[flink] 03/03: [FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72e9f5b70030317e03f3e5fe57543ce970be43f2
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:24:35 2019 +0800

    [FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner
    
    - Remove APPROX_COUNT_DISTINCT for now because it is not supported yet.
    - Remove INCR_SUM because it is not a standard aggregate function.
    
    This closes #9316
---
 .../functions/sql/FlinkSqlOperatorTable.java       |  6 --
 .../functions/sql/SqlIncrSumAggFunction.java       | 75 ----------------------
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  2 -
 .../planner/plan/utils/AggFunctionFactory.scala    |  4 +-
 .../FlinkRelMdModifiedMonotonicityTest.scala       | 11 ----
 .../table/planner/plan/stream/sql/RankTest.scala   | 75 ----------------------
 .../sql/agg/SortDistinctAggregateITCase.scala      | 10 +--
 .../runtime/stream/sql/AggregateITCase.scala       | 36 -----------
 8 files changed, 2 insertions(+), 217 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index dbabb69..e201400 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -950,11 +950,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	 */
 	public static final SqlListAggFunction LISTAGG = new SqlListAggFunction();
 
-	/**
-	 * <code>INCR_SUM</code> aggregate function.
-	 */
-	public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction();
-
 	// -----------------------------------------------------------------------------
 	// Window SQL functions
 	// -----------------------------------------------------------------------------
@@ -1072,7 +1067,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM;
 	public static final SqlAggFunction SUM0 = SqlStdOperatorTable.SUM0;
 	public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT;
-	public static final SqlAggFunction APPROX_COUNT_DISTINCT = SqlStdOperatorTable.APPROX_COUNT_DISTINCT;
 	public static final SqlAggFunction COLLECT = SqlStdOperatorTable.COLLECT;
 	public static final SqlAggFunction MIN = SqlStdOperatorTable.MIN;
 	public static final SqlAggFunction MAX = SqlStdOperatorTable.MAX;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
deleted file mode 100644
index 0b92186..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.sql;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlSplittableAggFunction;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.List;
-
-/**
- * <code>INCR_SUM</code> is an aggregator which returns the sum of the values which
- * go into it like SUM. It differs in that the modified monotonicity of
- * INCR_SUM is INCREASING, while that of SUM should be inferred using
- * extra information.
- */
-public class SqlIncrSumAggFunction extends SqlAggFunction {
-
-	public SqlIncrSumAggFunction() {
-		super(
-				"INCR_SUM",
-				null,
-				SqlKind.SUM,
-				ReturnTypes.AGG_SUM,
-				null,
-				OperandTypes.NUMERIC,
-				SqlFunctionCategory.NUMERIC,
-				false,
-				false);
-	}
-
-	@Override
-	public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
-		return ImmutableList.of(
-				typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true));
-	}
-
-	@Override
-	public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
-		return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true);
-	}
-
-	@Override
-	public <T> T unwrap(Class<T> clazz) {
-		if (clazz == SqlSplittableAggFunction.class) {
-			return clazz.cast(SqlSplittableAggFunction.CountSplitter.INSTANCE);
-		} else {
-			return super.unwrap(clazz);
-		}
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 57de91f..9c6c77a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.functions.sql.SqlIncrSumAggFunction
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ModifiedMonotonicity
@@ -337,7 +336,6 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
         case SqlKind.MIN => DECREASING
         case _ => NOT_MONOTONIC
       }
-      case _: SqlIncrSumAggFunction => INCREASING
       case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
         val valueInterval = fmq.getFilteredColumnInterval(
           input, aggCall.getArgList.head, aggCall.filterArg)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index a8e00d3..e8707f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
 import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
 import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction}
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -75,8 +75,6 @@ class AggFunctionFactory(
 
       case _: SqlSumEmptyIsZeroAggFunction => createSum0AggFunction(argTypes)
 
-      case _: SqlIncrSumAggFunction => createIncrSumAggFunction(argTypes, index)
-
       case a: SqlMinMaxAggFunction if a.getKind == SqlKind.MIN =>
         createMinAggFunction(argTypes, index)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index cdd0191..3f79b07 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.plan.metadata
 
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
 import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType}
@@ -171,16 +170,6 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
       mq.getRelModifiedMonotonicity(aggWithAvg)
     )
 
-    // incr_sum agg
-    val aggWithIncrSum = relBuilder.scan("MyTable3").aggregate(
-      relBuilder.groupKey(relBuilder.field("a")),
-      relBuilder.aggregateCall(FlinkSqlOperatorTable.INCR_SUM, false, null,
-        "incr_sum_b", relBuilder.field("b"))).build()
-    assertEquals(
-      new RelModifiedMonotonicity(Array(CONSTANT, INCREASING)),
-      mq.getRelModifiedMonotonicity(aggWithIncrSum)
-    )
-
     // test monotonicity lost because group by a agg field
     // select max_c, max(sum_d) as max_sum_d from (
     //   select a, b, max(c) as max_c, sum(d) as sum_d from MyTable4 group by a, b
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index e6624fb..bcea370 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -579,28 +579,6 @@ class RankTest extends TableTestBase {
   }
 
   @Test
-  def testTopNOrderByIncrSum(): Unit = {
-    val subquery =
-      """
-        |SELECT a, b, incr_sum(c) as sum_c
-        |FROM MyTable
-        |GROUP BY a, b
-      """.stripMargin
-
-    val sql =
-      s"""
-         |SELECT *
-         |FROM (
-         |  SELECT a, b, sum_c,
-         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num
-         |  FROM ($subquery))
-         |WHERE row_num <= 10
-      """.stripMargin
-
-    util.verifyPlanWithTrait(sql)
-  }
-
-  @Test
   def testNestedTopN(): Unit = {
     val subquery =
       """
@@ -661,58 +639,5 @@ class RankTest extends TableTestBase {
     util.verifyPlanWithTrait(sql)
   }
 
-  @Test
-  def testTopNWithoutRowNumber2(): Unit = {
-    util.addTableSource[(String, String, String, String, Long, String, Long, String)](
-      "stream_source",
-      'seller_id, 'sku_id, 'venture, 'stat_date, 'trd_amt, 'trd_buyer_id, 'log_pv, 'log_visitor_id)
-
-    val group_sql =
-      """
-        |SELECT
-        |    seller_id
-        |    ,sku_id
-        |    ,venture
-        |    ,stat_date
-        |    ,incr_sum(trd_amt) AS amt_dtr
-        |    ,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr
-        |    ,SUM(log_pv) AS pv_dtr
-        |    ,COUNT(DISTINCT log_visitor_id) AS uv_dtr
-        |FROM stream_source
-        |GROUP BY seller_id,sku_id,venture,stat_date
-      """.stripMargin
-
-    val sql =
-      s"""
-         |SELECT
-         |    CONCAT(seller_id, venture, stat_date, sku_id) as rowkey,
-         |    seller_id,
-         |    sku_id,
-         |    venture,
-         |    stat_date,
-         |    amt_dtr,
-         |    byr_cnt_dtr,
-         |    pv_dtr,
-         |    uv_dtr
-         |FROM (
-         |  SELECT
-         |        seller_id,
-         |        sku_id,
-         |        venture,
-         |        stat_date,
-         |        amt_dtr,
-         |        byr_cnt_dtr,
-         |        pv_dtr,
-         |        uv_dtr,
-         |        ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date
-         |           ORDER BY amt_dtr DESC) AS rownum
-         |  FROM ($group_sql)
-         |)
-         |WHERE rownum <= 10
-      """.stripMargin
-
-    util.verifyPlanWithTrait(sql)
-  }
-
   // TODO add tests about multi-sinks and udf
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
index b5c51e1..d9ccb89 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
 import org.apache.flink.table.planner.utils.{CountAggFunction, IntSumAggFunction}
 
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import scala.collection.Seq
 
@@ -86,12 +86,4 @@ class SortDistinctAggregateITCase extends DistinctAggregateITCaseBase {
     )
   }
 
-  @Ignore
-  @Test
-  def testApproximateCountDistinct(): Unit = {
-    checkResult(
-      "SELECT APPROX_COUNT_DISTINCT(b) FROM Table3",
-      Seq(row(6))
-    )
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 75178e5..7d8796d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -398,42 +398,6 @@ class AggregateITCase(
   }
 
   @Test
-  def testIncrSum(): Unit = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "A"))
-    data.+=((-2, 2L, "B"))
-    data.+=((3, 2L, "B"))
-    data.+=((-4, 3L, "C"))
-    data.+=((5, 3L, "C"))
-    data.+=((6, 3L, "C"))
-    data.+=((-7, 4L, "B"))
-    data.+=((8, 4L, "A"))
-    data.+=((9, 4L, "D"))
-    data.+=((10, 4L, "E"))
-    data.+=((-11, 5L, "A"))
-    data.+=((12, 5L, "B"))
-
-    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable("T", t)
-
-    val sql =
-      """
-        |SELECT b, incr_sum(a)
-        |FROM T
-        |GROUP BY b
-      """.stripMargin
-
-    val t1 = tEnv.sqlQuery(sql)
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("1,1", "2,3", "3,11", "4,27", "5,12")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-
-  }
-
-  @Test
   def testNestedGroupByAgg(): Unit = {
     val data = new mutable.MutableList[(Int, Long, String)]
     data.+=((1, 1L, "A"))