You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/06 04:21:33 UTC

[flink] branch release-1.9 updated: [FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type into account when generate literal expression

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1983e6f  [FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type into account when generate literal expression
1983e6f is described below

commit 1983e6f8cae2b2a5ca0c52f129f9756d0ecd91b4
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Aug 5 13:36:12 2019 +0200

    [FLINK-13584][table-planner-blink] RankLikeAggFunctionBase should take type into account when generate literal expression
    
    This closes #9360
---
 .../aggfunctions/RankLikeAggFunctionBase.java      | 47 +++++++++++++++-------
 .../runtime/batch/sql/OverWindowITCase.scala       | 25 ++++++++++++
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
index a379775..5646b0b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.planner.functions.aggfunctions;
 
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.Expression;
@@ -26,17 +29,17 @@ import org.apache.flink.table.planner.expressions.ExpressionBuilder;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Optional;
 
 import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
 
 /**
  * built-in rank like aggregate function, e.g. rank, dense_rank
@@ -98,34 +101,48 @@ public abstract class RankLikeAggFunctionBase extends DeclarativeAggregateFuncti
 	}
 
 	protected Expression generateInitLiteral(LogicalType orderType) {
+		Object value;
 		switch (orderType.getTypeRoot()) {
 			case BOOLEAN:
-				return literal(false);
+				value = false;
+				break;
 			case TINYINT:
-				return literal((byte) 0);
+				value = (byte) 0;
+				break;
 			case SMALLINT:
-				return literal((short) 0);
+				value = (short) 0;
+				break;
 			case INTEGER:
-				return literal(0);
+				value = 0;
+				break;
 			case BIGINT:
-				return literal(0L);
+				value = 0L;
+				break;
 			case FLOAT:
-				return literal(0.0f);
+				value = 0.0f;
+				break;
 			case DOUBLE:
-				return literal(0.0d);
+				value = 0.0d;
+				break;
 			case DECIMAL:
-				return literal(java.math.BigDecimal.ZERO);
+				value = BigDecimal.ZERO;
+				break;
 			case CHAR:
 			case VARCHAR:
-				return literal("");
+				value = "";
+				break;
 			case DATE:
-				return literal(new Date(0));
+				value = LocalDateSerializer.INSTANCE.createInstance();
+				break;
 			case TIME_WITHOUT_TIME_ZONE:
-				return literal(new Time(0));
+				value = LocalTimeSerializer.INSTANCE.createInstance();
+				break;
 			case TIMESTAMP_WITHOUT_TIME_ZONE:
-				return literal(new Timestamp(0));
+				value = LocalDateTimeSerializer.INSTANCE.createInstance();
+				break;
 			default:
 				throw new TableException("Unsupported type: " + orderType);
 		}
+		return valueLiteral(value, fromLogicalTypeToDataType(orderType));
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
index faac2d4..a4881b9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverWindowITCase.scala
@@ -292,6 +292,31 @@ class OverWindowITCase extends BatchTestBase {
   }
 
   @Test
+  def testRankByDecimal(): Unit = {
+    checkResult(
+      "SELECT d, de, rank() over (order by de desc), dense_rank() over (order by de desc) FROM" +
+          " (select d, cast(e as decimal(10, 0)) as de from Table5)",
+      Seq(
+        row(5, 15, 1, 1),
+        row(5, 14, 2, 2),
+        row(5, 13, 3, 3),
+        row(5, 12, 4, 4),
+        row(5, 11, 5, 5),
+        row(4, 10, 6, 6),
+        row(4, 9, 7, 7),
+        row(4, 8, 8, 8),
+        row(4, 7, 9, 9),
+        row(3, 6, 10, 10),
+        row(3, 5, 11, 11),
+        row(3, 4, 12, 12),
+        row(2, 3, 13, 13),
+        row(2, 2, 14, 14),
+        row(1, 1, 15, 15)
+      )
+    )
+  }
+
+  @Test
   def testWindowAggregationRank3(): Unit = {
 
     checkResult(