You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/07/05 15:22:39 UTC

flink git commit: [FLINK-6925] [table] Add CONCAT/CONCAT_WS supported in SQL

Repository: flink
Updated Branches:
  refs/heads/master 9c4676414 -> 57c675ea5


[FLINK-6925] [table] Add CONCAT/CONCAT_WS supported in SQL

This closes #4138


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57c675ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57c675ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57c675ea

Branch: refs/heads/master
Commit: 57c675ea58ab2a0003da89385a9f092f943da3ae
Parents: 9c46764
Author: sunjincheng121 <su...@gmail.com>
Authored: Fri Jun 16 17:59:44 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Wed Jul 5 20:56:18 2017 +0800

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 22 +++++
 .../flink/table/codegen/CodeGenerator.scala     |  9 ++-
 .../table/codegen/calls/BuiltInMethods.scala    | 10 ++-
 .../table/codegen/calls/CallGenerator.scala     |  1 +
 .../table/codegen/calls/ScalarOperators.scala   | 12 +++
 .../functions/sql/ScalarSqlFunctions.scala      | 19 ++++-
 .../table/functions/utils/MathFunctions.scala   | 29 -------
 .../runtime/functions/ScalarFunctions.scala     | 85 ++++++++++++++++++++
 .../flink/table/validate/FunctionCatalog.scala  |  2 +
 .../table/expressions/ScalarFunctionsTest.scala | 22 ++++-
 10 files changed, 176 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index be586f1..11b7b0e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1575,6 +1575,28 @@ INITCAP(string)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight text %}
+CONCAT(string1, string2,...)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the string that results from concatenating the arguments. Returns NULL if any argument is NULL. E.g. <code>CONCAT("AA", "BB", "CC")</code> returns <code>AABBCC</code>.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+CONCAT_WS(separator, string1, string2,...)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the string that results from concatenating the arguments using a separator. The separator is added between the strings to be concatenated. Returns NULL If the separator is NULL. CONCAT_WS() does not skip empty strings. However, it does skip any NULL argument. E.g. <code>CONCAT_WS("~", "AA", "BB", "", "CC")</code> returns <code>AA~BB~~CC</code></p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e7dc033..045fbdd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -40,8 +40,9 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.Indenter.toISC
-import org.apache.flink.table.codegen.calls.FunctionGenerator
+import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator}
 import org.apache.flink.table.codegen.calls.ScalarOperators._
+import org.apache.flink.table.functions.sql.ScalarSqlFunctions
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
 import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
@@ -1560,6 +1561,12 @@ class CodeGenerator(
         requireArray(array)
         generateArrayElement(this, array)
 
+      case ScalarSqlFunctions.CONCAT =>
+        generateConcat(BuiltInMethods.CONCAT, operands)
+
+      case ScalarSqlFunctions.CONCAT_WS =>
+        generateConcat(BuiltInMethods.CONCAT_WS, operands)
+
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
         val callGen = FunctionGenerator.getCallGenerator(

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 40b1a59..b7da141 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -22,7 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.linq4j.tree.Types
 import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.table.functions.utils.MathFunctions
+import org.apache.flink.table.runtime.functions.ScalarFunctions
 
 object BuiltInMethods {
   val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
@@ -31,7 +31,7 @@ object BuiltInMethods {
 
   val POWER = Types.lookupMethod(classOf[Math], "pow", classOf[Double], classOf[Double])
   val POWER_DEC = Types.lookupMethod(
-    classOf[MathFunctions], "power", classOf[Double], classOf[JBigDecimal])
+    classOf[ScalarFunctions], "power", classOf[Double], classOf[JBigDecimal])
   val POWER_DEC_DEC = Types.lookupMethod(
     classOf[SqlFunctions], "power", classOf[JBigDecimal], classOf[JBigDecimal])
 
@@ -84,4 +84,10 @@ object BuiltInMethods {
   val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[Long], classOf[Int])
   val ROUND_DEC = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[JBigDecimal],
     classOf[Int])
+
+  val CONCAT = Types.lookupMethod(classOf[ScalarFunctions], "concat", classOf[Array[String]])
+  val CONCAT_WS =
+    Types.lookupMethod(
+      classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]])
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
index 1bc9fbb..f02741a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
@@ -64,6 +64,7 @@ object CallGenerator {
         |""".stripMargin
     } else{
       s"""
+        |boolean $nullTerm = false;
         |${operands.map(_.code).mkString("\n")}
         |$resultTypeTerm $resultTerm = ${call(operands.map(_.resultTerm))};
         |""".stripMargin

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index f34b0d0..af92df4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.table.codegen.calls
 
+import java.lang.reflect.Method
+
 import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
 import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
@@ -24,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
@@ -934,6 +937,15 @@ object ScalarOperators {
     }
   }
 
+  def generateConcat(
+      method: Method,
+      operands: Seq[GeneratedExpression]): GeneratedExpression = {
+
+    generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
+      (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
+    }
+  }
+
   def generateMapGet(
       codeGenerator: CodeGenerator,
       map: GeneratedExpression,

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 84f2d21..d27efbe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -18,12 +18,13 @@
 package org.apache.flink.table.functions.sql
 
 import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
-import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily}
 
 /**
   * All build-in scalar sql functions.
   */
 object ScalarSqlFunctions {
+
   val E = new SqlFunction(
     "E",
     SqlKind.OTHER_FUNCTION,
@@ -31,4 +32,20 @@ object ScalarSqlFunctions {
     null,
     OperandTypes.NILADIC,
     SqlFunctionCategory.NUMERIC)
+
+  val CONCAT = new SqlFunction(
+    "CONCAT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.VARCHAR_2000,
+    null,
+    OperandTypes.ONE_OR_MORE,
+    SqlFunctionCategory.STRING)
+
+  val CONCAT_WS = new SqlFunction(
+    "CONCAT_WS",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.VARCHAR_2000,
+    null,
+    OperandTypes.ONE_OR_MORE,
+    SqlFunctionCategory.STRING)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
deleted file mode 100644
index 64e4bc4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.utils
-
-import java.math.{BigDecimal => JBigDecimal}
-
-class MathFunctions {}
-
-object MathFunctions {
-  def power(a: Double, b: JBigDecimal): Double = {
-    Math.pow(a, b.doubleValue())
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
new file mode 100644
index 0000000..865421f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.StringBuilder
+
+/**
+  * Built-in scalar runtime functions.
+  */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+  def power(a: Double, b: JBigDecimal): Double = {
+    Math.pow(a, b.doubleValue())
+  }
+
+  /**
+    * Returns the string that results from concatenating the arguments.
+    * Returns NULL if any argument is NULL.
+    */
+  @varargs
+  def concat(args: String*): String = {
+    val sb = new StringBuilder
+    var i = 0
+    while (i < args.length) {
+      if (args(i) == null) {
+        return null
+      }
+      sb.append(args(i))
+      i += 1
+    }
+    sb.toString
+  }
+
+  /**
+    * Returns the string that results from concatenating the arguments and separator.
+    * Returns NULL If the separator is NULL.
+    *
+    * Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after
+    * the separator argument.
+    *
+    **/
+  @varargs
+  def concat_ws(separator: String, args: String*): String = {
+    if (null == separator) {
+      return null
+    }
+
+    val sb = new StringBuilder
+
+    var i = 0
+
+    var hasValueAppended = false
+
+    while (i < args.length) {
+      if (null != args(i)) {
+        if (hasValueAppended) {
+          sb.append(separator)
+        }
+        sb.append(args(i))
+        hasValueAppended = true
+      }
+      i = i + 1
+    }
+    sb.toString
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 6d3006d..df77441 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -394,6 +394,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.RAND,
     SqlStdOperatorTable.RAND_INTEGER,
     ScalarSqlFunctions.E,
+    ScalarSqlFunctions.CONCAT,
+    ScalarSqlFunctions.CONCAT_WS,
 
     // EXTENSIONS
     SqlStdOperatorTable.TUMBLE,

http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index d4c2a45..be28134 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -325,6 +325,22 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "false")
   }
 
+  @Test
+  def testMultiConcat(): Unit = {
+   testSqlApi("CONCAT('xx', f33)","null")
+   testSqlApi("CONCAT('AA','BB','CC','---')","AABBCC---")
+   testSqlApi("CONCAT('x~x','b~b','c~~~~c','---')","x~xb~bc~~~~c---")
+  }
+
+  @Test
+  def testConcatWs(): Unit = {
+    testSqlApi("CONCAT_WS(f33, 'AA')", "null")
+    testSqlApi("concat_ws('~~~~','AA')", "AA")
+    testSqlApi("concat_ws('~','AA','BB')", "AA~BB")
+    testSqlApi("concat_ws('~',f33, 'AA','BB','',f33, 'CC')", "AA~BB~~CC")
+    testSqlApi("CONCAT_WS('~~~~','Flink', f33, 'xx', f33, f33)", "Flink~~~~xx")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Math functions
   // ----------------------------------------------------------------------------------------------
@@ -1569,7 +1585,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   def testData = {
-    val testData = new Row(33)
+    val testData = new Row(34)
     testData.setField(0, "This is a test String.")
     testData.setField(1, true)
     testData.setField(2, 42.toByte)
@@ -1603,6 +1619,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
     testData.setField(30, 1)
     testData.setField(31, BigDecimal("-0.1231231321321321111").bigDecimal)
     testData.setField(32, -1)
+    testData.setField(33, null)
     testData
   }
 
@@ -1640,7 +1657,8 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       Types.DOUBLE,
       Types.INT,
       Types.DECIMAL,
-      Types.INT).asInstanceOf[TypeInformation[Any]]
+      Types.INT,
+      Types.STRING).asInstanceOf[TypeInformation[Any]]
 
   }
 }