You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/09 13:02:51 UTC
[flink] 03/04: [FLINK-13547][table-planner-blink] Align the
implementation of TRUNCATE() function with old planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0196a95c74cba8a9fb072385dbc7e5920d357d70
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Thu Aug 8 18:01:23 2019 +0800
[FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner
---
.../planner/codegen/calls/BuiltInMethods.scala | 22 ++++++
.../planner/codegen/calls/FunctionGenerator.scala | 51 ++++++++++++
.../planner/expressions/ScalarFunctionsTest.scala | 92 ++++++++++++++++++++++
.../planner/expressions/SqlExpressionTest.scala | 2 +
.../validation/ScalarFunctionsValidationTest.scala | 40 +++++++++-
.../table/runtime/functions/SqlFunctionUtils.java | 31 ++++++++
6 files changed, 237 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 11c6149..3307fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -436,4 +436,26 @@ object BuiltInMethods {
val STRING_TO_TIME = Types.lookupMethod(
classOf[SqlDateTimeUtils], "timeStringToUnixDate", classOf[String])
+
+  // One-argument TRUNCATE: the double, int and long variants are looked up on SqlFunctions,
+  // the float and Decimal variants on SqlFunctionUtils.
+  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(
+    classOf[SqlFunctions], "struncate", classOf[Double])
+  val TRUNCATE_FLOAT_ONE = Types.lookupMethod(
+    classOf[SqlFunctionUtils], "struncate", classOf[Float])
+  val TRUNCATE_INT_ONE = Types.lookupMethod(
+    classOf[SqlFunctions], "struncate", classOf[Int])
+  val TRUNCATE_LONG_ONE = Types.lookupMethod(
+    classOf[SqlFunctions], "struncate", classOf[Long])
+  val TRUNCATE_DEC_ONE = Types.lookupMethod(
+    classOf[SqlFunctionUtils], "struncate", classOf[Decimal])
+
+  // Two-argument TRUNCATE: same owner classes as above, with an extra Int parameter.
+  val TRUNCATE_DOUBLE = Types.lookupMethod(
+    classOf[SqlFunctions], "struncate", classOf[Double], classOf[Int])
+  val TRUNCATE_FLOAT = Types.lookupMethod(
+    classOf[SqlFunctionUtils], "struncate", classOf[Float], classOf[Int])
+  val TRUNCATE_INT = Types.lookupMethod(
+    classOf[SqlFunctions], "struncate", classOf[Int], classOf[Int])
+  val TRUNCATE_LONG = Types.lookupMethod(
+    classOf[SqlFunctions], "struncate", classOf[Long], classOf[Int])
+  val TRUNCATE_DEC = Types.lookupMethod(
+    classOf[SqlFunctionUtils], "struncate", classOf[Decimal], classOf[Int])
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index d1eb672..d3c34a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -701,6 +701,57 @@ object FunctionGenerator {
addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
+  // TRUNCATE(value): one registration per supported operand type
+  addSqlFunctionMethod(TRUNCATE, Seq(BIGINT), BuiltInMethods.TRUNCATE_LONG_ONE)
+  addSqlFunctionMethod(TRUNCATE, Seq(INTEGER), BuiltInMethods.TRUNCATE_INT_ONE)
+  addSqlFunctionMethod(TRUNCATE, Seq(DECIMAL), BuiltInMethods.TRUNCATE_DEC_ONE)
+  addSqlFunctionMethod(TRUNCATE, Seq(DOUBLE), BuiltInMethods.TRUNCATE_DOUBLE_ONE)
+  addSqlFunctionMethod(TRUNCATE, Seq(FLOAT), BuiltInMethods.TRUNCATE_FLOAT_ONE)
+
+  // TRUNCATE(value, digits): same operand types, followed by an INTEGER second operand
+  addSqlFunctionMethod(TRUNCATE, Seq(BIGINT, INTEGER), BuiltInMethods.TRUNCATE_LONG)
+  addSqlFunctionMethod(TRUNCATE, Seq(INTEGER, INTEGER), BuiltInMethods.TRUNCATE_INT)
+  addSqlFunctionMethod(TRUNCATE, Seq(DECIMAL, INTEGER), BuiltInMethods.TRUNCATE_DEC)
+  addSqlFunctionMethod(TRUNCATE, Seq(DOUBLE, INTEGER), BuiltInMethods.TRUNCATE_DOUBLE)
+  addSqlFunctionMethod(TRUNCATE, Seq(FLOAT, INTEGER), BuiltInMethods.TRUNCATE_FLOAT)
+
+
// ----------------------------------------------------------------------------------------------
/**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 56fa16d..9411a75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -1395,6 +1395,98 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"-")
}
+  /**
+   * Exercises TRUNCATE through the Scala Table API, the string expression API and SQL.
+   *
+   * Covers the two-argument and one-argument forms, negative digit counts and
+   * null operands (every null case is expected to evaluate to "null").
+   */
+  @Test
+  def testTruncate(): Unit = {
+    // Two-argument form with both operands read from fields.
+    testAllApis(
+      'f29.truncate('f30),
+      "f29.truncate(f30)",
+      "truncate(f29, f30)",
+      "0.4")
+
+    testAllApis(
+      'f31.truncate('f7),
+      "f31.truncate(f7)",
+      "truncate(f31, f7)",
+      "-0.123")
+
+    testAllApis(
+      'f4.truncate('f32),
+      "f4.truncate(f32)",
+      "truncate(f4, f32)",
+      "40")
+
+    testAllApis(
+      'f28.cast(DataTypes.DOUBLE).truncate(1),
+      "f28.cast(DOUBLE).truncate(1)",
+      "truncate(cast(f28 as DOUBLE), 1)",
+      "0.4")
+
+    // TODO: the Table API variants are disabled because casting to DECIMAL(p, s) is not
+    //  supported yet, see https://issues.apache.org/jira/browse/FLINK-13651
+    // NOTE(review): the string expressions below cast to DECIMAL(10, 10) while the other
+    //  variants use DECIMAL(38, 18) - confirm which is intended before re-enabling.
+//    testAllApis(
+//      'f31.cast(DataTypes.DECIMAL(38, 18)).truncate(2),
+//      "f31.cast(DECIMAL(10, 10)).truncate(2)",
+//      "truncate(cast(f31 as decimal(38, 18)), 2)",
+//      "-0.12")
+//
+//    testAllApis(
+//      'f36.cast(DataTypes.DECIMAL(38, 18)).truncate(),
+//      "f36.cast(DECIMAL(10, 10)).truncate()",
+//      "truncate(42.324)",
+//      "42")
+
+    testSqlApi("truncate(cast(f31 as decimal(38, 18)), 2)", "-0.12")
+
+    // One-argument form.
+    testAllApis(
+      'f5.cast(DataTypes.FLOAT).truncate(),
+      "f5.cast(FLOAT).truncate()",
+      "truncate(cast(f5 as float))",
+      "4.0")
+
+    // Negative digit counts truncate to the left of the decimal point.
+    testAllApis(
+      42.truncate(-1),
+      "42.truncate(-1)",
+      "truncate(42, -1)",
+      "40")
+
+    testAllApis(
+      42.truncate(-3),
+      "42.truncate(-3)",
+      "truncate(42, -3)",
+      "0")
+
+    // A null operand in either position yields null.
+    testAllApis(
+      'f33.cast(DataTypes.INT).truncate(1),
+      "f33.cast(INT).truncate(1)",
+      "truncate(cast(null as integer), 1)",
+      "null")
+
+    testAllApis(
+      43.21.truncate('f33.cast(DataTypes.INT)),
+      "43.21.truncate(f33.cast(INT))",
+      "truncate(43.21, cast(null as integer))",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.DOUBLE).truncate(1),
+      "f33.cast(DOUBLE).truncate(1)",
+      "truncate(cast(null as double), 1)",
+      "null")
+
+    // One-argument form on a null INT: all three API variants use the same
+    // one-argument call (the two-argument null INT case is covered above).
+    testAllApis(
+      'f33.cast(DataTypes.INT).truncate(),
+      "f33.cast(INT).truncate()",
+      "truncate(cast(null as integer))",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.DOUBLE).truncate(),
+      "f33.cast(DOUBLE).truncate()",
+      "truncate(cast(null as double))",
+      "null")
+  }
+
// ----------------------------------------------------------------------------------------------
// Math functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 35a60c1..d31c937 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -119,6 +119,8 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("ROUND(-12.345, 2)", "-12.35")
testSqlApi("PI()", "3.141592653589793")
testSqlApi("E()", "2.718281828459045")
+ testSqlApi("truncate(42.345)", "42")
+ testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.34")
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index da4763f..8299a12 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.expressions.validation
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{SqlParserException, ValidationException}
import org.apache.flink.table.expressions.TimePointUnit
+import org.apache.flink.table.planner.codegen.CodeGenException
import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
import org.apache.calcite.avatica.util.TimeUnit
import org.junit.{Ignore, Test}
@@ -69,6 +69,44 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
testSqlApi("BIN(f16)", "101010") // Date type
}
+
+  /**
+   * Every TRUNCATE call listed here must be rejected with a [[ValidationException]].
+   *
+   * Each expression is checked on its own. With a single `@Test(expected = ...)` around
+   * all of them, the first exception would end the method and the remaining calls would
+   * never run.
+   */
+  @Test
+  def testInvalidTruncate1(): Unit = {
+    val invalidCalls = Seq(
+      // All arguments are string type
+      "TRUNCATE('abc', 'def')",
+      // The second argument is of type String
+      "TRUNCATE(f12, f0)",
+      // The second argument is of type Float
+      "TRUNCATE(f12,f12)",
+      // The second argument is of type Double
+      "TRUNCATE(f12, cast(f28 as DOUBLE))",
+      // The second argument is of type BigDecimal
+      "TRUNCATE(f12,f15)"
+    )
+
+    invalidCalls.foreach { sqlExpr =>
+      val rejected =
+        try {
+          testSqlApi(sqlExpr, "FAIL")
+          false
+        } catch {
+          case _: ValidationException => true
+        }
+      org.junit.Assert.assertTrue(
+        s"Expected a ValidationException for: $sqlExpr", rejected)
+    }
+  }
+
+  /** A one-argument TRUNCATE on a string literal is expected to raise a CodeGenException. */
+  @Test
+  def testInvalidTruncate2(): Unit = {
+    // The one argument is of type String
+    val stringArgumentCall = "TRUNCATE('abc')"
+    thrown.expect(classOf[CodeGenException])
+    testSqlApi(stringArgumentCall, "FAIL")
+  }
+
// ----------------------------------------------------------------------------------------------
// String functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index f904fd5..9a993a5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1074,4 +1074,35 @@ public class SqlFunctionUtils {
public static String uuid(byte[] b){
return UUID.nameUUIDFromBytes(b).toString();
}
+
+    /**
+     * SQL <code>TRUNCATE</code> operator applied to Decimal values, keeping no
+     * fractional digits.
+     */
+    public static Decimal struncate(Decimal b0) {
+        return struncate(b0, 0);
+    }
+
+    /**
+     * SQL <code>TRUNCATE</code> operator applied to Decimal values.
+     *
+     * @param b0 value to truncate
+     * @param b1 number of fractional digits to keep; a negative count drops digits to the
+     *           left of the decimal point and produces a result with scale 0
+     * @return {@code b0} itself when {@code b1} is not smaller than its scale, otherwise
+     *         the value with the surplus digits cut off (rounding mode DOWN)
+     */
+    public static Decimal struncate(Decimal b0, int b1) {
+        final int scale = b0.getScale();
+        if (b1 >= scale) {
+            // Nothing to cut off: the value has no more than b1 fractional digits.
+            return b0;
+        }
+
+        final BigDecimal truncated = b0.toBigDecimal()
+            .movePointRight(b1)
+            .setScale(0, RoundingMode.DOWN)
+            .movePointLeft(b1);
+        // One more than the number of integer digits in the input type.
+        final int integerDigits = 1 + b0.getPrecision() - scale;
+
+        return b1 < 0
+            ? Decimal.fromBigDecimal(truncated, Math.min(38, integerDigits), 0)
+            : Decimal.fromBigDecimal(truncated, integerDigits + b1, b1);
+    }
+
+    /**
+     * SQL <code>TRUNCATE</code> operator applied to float values, keeping no
+     * fractional digits.
+     */
+    public static float struncate(float b0) {
+        return struncate(b0, 0);
+    }
+
+    /**
+     * SQL <code>TRUNCATE</code> operator applied to float values.
+     *
+     * <p>The value is widened to double, converted to a Decimal with precision 38 and
+     * scale 18, truncated by the Decimal overload and narrowed back to float.
+     *
+     * @param b0 value to truncate
+     * @param b1 digit count handed to the Decimal overload
+     */
+    public static float struncate(float b0, int b1) {
+        // NOTE(review): presumably castFrom cannot represent NaN/infinite inputs or values
+        // that do not fit DECIMAL(38, 18) - confirm how those inputs behave here.
+        final Decimal widened = Decimal.castFrom((double) b0, 38, 18);
+        final double truncated = struncate(widened, b1).doubleValue();
+        return (float) truncated;
+    }
}