You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 03:38:21 UTC
[flink] 03/03: [FLINK-13529][table-planner-blink] Remove
APPROX_COUNT_DISTINCT and INCR_SUM in blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72e9f5b70030317e03f3e5fe57543ce970be43f2
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:24:35 2019 +0800
[FLINK-13529][table-planner-blink] Remove APPROX_COUNT_DISTINCT and INCR_SUM in blink planner
- Remove APPROX_COUNT_DISTINCT for now because it is not supported yet.
- Remove INCR_SUM because it is not a standard aggregate function.
This closes #9316
---
.../functions/sql/FlinkSqlOperatorTable.java | 6 --
.../functions/sql/SqlIncrSumAggFunction.java | 75 ----------------------
.../metadata/FlinkRelMdModifiedMonotonicity.scala | 2 -
.../planner/plan/utils/AggFunctionFactory.scala | 4 +-
.../FlinkRelMdModifiedMonotonicityTest.scala | 11 ----
.../table/planner/plan/stream/sql/RankTest.scala | 75 ----------------------
.../sql/agg/SortDistinctAggregateITCase.scala | 10 +--
.../runtime/stream/sql/AggregateITCase.scala | 36 -----------
8 files changed, 2 insertions(+), 217 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index dbabb69..e201400 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -950,11 +950,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
*/
public static final SqlListAggFunction LISTAGG = new SqlListAggFunction();
- /**
- * <code>INCR_SUM</code> aggregate function.
- */
- public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction();
-
// -----------------------------------------------------------------------------
// Window SQL functions
// -----------------------------------------------------------------------------
@@ -1072,7 +1067,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlAggFunction SUM = SqlStdOperatorTable.SUM;
public static final SqlAggFunction SUM0 = SqlStdOperatorTable.SUM0;
public static final SqlAggFunction COUNT = SqlStdOperatorTable.COUNT;
- public static final SqlAggFunction APPROX_COUNT_DISTINCT = SqlStdOperatorTable.APPROX_COUNT_DISTINCT;
public static final SqlAggFunction COLLECT = SqlStdOperatorTable.COLLECT;
public static final SqlAggFunction MIN = SqlStdOperatorTable.MIN;
public static final SqlAggFunction MAX = SqlStdOperatorTable.MAX;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
deleted file mode 100644
index 0b92186..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlIncrSumAggFunction.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.sql;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlFunctionCategory;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlSplittableAggFunction;
-import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.List;
-
-/**
- * <code>INCR_SUM</code> is an aggregator which returns the sum of the values which
- * go into it like SUM. It differs in that the modified monotonicity of
- * INCR_SUM is INCREASING, while that of SUM should be inferred using
- * extra information.
- */
-public class SqlIncrSumAggFunction extends SqlAggFunction {
-
- public SqlIncrSumAggFunction() {
- super(
- "INCR_SUM",
- null,
- SqlKind.SUM,
- ReturnTypes.AGG_SUM,
- null,
- OperandTypes.NUMERIC,
- SqlFunctionCategory.NUMERIC,
- false,
- false);
- }
-
- @Override
- public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
- return ImmutableList.of(
- typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true));
- }
-
- @Override
- public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
- return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.ANY), true);
- }
-
- @Override
- public <T> T unwrap(Class<T> clazz) {
- if (clazz == SqlSplittableAggFunction.class) {
- return clazz.cast(SqlSplittableAggFunction.CountSplitter.INSTANCE);
- } else {
- return super.unwrap(clazz);
- }
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 57de91f..9c6c77a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.functions.sql.SqlIncrSumAggFunction
import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ModifiedMonotonicity
@@ -337,7 +336,6 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
case SqlKind.MIN => DECREASING
case _ => NOT_MONOTONIC
}
- case _: SqlIncrSumAggFunction => INCREASING
case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
val valueInterval = fmq.getFilteredColumnInterval(
input, aggCall.getArgList.head, aggCall.filterArg)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index a8e00d3..e8707f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.planner.functions.aggfunctions.MinWithRetractAggFu
import org.apache.flink.table.planner.functions.aggfunctions.SingleValueAggFunction._
import org.apache.flink.table.planner.functions.aggfunctions.SumWithRetractAggFunction._
import org.apache.flink.table.planner.functions.aggfunctions._
-import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction, SqlIncrSumAggFunction}
+import org.apache.flink.table.planner.functions.sql.{SqlListAggFunction, SqlFirstLastValueAggFunction}
import org.apache.flink.table.planner.functions.utils.AggSqlFunction
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo
@@ -75,8 +75,6 @@ class AggFunctionFactory(
case _: SqlSumEmptyIsZeroAggFunction => createSum0AggFunction(argTypes)
- case _: SqlIncrSumAggFunction => createIncrSumAggFunction(argTypes, index)
-
case a: SqlMinMaxAggFunction if a.getKind == SqlKind.MIN =>
createMinAggFunction(argTypes, index)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index cdd0191..3f79b07 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.plan.metadata
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType}
@@ -171,16 +170,6 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
mq.getRelModifiedMonotonicity(aggWithAvg)
)
- // incr_sum agg
- val aggWithIncrSum = relBuilder.scan("MyTable3").aggregate(
- relBuilder.groupKey(relBuilder.field("a")),
- relBuilder.aggregateCall(FlinkSqlOperatorTable.INCR_SUM, false, null,
- "incr_sum_b", relBuilder.field("b"))).build()
- assertEquals(
- new RelModifiedMonotonicity(Array(CONSTANT, INCREASING)),
- mq.getRelModifiedMonotonicity(aggWithIncrSum)
- )
-
// test monotonicity lost because group by a agg field
// select max_c, max(sum_d) as max_sum_d from (
// select a, b, max(c) as max_c, sum(d) as sum_d from MyTable4 group by a, b
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index e6624fb..bcea370 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -579,28 +579,6 @@ class RankTest extends TableTestBase {
}
@Test
- def testTopNOrderByIncrSum(): Unit = {
- val subquery =
- """
- |SELECT a, b, incr_sum(c) as sum_c
- |FROM MyTable
- |GROUP BY a, b
- """.stripMargin
-
- val sql =
- s"""
- |SELECT *
- |FROM (
- | SELECT a, b, sum_c,
- | ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num
- | FROM ($subquery))
- |WHERE row_num <= 10
- """.stripMargin
-
- util.verifyPlanWithTrait(sql)
- }
-
- @Test
def testNestedTopN(): Unit = {
val subquery =
"""
@@ -661,58 +639,5 @@ class RankTest extends TableTestBase {
util.verifyPlanWithTrait(sql)
}
- @Test
- def testTopNWithoutRowNumber2(): Unit = {
- util.addTableSource[(String, String, String, String, Long, String, Long, String)](
- "stream_source",
- 'seller_id, 'sku_id, 'venture, 'stat_date, 'trd_amt, 'trd_buyer_id, 'log_pv, 'log_visitor_id)
-
- val group_sql =
- """
- |SELECT
- | seller_id
- | ,sku_id
- | ,venture
- | ,stat_date
- | ,incr_sum(trd_amt) AS amt_dtr
- | ,COUNT(DISTINCT trd_buyer_id) AS byr_cnt_dtr
- | ,SUM(log_pv) AS pv_dtr
- | ,COUNT(DISTINCT log_visitor_id) AS uv_dtr
- |FROM stream_source
- |GROUP BY seller_id,sku_id,venture,stat_date
- """.stripMargin
-
- val sql =
- s"""
- |SELECT
- | CONCAT(seller_id, venture, stat_date, sku_id) as rowkey,
- | seller_id,
- | sku_id,
- | venture,
- | stat_date,
- | amt_dtr,
- | byr_cnt_dtr,
- | pv_dtr,
- | uv_dtr
- |FROM (
- | SELECT
- | seller_id,
- | sku_id,
- | venture,
- | stat_date,
- | amt_dtr,
- | byr_cnt_dtr,
- | pv_dtr,
- | uv_dtr,
- | ROW_NUMBER() OVER (PARTITION BY seller_id, venture, stat_date
- | ORDER BY amt_dtr DESC) AS rownum
- | FROM ($group_sql)
- |)
- |WHERE rownum <= 10
- """.stripMargin
-
- util.verifyPlanWithTrait(sql)
- }
-
// TODO add tests about multi-sinks and udf
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
index b5c51e1..d9ccb89 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortDistinctAggregateITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
import org.apache.flink.table.planner.utils.{CountAggFunction, IntSumAggFunction}
-import org.junit.{Ignore, Test}
+import org.junit.Test
import scala.collection.Seq
@@ -86,12 +86,4 @@ class SortDistinctAggregateITCase extends DistinctAggregateITCaseBase {
)
}
- @Ignore
- @Test
- def testApproximateCountDistinct(): Unit = {
- checkResult(
- "SELECT APPROX_COUNT_DISTINCT(b) FROM Table3",
- Seq(row(6))
- )
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 75178e5..7d8796d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -398,42 +398,6 @@ class AggregateITCase(
}
@Test
- def testIncrSum(): Unit = {
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "A"))
- data.+=((-2, 2L, "B"))
- data.+=((3, 2L, "B"))
- data.+=((-4, 3L, "C"))
- data.+=((5, 3L, "C"))
- data.+=((6, 3L, "C"))
- data.+=((-7, 4L, "B"))
- data.+=((8, 4L, "A"))
- data.+=((9, 4L, "D"))
- data.+=((10, 4L, "E"))
- data.+=((-11, 5L, "A"))
- data.+=((12, 5L, "B"))
-
- val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
- tEnv.registerTable("T", t)
-
- val sql =
- """
- |SELECT b, incr_sum(a)
- |FROM T
- |GROUP BY b
- """.stripMargin
-
- val t1 = tEnv.sqlQuery(sql)
- val sink = new TestingRetractSink
- t1.toRetractStream[Row].addSink(sink)
- env.execute()
-
- val expected = List("1,1", "2,3", "3,11", "4,27", "5,12")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
-
- }
-
- @Test
def testNestedGroupByAgg(): Unit = {
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "A"))