You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/06 04:21:33 UTC
[flink] branch release-1.9 updated:
[FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type
into account when generate literal expression
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 1983e6f [FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type into account when generate literal expression
1983e6f is described below
commit 1983e6f8cae2b2a5ca0c52f129f9756d0ecd91b4
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Aug 5 13:36:12 2019 +0200
[FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type into account when generate literal expression
This closes #9360
---
.../aggfunctions/RankLikeAggFunctionBase.java | 47 +++++++++++++++-------
.../runtime/batch/sql/OverWindowITCase.scala | 25 ++++++++++++
2 files changed, 57 insertions(+), 15 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
index a379775..5646b0b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -18,6 +18,9 @@
package org.apache.flink.table.planner.functions.aggfunctions;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.expressions.Expression;
@@ -26,17 +29,17 @@ import org.apache.flink.table.planner.expressions.ExpressionBuilder;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Optional;
import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
/**
* built-in rank like aggregate function, e.g. rank, dense_rank
@@ -98,34 +101,48 @@ public abstract class RankLikeAggFunctionBase extends DeclarativeAggregateFuncti
}
protected Expression generateInitLiteral(LogicalType orderType) {
+ Object value;
switch (orderType.getTypeRoot()) {
case BOOLEAN:
- return literal(false);
+ value = false;
+ break;
case TINYINT:
- return literal((byte) 0);
+ value = (byte) 0;
+ break;
case SMALLINT:
- return literal((short) 0);
+ value = (short) 0;
+ break;
case INTEGER:
- return literal(0);
+ value = 0;
+ break;
case BIGINT:
- return literal(0L);
+ value = 0L;
+ break;
case FLOAT:
- return literal(0.0f);
+ value = 0.0f;
+ break;
case DOUBLE:
- return literal(0.0d);
+ value = 0.0d;
+ break;
case DECIMAL:
- return literal(java.math.BigDecimal.ZERO);
+ value = BigDecimal.ZERO;
+ break;
case CHAR:
case VARCHAR:
- return literal("");
+ value = "";
+ break;
case DATE:
- return literal(new Date(0));
+ value = LocalDateSerializer.INSTANCE.createInstance();
+ break;
case TIME_WITHOUT_TIME_ZONE:
- return literal(new Time(0));
+ value = LocalTimeSerializer.INSTANCE.createInstance();
+ break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return literal(new Timestamp(0));
+ value = LocalDateTimeSerializer.INSTANCE.createInstance();
+ break;
default:
throw new TableException("Unsupported type: " + orderType);
}
+ return valueLiteral(value, fromLogicalTypeToDataType(orderType));
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
index faac2d4..a4881b9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
@@ -292,6 +292,31 @@ class OverWindowITCase extends BatchTestBase {
}
@Test
+ def testRankByDecimal(): Unit = {
+ checkResult(
+ "SELECT d, de, rank() over (order by de desc), dense_rank() over (order by de desc) FROM" +
+ " (select d, cast(e as decimal(10, 0)) as de from Table5)",
+ Seq(
+ row(5, 15, 1, 1),
+ row(5, 14, 2, 2),
+ row(5, 13, 3, 3),
+ row(5, 12, 4, 4),
+ row(5, 11, 5, 5),
+ row(4, 10, 6, 6),
+ row(4, 9, 7, 7),
+ row(4, 8, 8, 8),
+ row(4, 7, 9, 9),
+ row(3, 6, 10, 10),
+ row(3, 5, 11, 11),
+ row(3, 4, 12, 12),
+ row(2, 3, 13, 13),
+ row(2, 2, 14, 14),
+ row(1, 1, 15, 15)
+ )
+ )
+ }
+
+ @Test
def testWindowAggregationRank3(): Unit = {
checkResult(