Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/09 13:02:48 UTC

[flink] branch master updated (6515807 -> 7e13f5b)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6515807  [hotfix][tests] Add NoOpJobFailCall singleton
     new 5e697cf  [FLINK-13547][table-planner-blink] Refactor CONCAT() and CONCAT_WS() to keep them compatible with the old planner
     new 2ae0e8a  [FLINK-13547][table-planner-blink] Fix FROM_BASE64() to return STRING type instead of BINARY
     new 0196a95  [FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner
     new 7e13f5b  [FLINK-13547][table-planner-blink] Remove LENGTH(), JSONVALUE(), KEYVALUE(), SUBSTR() builtin functions which are not standard.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../functions/sql/FlinkSqlOperatorTable.java       |  81 ++----------
 .../planner/codegen/calls/BuiltInMethods.scala     |  22 ++++
 .../planner/codegen/calls/FunctionGenerator.scala  |  51 ++++++++
 .../planner/codegen/calls/StringCallGen.scala      |   8 +-
 .../planner/expressions/ScalarFunctionsTest.scala  | 141 ++++++++++++++-------
 .../planner/expressions/SqlExpressionTest.scala    |   8 +-
 .../planner/expressions/TemporalTypesTest.scala    |   2 +-
 .../validation/ScalarFunctionsValidationTest.scala |  40 +++++-
 .../flink/table/dataformat/BinaryStringUtil.java   |  28 ++--
 .../table/runtime/functions/SqlFunctionUtils.java  |  31 +++++
 .../flink/table/dataformat/BinaryStringTest.java   |  12 +-
 11 files changed, 277 insertions(+), 147 deletions(-)


[flink] 04/04: [FLINK-13547][table-planner-blink] Remove LENGTH(), JSONVALUE(), KEYVALUE(), SUBSTR() builtin functions which are not standard.

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e13f5b43f417b8e29e1964aa109a463b152991a
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Thu Aug 8 11:40:24 2019 +0800

    [FLINK-13547][table-planner-blink] Remove LENGTH(), JSONVALUE(), KEYVALUE(), SUBSTR() builtin functions which are not standard.
    
    LENGTH, SUBSTR, KEYVALUE can be covered by existing functions, e.g. CHAR_LENGTH, SUBSTRING, STR_TO_MAP(str)[key].
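As a rough illustration of why the removals are safe, the semantics of the standard replacements can be sketched in Python (illustrative only, not Flink code; the helper names below are hypothetical and SQL string positions are 1-based):

```python
def char_length(s):
    # CHAR_LENGTH(s) covers the removed LENGTH(s)
    return len(s)

def substring(s, start, length=None):
    # SUBSTRING(s, start[, length]) covers the removed SUBSTR()
    begin = max(start - 1, 0)
    return s[begin:] if length is None else s[begin:begin + length]

def str_to_map(s, pair_sep=",", kv_sep="="):
    # STR_TO_MAP(s)[key] covers the removed KEYVALUE(s, pair_sep, kv_sep, key)
    return dict(p.split(kv_sep, 1) for p in s.split(pair_sep) if p)
```

For example, substring("hello world", 2, 3) yields "ell", matching the SUBSTRING test expectations in this patch.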
---
 .../functions/sql/FlinkSqlOperatorTable.java       | 79 ++++------------------
 .../planner/codegen/calls/StringCallGen.scala      |  8 +--
 .../planner/expressions/ScalarFunctionsTest.scala  | 43 +-----------
 .../planner/expressions/SqlExpressionTest.scala    |  6 --
 .../planner/expressions/TemporalTypesTest.scala    |  2 +-
 5 files changed, 16 insertions(+), 122 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 04c867b..a1228fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -263,14 +263,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 			OperandTypes.family(SqlTypeFamily.STRING)),
 		SqlFunctionCategory.NUMERIC);
 
-	public static final SqlFunction JSONVALUE = new SqlFunction(
-		"JSONVALUE",
-		SqlKind.OTHER_FUNCTION,
-		VARCHAR_2000_NULLABLE,
-		null,
-		OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER),
-		SqlFunctionCategory.STRING);
-
 	public static final SqlFunction STR_TO_MAP = new SqlFunction(
 		"STR_TO_MAP",
 		SqlKind.OTHER_FUNCTION,
@@ -398,18 +390,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 			OperandTypes.STRING_STRING),
 		SqlFunctionCategory.STRING);
 
-	public static final SqlFunction KEYVALUE = new SqlFunction(
-		"KEYVALUE",
-		SqlKind.OTHER_FUNCTION,
-		VARCHAR_2000_NULLABLE,
-		null,
-		OperandTypes.family(
-			SqlTypeFamily.STRING,
-			SqlTypeFamily.STRING,
-			SqlTypeFamily.STRING,
-			SqlTypeFamily.STRING),
-		SqlFunctionCategory.STRING);
-
 	public static final SqlFunction HASH_CODE = new SqlFunction(
 		"HASH_CODE",
 		SqlKind.OTHER_FUNCTION,
@@ -429,9 +409,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction SHA1 = new SqlFunction(
@@ -439,9 +417,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction SHA224 = new SqlFunction(
@@ -449,9 +425,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction SHA256 = new SqlFunction(
@@ -459,9 +433,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction SHA384 = new SqlFunction(
@@ -469,9 +441,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction SHA512 = new SqlFunction(
@@ -479,9 +449,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction SHA2 = new SqlFunction(
@@ -489,9 +457,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		VARCHAR_2000_NULLABLE,
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.INTEGER)),
+		OperandTypes.sequence("'SHA2(DATA, HASH_LENGTH)'",
+			OperandTypes.STRING,  OperandTypes.NUMERIC_INTEGER),
 		SqlFunctionCategory.STRING);
 
 	public static final SqlFunction DATE_FORMAT = new SqlFunction(
@@ -625,9 +592,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		ReturnTypes.VARCHAR_2000,
 		null,
-		OperandTypes.or(
-			OperandTypes.NILADIC,
-			OperandTypes.ANY),
+		OperandTypes.NILADIC,
 		SqlFunctionCategory.STRING) {
 
 		@Override
@@ -647,19 +612,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		ARG0_VARCHAR_FORCE_NULLABLE,
 		null,
 		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)
-		),
-		SqlFunctionCategory.STRING);
-
-	public static final SqlFunction SUBSTR = new SqlFunction(
-		"SUBSTR",
-		SqlKind.OTHER_FUNCTION,
-		ARG0_VARCHAR_FORCE_NULLABLE,
-		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)
+			OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER),
+			OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)
 		),
 		SqlFunctionCategory.STRING);
 
@@ -717,21 +671,12 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.INTEGER)),
 		SqlFunctionCategory.NUMERIC);
 
-	public static final SqlFunction LENGTH = new SqlFunction(
-		"LENGTH",
-		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.INTEGER_NULLABLE,
-		null,
-		OperandTypes.family(SqlTypeFamily.STRING),
-		SqlFunctionCategory.NUMERIC);
-
-
 	public static final SqlFunction ASCII = new SqlFunction(
 		"ASCII",
 		SqlKind.OTHER_FUNCTION,
 		ReturnTypes.INTEGER_NULLABLE,
 		null,
-		OperandTypes.family(SqlTypeFamily.STRING),
+		OperandTypes.family(SqlTypeFamily.CHARACTER),
 		SqlFunctionCategory.NUMERIC);
 
 	public static final SqlFunction ENCODE = new SqlFunction(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 7c4a129..0e7c5d1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -63,13 +63,13 @@ object StringCallGen {
       case NOT_LIKE =>
         generateNot(ctx, new LikeCallGen().generate(ctx, operands, new BooleanType()))
 
-      case SUBSTRING | SUBSTR => generateSubString(ctx, operands)
+      case SUBSTRING => generateSubString(ctx, operands)
 
       case LEFT => generateLeft(ctx, operands.head, operands(1))
 
       case RIGHT => generateRight(ctx, operands.head, operands(1))
 
-      case CHAR_LENGTH | CHARACTER_LENGTH | LENGTH => generateCharLength(ctx, operands)
+      case CHAR_LENGTH | CHARACTER_LENGTH => generateCharLength(ctx, operands)
 
       case SIMILAR_TO => generateSimilarTo(ctx, operands)
 
@@ -109,8 +109,6 @@ object StringCallGen {
 
       case SPLIT_INDEX => generateSplitIndex(ctx, operands)
 
-      case KEYVALUE => generateKeyValue(ctx, operands)
-
       case HASH_CODE if isCharacterString(operands.head.resultType) =>
         generateHashCode(ctx, operands)
 
@@ -138,8 +136,6 @@ object StringCallGen {
 
       case REGEXP => generateRegExp(ctx, operands)
 
-      case JSONVALUE => generateJsonValue(ctx, operands)
-
       case BIN => generateBin(ctx, operands)
 
       case CONCAT_FUNCTION =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 9411a75..921b2ac 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -518,21 +518,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testLength(): Unit = {
-    testSqlApi(
-      "LENGTH(f0)",
-      "22")
-
-    testSqlApi(
-      "LENGTH(f0)",
-      "22")
-
-    testSqlApi(
-      "length(uuid())",
-      "36")
-  }
-
-  @Test
   def testUpperCase(): Unit = {
     testAllApis(
       'f0.upperCase(),
@@ -953,7 +938,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   @Test
   def testSubString(): Unit = {
-    Array("substring", "substr").foreach {
+    Array("substring").foreach {
       substr =>
         testSqlApi(s"$substr(f0, 2, 3)", "his")
         testSqlApi(s"$substr(f0, 2, 100)", "his is a test String.")
@@ -1160,21 +1145,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testKeyValue(): Unit = {
-    // NOTE: Spark has str_to_map
-    testSqlApi("keyValue('a=1,b=2,c=3', ',', '=', 'a')", "1")
-    testSqlApi("keyValue('a=1,b=2,c=3', ',', '=', 'b')", "2")
-    testSqlApi("keyValue('a=1,b=2,c=3', ',', '=', 'c')", "3")
-    testSqlApi("keyValue('', ',', '=', 'c')", "null")
-    testSqlApi("keyValue(f40, ',', '=', 'c')", "null")
-    testSqlApi("keyValue(CAST(null as VARCHAR), ',', '=', 'c')", "null")
-    testSqlApi("keyValue('a=1,b=2,c=3', ',', '=', 'd')", "null")
-    testSqlApi("keyValue('a=1,b=2,c=3', CAST(null as VARCHAR), '=', 'a')", "null")
-    testSqlApi("keyValue('a=1,b=2,c=3', ',', CAST(null as VARCHAR), 'a')", "null")
-    testSqlApi("keyValue('a=1,b=2,c=3', ',', '=', CAST(null as VARCHAR))", "null")
-  }
-
-  @Test
   def testHashCode(): Unit = {
     testSqlApi("hash_code('abc')", "96354")
     testSqlApi("hash_code(f35)", "97")
@@ -1202,17 +1172,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testJsonValue(): Unit = {
-    testSqlApi("jsonValue('[10, 20, [30, 40]]', '$[2][*]')", "[30,40]")
-    testSqlApi("jsonValue('[10, 20, [30, [40, 50, 60]]]', '$[2][*][1][*]')", "[30,[40,50,60]]")
-    testSqlApi("jsonValue(f40, '$[2][*][1][*]')", "null")
-    testSqlApi("jsonValue('[10, 20, [30, [40, 50, 60]]]', '')", "null")
-    testSqlApi("jsonValue('', '$[2][*][1][*]')", "null")
-    testSqlApi("jsonValue(CAST(null as VARCHAR), '$[2][*][1][*]')", "null")
-    testSqlApi("jsonValue('[10, 20, [30, [40, 50, 60]]]', CAST(null as VARCHAR))", "null")
-  }
-
-  @Test
   def testHex(): Unit = {
     testAllApis(
       100.hex(),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index d31c937..42de09d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -158,10 +158,6 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("SUBSTRING('hello world', 2)", "ello world")
     testSqlApi("SUBSTRING('hello world', 2, 3)", "ell")
     testSqlApi("SUBSTRING('hello world', 2, 300)", "ello world")
-    testSqlApi("SUBSTR('hello world', 2, 3)", "ell")
-    testSqlApi("SUBSTR('hello world', 2)", "ello world")
-    testSqlApi("SUBSTR('hello world', 2, 300)", "ello world")
-    testSqlApi("SUBSTR('hello world', 0, 3)", "hel")
     testSqlApi("INITCAP('hello world')", "Hello World")
     testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
     testSqlApi("REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)", "bar")
@@ -177,8 +173,6 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("CASE WHEN 1 = 2 THEN 2 WHEN 1 = 1 THEN 3 ELSE 3 END", "3")
     testSqlApi("NULLIF(1, 1)", "null")
     testSqlApi("COALESCE(NULL, 5)", "5")
-    testSqlApi("COALESCE(keyvalue('', ';', ':', 'isB2C'), '5')", "5")
-    testSqlApi("COALESCE(jsonvalue('xx', '$x'), '5')", "5")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 27965f9..4e1bb8d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -589,7 +589,7 @@ class TemporalTypesTest extends ExpressionTestBase {
     )
 
     testSqlApi(
-      "TO_TIMESTAMP(SUBSTR('', 2, -1))",
+      "TO_TIMESTAMP(SUBSTRING('', 2, -1))",
       "null"
     )
 


[flink] 03/04: [FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0196a95c74cba8a9fb072385dbc7e5920d357d70
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Thu Aug 8 18:01:23 2019 +0800

    [FLINK-13547][table-planner-blink] Align the implementation of TRUNCATE() function with old planner
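The aligned TRUNCATE(x [, n]) rounds toward zero at the n-th decimal position, where n may be negative. A minimal Python sketch of the intended semantics, assuming decimal-based truncation as in the old planner (illustrative only, not the generated code):

```python
from decimal import Decimal, ROUND_DOWN

def truncate(value, places=0):
    # Shift the decimal point right by `places`, drop the fraction
    # toward zero, then shift back. A negative `places` zeroes out
    # digits to the left of the decimal point (TRUNCATE(42, -1) -> 40).
    d = Decimal(str(value))
    return d.scaleb(places).to_integral_value(rounding=ROUND_DOWN).scaleb(-places)
```

For example, truncate(42.345, 2) gives 42.34 and truncate(42, -1) gives 40, matching the new tests in this commit.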
---
 .../planner/codegen/calls/BuiltInMethods.scala     | 22 ++++++
 .../planner/codegen/calls/FunctionGenerator.scala  | 51 ++++++++++++
 .../planner/expressions/ScalarFunctionsTest.scala  | 92 ++++++++++++++++++++++
 .../planner/expressions/SqlExpressionTest.scala    |  2 +
 .../validation/ScalarFunctionsValidationTest.scala | 40 +++++++++-
 .../table/runtime/functions/SqlFunctionUtils.java  | 31 ++++++++
 6 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 11c6149..3307fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -436,4 +436,26 @@ object BuiltInMethods {
 
   val STRING_TO_TIME = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "timeStringToUnixDate", classOf[String])
+
+  val TRUNCATE_DOUBLE_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Double])
+  val TRUNCATE_FLOAT_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Float])
+  val TRUNCATE_INT_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Int])
+  val TRUNCATE_LONG_ONE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Long])
+  val TRUNCATE_DEC_ONE = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Decimal])
+
+  val TRUNCATE_DOUBLE = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Double], classOf[Int])
+  val TRUNCATE_FLOAT = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Float], classOf[Int])
+  val TRUNCATE_INT = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Int], classOf[Int])
+  val TRUNCATE_LONG = Types.lookupMethod(classOf[SqlFunctions], "struncate",
+    classOf[Long], classOf[Int])
+  val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
+    classOf[Decimal], classOf[Int])
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index d1eb672..d3c34a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -701,6 +701,57 @@ object FunctionGenerator {
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
 
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(BIGINT),
+    BuiltInMethods.TRUNCATE_LONG_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(INTEGER),
+    BuiltInMethods.TRUNCATE_INT_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DECIMAL),
+    BuiltInMethods.TRUNCATE_DEC_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DOUBLE),
+    BuiltInMethods.TRUNCATE_DOUBLE_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(FLOAT),
+    BuiltInMethods.TRUNCATE_FLOAT_ONE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(BIGINT, INTEGER),
+    BuiltInMethods.TRUNCATE_LONG)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(INTEGER, INTEGER),
+    BuiltInMethods.TRUNCATE_INT)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DECIMAL, INTEGER),
+    BuiltInMethods.TRUNCATE_DEC)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(DOUBLE, INTEGER),
+    BuiltInMethods.TRUNCATE_DOUBLE)
+
+  addSqlFunctionMethod(
+    TRUNCATE,
+    Seq(FLOAT, INTEGER),
+    BuiltInMethods.TRUNCATE_FLOAT)
+
+
   // ----------------------------------------------------------------------------------------------
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 56fa16d..9411a75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -1395,6 +1395,98 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "-")
   }
 
+  @Test
+  def testTruncate(): Unit = {
+    testAllApis(
+      'f29.truncate('f30),
+      "f29.truncate(f30)",
+      "truncate(f29, f30)",
+      "0.4")
+
+    testAllApis(
+      'f31.truncate('f7),
+      "f31.truncate(f7)",
+      "truncate(f31, f7)",
+      "-0.123")
+
+    testAllApis(
+      'f4.truncate('f32),
+      "f4.truncate(f32)",
+      "truncate(f4, f32)",
+      "40")
+
+    testAllApis(
+      'f28.cast(DataTypes.DOUBLE).truncate(1),
+      "f28.cast(DOUBLE).truncate(1)",
+      "truncate(cast(f28 as DOUBLE), 1)",
+      "0.4")
+
+    // TODO: ignore TableApiTest since cast to DECIMAL(p, s) is not supported yet.
+    //  see https://issues.apache.org/jira/browse/FLINK-13651
+//    testAllApis(
+//      'f31.cast(DataTypes.DECIMAL(38, 18)).truncate(2),
+//      "f31.cast(DECIMAL(10, 10)).truncate(2)",
+//      "truncate(cast(f31 as decimal(38, 18)), 2)",
+//      "-0.12")
+//
+//    testAllApis(
+//      'f36.cast(DataTypes.DECIMAL(38, 18)).truncate(),
+//      "f36.cast(DECIMAL(10, 10)).truncate()",
+//      "truncate(42.324)",
+//      "42")
+
+    testSqlApi("truncate(cast(f31 as decimal(38, 18)), 2)", "-0.12")
+
+    testAllApis(
+      'f5.cast(DataTypes.FLOAT).truncate(),
+      "f5.cast(FLOAT).truncate()",
+      "truncate(cast(f5 as float))",
+      "4.0")
+
+    testAllApis(
+      42.truncate(-1),
+      "42.truncate(-1)",
+      "truncate(42, -1)",
+      "40")
+
+    testAllApis(
+      42.truncate(-3),
+      "42.truncate(-3)",
+      "truncate(42, -3)",
+      "0")
+
+    // Validate the behavior when an argument is null
+    testAllApis(
+      'f33.cast(DataTypes.INT).truncate(1),
+      "f33.cast(INT).truncate(1)",
+      "truncate(cast(null as integer), 1)",
+      "null")
+
+    testAllApis(
+      43.21.truncate('f33.cast(DataTypes.INT)),
+      "43.21.truncate(f33.cast(INT))",
+      "truncate(43.21, cast(null as integer))",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.DOUBLE).truncate(1),
+      "f33.cast(DOUBLE).truncate(1)",
+      "truncate(cast(null as double), 1)",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.INT).truncate(1),
+      "f33.cast(INT).truncate(1)",
+      "truncate(cast(null as integer))",
+      "null")
+
+    testAllApis(
+      'f33.cast(DataTypes.DOUBLE).truncate(),
+      "f33.cast(DOUBLE).truncate()",
+      "truncate(cast(null as double))",
+      "null")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Math functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
index 35a60c1..d31c937 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/SqlExpressionTest.scala
@@ -119,6 +119,8 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("ROUND(-12.345, 2)", "-12.35")
     testSqlApi("PI()", "3.141592653589793")
     testSqlApi("E()", "2.718281828459045")
+    testSqlApi("truncate(42.345)", "42")
+    testSqlApi("truncate(cast(42.345 as decimal(5, 3)), 2)", "42.34")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index da4763f..8299a12 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.expressions.validation
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{SqlParserException, ValidationException}
 import org.apache.flink.table.expressions.TimePointUnit
+import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
 import org.apache.calcite.avatica.util.TimeUnit
 import org.junit.{Ignore, Test}
 
@@ -69,6 +69,44 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("BIN(f16)", "101010") // Date type
   }
 
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTruncate1(): Unit = {
+    // All arguments are string type
+    testSqlApi(
+      "TRUNCATE('abc', 'def')",
+      "FAIL")
+
+    // The second argument is of type String
+    testSqlApi(
+      "TRUNCATE(f12, f0)",
+      "FAIL")
+
+    // The second argument is of type Float
+    testSqlApi(
+      "TRUNCATE(f12,f12)",
+      "FAIL")
+
+    // The second argument is of type Double
+    testSqlApi(
+      "TRUNCATE(f12, cast(f28 as DOUBLE))",
+      "FAIL")
+
+    // The second argument is of type BigDecimal
+    testSqlApi(
+      "TRUNCATE(f12,f15)",
+      "FAIL")
+  }
+
+  @Test
+  def testInvalidTruncate2(): Unit = {
+    thrown.expect(classOf[CodeGenException])
+    // The single argument is of type String
+    testSqlApi(
+      "TRUNCATE('abc')",
+      "FAIL")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // String functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index f904fd5..9a993a5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1074,4 +1074,35 @@ public class SqlFunctionUtils {
 	public static String uuid(byte[] b){
 		return UUID.nameUUIDFromBytes(b).toString();
 	}
+
+	/** SQL <code>TRUNCATE</code> operator applied to BigDecimal values. */
+	public static Decimal struncate(Decimal b0) {
+		return struncate(b0, 0);
+	}
+
+	public static Decimal struncate(Decimal b0, int b1) {
+		if (b1 >= b0.getScale()) {
+			return b0;
+		}
+
+		BigDecimal b2 = b0.toBigDecimal().movePointRight(b1)
+			.setScale(0, RoundingMode.DOWN).movePointLeft(b1);
+		int p = b0.getPrecision();
+		int s = b0.getScale();
+
+		if (b1 < 0) {
+			return Decimal.fromBigDecimal(b2, Math.min(38, 1 + p - s), 0);
+		} else {
+			return Decimal.fromBigDecimal(b2, 1 + p - s + b1, b1);
+		}
+	}
+
+	/** SQL <code>TRUNCATE</code> operator applied to float values. */
+	public static float struncate(float b0) {
+		return struncate(b0, 0);
+	}
+
+	public static float struncate(float b0, int b1) {
+		return (float) struncate(Decimal.castFrom((double) b0, 38, 18), b1).doubleValue();
+	}
 }


[flink] 01/04: [FLINK-13547][table-planner-blink] Refactor CONCAT() and CONCAT_WS() to keep them compatible with the old planner

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e697cf148f2d4ea09b0117d4c22102de98440c4
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Fri Aug 2 17:29:47 2019 +0800

    [FLINK-13547][table-planner-blink] Refactor CONCAT() and CONCAT_WS() to keep them compatible with the old planner
    
    CONCAT(string1, string2, ...) should return NULL if any argument is NULL.
    CONCAT_WS(sep, string1, string2, ...) should return NULL if sep is NULL, and it automatically skips NULL arguments.
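The intended NULL semantics can be sketched in Python (illustrative only, not Flink code; None stands in for SQL NULL):

```python
def concat(*args):
    # CONCAT: returns NULL if any argument is NULL
    if any(a is None for a in args):
        return None
    return "".join(args)

def concat_ws(sep, *args):
    # CONCAT_WS: returns NULL if the separator is NULL,
    # but skips NULL arguments after the separator
    if sep is None:
        return None
    return sep.join(a for a in args if a is not None)
```

For example, concat_ws(",", "a", None, "c") yields "a,c", while concat("xx", None) yields None.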
---
 .../planner/expressions/ScalarFunctionsTest.scala  |  6 ++---
 .../flink/table/dataformat/BinaryStringUtil.java   | 28 +++++++++++-----------
 .../flink/table/dataformat/BinaryStringTest.java   | 12 +++++-----
 3 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 0fb58bf..56fa16d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -731,7 +731,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       concat("xx", 'f33),
       "concat('xx', f33)",
       "CONCAT('xx', f33)",
-      "xx")
+      "null")
     testAllApis(
       concat("AA", "BB", "CC", "---"),
       "concat('AA','BB','CC','---')",
@@ -745,7 +745,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
     testSqlApi("concat(f35)", "a")
     testSqlApi("concat(f35,f36)", "ab")
-    testSqlApi("concat(f35,f36,f33)", "ab")
+    testSqlApi("concat(f35,f36,f33)", "null")
   }
 
   @Test
@@ -754,7 +754,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       concat_ws('f33, "AA"),
       "concat_ws(f33, 'AA')",
       "CONCAT_WS(f33, 'AA')",
-      "AA")
+      "null")
     testAllApis(
       concat_ws("~~~~", "AA"),
       "concat_ws('~~~~','AA')",
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
index ecc6152..6faa51a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
@@ -717,22 +717,22 @@ public class BinaryStringUtil {
 
 	/**
 	 * Concatenates input strings together into a single string.
+	 * Returns NULL if any argument is NULL.
 	 */
 	public static BinaryString concat(BinaryString... inputs) {
 		return concat(Arrays.asList(inputs));
 	}
 
-	/**
-	 * Concatenates input strings together into a single string.
-	 */
 	public static BinaryString concat(Iterable<BinaryString> inputs) {
 		// Compute the total length of the result.
 		int totalLength = 0;
 		for (BinaryString input : inputs) {
-			if (input != null) {
-				input.ensureMaterialized();
-				totalLength += input.getSizeInBytes();
+			if (input == null) {
+				return null;
 			}
+
+			input.ensureMaterialized();
+			totalLength += input.getSizeInBytes();
 		}
 
 		// Allocate a new byte array, and copy the inputs one by one into it.
@@ -749,21 +749,21 @@ public class BinaryStringUtil {
 	}
 
 	/**
-	 * Concatenates input strings together into a single string using the separator.
-	 * A null input is skipped. For example, concat(",", "a", null, "c") would yield "a,c".
+	 * <p>Concatenates input strings together into a single string using the separator.
+	 * Returns NULL if the separator is NULL.</p>
+	 *
+	 * <p>Note: CONCAT_WS() does not skip any empty strings, but it does skip any NULL values after
+	 * the separator. For example, concat_ws(",", "a", null, "c") would yield "a,c".</p>
 	 */
 	public static BinaryString concatWs(BinaryString separator, BinaryString... inputs) {
 		return concatWs(separator, Arrays.asList(inputs));
 	}
 
-	/**
-	 * Concatenates input strings together into a single string using the separator.
-	 * A null input is skipped. For example, concat(",", "a", null, "c") would yield "a,c".
-	 */
 	public static BinaryString concatWs(BinaryString separator, Iterable<BinaryString> inputs) {
-		if (null == separator || EMPTY_UTF8.equals(separator)) {
-			return concat(inputs);
+		if (null == separator) {
+			return null;
 		}
+
 		separator.ensureMaterialized();
 
 		int numInputBytes = 0;  // total number of bytes from the inputs
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
index 59c93ec..a3e2c63 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
@@ -247,22 +247,22 @@ public class BinaryStringTest {
 	@Test
 	public void concatTest() {
 		assertEquals(empty, concat());
-		assertEquals(empty, concat((BinaryString) null));
+		assertEquals(null, concat((BinaryString) null));
 		assertEquals(empty, concat(empty));
 		assertEquals(fromString("ab"), concat(fromString("ab")));
 		assertEquals(fromString("ab"), concat(fromString("a"), fromString("b")));
 		assertEquals(fromString("abc"), concat(fromString("a"), fromString("b"), fromString("c")));
-		assertEquals(fromString("ac"), concat(fromString("a"), null, fromString("c")));
-		assertEquals(fromString("a"), concat(fromString("a"), null, null));
-		assertEquals(empty, concat(null, null, null));
+		assertEquals(null, concat(fromString("a"), null, fromString("c")));
+		assertEquals(null, concat(fromString("a"), null, null));
+		assertEquals(null, concat(null, null, null));
 		assertEquals(fromString("数据砖头"), concat(fromString("数据"), fromString("砖头")));
 	}
 
 	@Test
 	public void concatWsTest() {
 		// Returns null if the separator is null
-		assertEquals(empty, concatWs(null, (BinaryString) null));
-		assertEquals(fromString("a"), concatWs(null, fromString("a")));
+		assertEquals(null, concatWs(null, (BinaryString) null));
+		assertEquals(null, concatWs(null, fromString("a")));
 
 		// If the separator is not null, concatWs should skip all null inputs and never return null.
 		BinaryString sep = fromString("哈哈");

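The null-handling semantics described in the commit message can be sketched with plain `java.lang.String` (a hypothetical stand-in for Flink's `BinaryString`; the class and method names below are illustrative, not the actual Flink API):

```java
public class ConcatSemantics {

    // CONCAT(s1, s2, ...): returns NULL if any argument is NULL.
    static String concat(String... inputs) {
        StringBuilder sb = new StringBuilder();
        for (String s : inputs) {
            if (s == null) {
                return null; // any NULL argument makes the whole result NULL
            }
            sb.append(s);
        }
        return sb.toString();
    }

    // CONCAT_WS(sep, s1, s2, ...): returns NULL if the separator is NULL;
    // NULL arguments after the separator are skipped, empty strings are kept.
    static String concatWs(String sep, String... inputs) {
        if (sep == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String s : inputs) {
            if (s == null) {
                continue; // skip NULL inputs without emitting a separator
            }
            if (!first) {
                sb.append(sep);
            }
            sb.append(s);
            first = false;
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(concat("a", null, "c") == null);      // true
        System.out.println(concatWs(",", "a", null, "c"));       // a,c
        System.out.println(concatWs(null, "a") == null);         // true
    }
}
```

This matches the updated tests above: `concat` propagates NULL eagerly, while `concatWs` only treats the separator as nullable.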

[flink] 02/04: [FLINK-13547][table-planner-blink] Fix FROM_BASE64() should return STRING type instead of BINARY

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ae0e8a0762732b2a87dbb8ebadd933ebeb4b96a
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Thu Aug 8 12:04:14 2019 +0800

    [FLINK-13547][table-planner-blink] Fix FROM_BASE64() should return STRING type instead of BINARY
    
    This fixes the behavior of FROM_BASE64() to align with the old planner.
---
 .../apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 8618207..04c867b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -615,7 +615,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction FROM_BASE64 = new SqlFunction(
 		"FROM_BASE64",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.BINARY), SqlTypeTransforms.TO_NULLABLE),
+		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
 		null,
 		OperandTypes.family(SqlTypeFamily.STRING),
 		SqlFunctionCategory.STRING);
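The one-line type change above means FROM_BASE64 now yields a character string (VARCHAR) rather than raw bytes (BINARY). A minimal sketch of that behavior using the JDK's `java.util.Base64` (illustrative only; this is not the Flink runtime implementation):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class FromBase64Demo {

    // Decode base64 input and return a String (VARCHAR semantics),
    // not the raw byte[] (BINARY semantics) the old return type implied.
    static String fromBase64(String input) {
        if (input == null) {
            return null; // SqlTypeTransforms.TO_NULLABLE: NULL in, NULL out
        }
        byte[] decoded = Base64.getDecoder().decode(input);
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(fromBase64("aGVsbG8gd29ybGQ="));
    }
}
```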