You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/09 13:02:51 UTC

[flink] 03/04: [FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0196a95c74cba8a9fb072385dbc7e5920d357d70
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Thu Aug 8 18:01:23 2019 +0800

    [FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner
---
 .../planner/codegen/calls/BuiltInMethods.scala     | 22 ++++++
 .../planner/codegen/calls/FunctionGenerator.scala  | 51 ++++++++++++
 .../planner/expressions/ScalarFunctionsTest.scala  | 92 ++++++++++++++++++++++
 .../planner/expressions/SqlExpressionTest.scala    |  2 +
 .../validation/ScalarFunctionsValidationTest.scala | 40 +++++++++-
 .../table/runtime/functions/SqlFunctionUtils.java  | 31 ++++++++
 6 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 11c6149..3307fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -436,4 +436,26 @@ object BuiltInMethods {
 
   val STRING_TO_TIME = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "timeStringToUnixDate", classOf[String])
+
+  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Double])
+  val TRUNCATE_FLOAT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Float])
+  val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Int])
+  val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Long])
+  val TRUNCATE_DEC_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Decimal])
+
+  val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Double], classOf[Int])
+  val TRUNCATE_FLOAT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Float], classOf[Int])
+  val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Int], classOf[Int])
+  val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Long], classOf[Int])
+  val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Decimal], classOf[Int])
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index d1eb672..d3c34a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -701,6 +701,57 @@ object FunctionGenerator {
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
 
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(BIGINT),
+    BuiltInMethods.TRUNCATE_LONG_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(INTEGER),
+    BuiltInMethods.TRUNCATE_INT_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DECIMAL),
+    BuiltInMethods.TRUNCATE_DEC_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DOUBLE),
+    BuiltInMethods.TRUNCATE_DOUBLE_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(FLOAT),
+    BuiltInMethods.TRUNCATE_FLOAT_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(BIGINT, INTEGER),
+    BuiltInMethods.TRUNCATE_LONG)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(INTEGER, INTEGER),
+    BuiltInMethods.TRUNCATE_INT)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DECIMAL, INTEGER),
+    BuiltInMethods.TRUNCATE_DEC)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DOUBLE, INTEGER),
+    BuiltInMethods.TRUNCATE_DOUBLE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(FLOAT, INTEGER),
+    BuiltInMethods.TRUNCATE_FLOAT)
+
+
   // ----------------------------------------------------------------------------------------------
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 56fa16d..9411a75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -1395,6 +1395,98 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "-")
   }
 
+  @Test
+  def testTruncate(): Unit = {
+    testAllApis(
+      'f29.truncate('f30),
+      "f29.truncate(f30)",
+      "truncate(f29, f30)",
+      "0.4")
+
+    testAllApis(
+      'f31.truncate('f7),
+      "f31.truncate(f7)",
+      "truncate(f31, f7)",
+      "-0.123")
+
+    testAllApis(
+      'f4.truncate('f32),
+      "f4.truncate(f32)",
+      "truncate(f4, f32)",
+      "40")
+
+    testAllApis(
+      'f28.cast(DataTypes.DOUBLE).truncate(1),
+      "f28.cast(DOUBLE).truncate(1)",
+      "truncate(cast(f28 as DOUBLE), 1)",
+      "0.4")
+
+    // TODO: ignore TableApiTest for cast to DECIMAL(p, s) is not support now.
+    //  see https://issues.apache.org/jira/browse/FLINK-13651
+//    testAllApis(
+//      'f31.cast(DataTypes.DECIMAL(38, 18)).truncate(2),
+//      "f31.cast(DECIMAL(10, 10)).truncate(2)",
+//      "truncate(cast(f31 as decimal(38, 18)), 2)",
+//      "-0.12")
+//
+//    testAllApis(
+//      'f36.cast(DataTypes.DECIMAL(38, 18)).truncate(),
+//      "f36.cast(DECIMAL(10, 10)).truncate()",
+//      "truncate(42.324)",
+//      "42")
+
+    testSqlApi("truncate(cast(f31 as decimal(38, 18)), 2)", "-0.12")
+
+    testAllApis(
+      'f5.cast(DataTypes.FLOAT).truncate(),
+      "f5.cast(FLOAT).truncate()",
+      "truncate(cast(f5 as float))",
+      "4.0")
+
+    testAllApis(
+      42.truncate(-1),
+      "42.truncate(-1)",
+      "truncate(42, -1)",
+      "40")
+
+    testAllApis(
+      42.truncate(-3),
+      "42.truncate(-3)",
+      "truncate(42, -3)",
+      "0")
+
+    //    The validation parameter is null
+    testAllApis(
+      'f33.cast(DataTypes.INT).truncate(1),
+      "f33.cast(INT).truncate(1)",
+      "truncate(cast(null as integer), 1)",
+      "null")
+
+    testAllApis(
+      43.21.truncate('f33.cast(DataTypes.INT)),
+      "43.21.truncate(f33.cast(INT))",
+      "truncate(43.21, cast(null as integer))",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.DOUBLE).truncate(1),
+      "f33.cast(DOUBLE).truncate(1)",
+      "truncate(cast(null as double), 1)",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.INT).truncate(1),
+      "f33.cast(INT).truncate(1)",
+      "truncate(cast(null as integer))",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.DOUBLE).truncate(),
+      "f33.cast(DOUBLE).truncate()",
+      "truncate(cast(null as double))",
+      "null")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Math functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 35a60c1..d31c937 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -119,6 +119,8 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("ROUND(-12.345, 2)", "-12.35")
     testSqlApi("PI()", "3.141592653589793")
     testSqlApi("E()", "2.718281828459045")
+    testSqlApi("truncate(42.345)", "42")
+    testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.34")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index da4763f..8299a12 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.expressions.validation
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{SqlParserException, ValidationException}
 import org.apache.flink.table.expressions.TimePointUnit
+import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
 import org.apache.calcite.avatica.util.TimeUnit
 import org.junit.{Ignore, Test}
 
@@ -69,6 +69,44 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("BIN(f16)", "101010") // Date type
   }
 
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTruncate1(): Unit = {
+    // All arguments are string type
+    testSqlApi(
+      "TRUNCATE('abc', 'def')",
+      "FAIL")
+
+    // The second argument is of type String
+    testSqlApi(
+      "TRUNCATE(f12, f0)",
+      "FAIL")
+
+    // The second argument is of type Float
+    testSqlApi(
+      "TRUNCATE(f12,f12)",
+      "FAIL")
+
+    // The second argument is of type Double
+    testSqlApi(
+      "TRUNCATE(f12, cast(f28 as DOUBLE))",
+      "FAIL")
+
+    // The second argument is of type BigDecimal
+    testSqlApi(
+      "TRUNCATE(f12,f15)",
+      "FAIL")
+  }
+
+  @Test
+  def testInvalidTruncate2(): Unit = {
+    thrown.expect(classOf[CodeGenException])
+    // The one argument is of type String
+    testSqlApi(
+      "TRUNCATE('abc')",
+      "FAIL")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // String functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index f904fd5..9a993a5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1074,4 +1074,35 @@ public class SqlFunctionUtils {
 	public static String uuid(byte[] b){
 		return UUID.nameUUIDFromBytes(b).toString();
 	}
+
+	/** SQL <code>TRUNCATE</code> operator applied to BigDecimal values. */
+	public static Decimal struncate(Decimal b0) {
+		return struncate(b0, 0);
+	}
+
+	public static Decimal struncate(Decimal b0, int b1) {
+		if (b1 >= b0.getScale()) {
+			return b0;
+		}
+
+		BigDecimal b2 = b0.toBigDecimal().movePointRight(b1)
+			.setScale(0, RoundingMode.DOWN).movePointLeft(b1);
+		int p = b0.getPrecision();
+		int s = b0.getScale();
+
+		if (b1 < 0) {
+			return Decimal.fromBigDecimal(b2, Math.min(38, 1 + p - s), 0);
+		} else {
+			return Decimal.fromBigDecimal(b2, 1 + p - s + b1, b1);
+		}
+	}
+
+	/** SQL <code>TRUNCATE</code> operator applied to double values. */
+	public static float struncate(float b0) {
+		return struncate(b0, 0);
+	}
+
+	public static float struncate(float b0, int b1) {
+		return (float) struncate(Decimal.castFrom((double) b0, 38, 18), b1).doubleValue();
+	}
 }