You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/07/05 15:22:39 UTC
flink git commit: [FLINK-6925] [table] Add CONCAT/CONCAT_WS supported
in SQL
Repository: flink
Updated Branches:
refs/heads/master 9c4676414 -> 57c675ea5
[FLINK-6925] [table] Add CONCAT/CONCAT_WS supported in SQL
This closes #4138
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57c675ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57c675ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57c675ea
Branch: refs/heads/master
Commit: 57c675ea58ab2a0003da89385a9f092f943da3ae
Parents: 9c46764
Author: sunjincheng121 <su...@gmail.com>
Authored: Fri Jun 16 17:59:44 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Wed Jul 5 20:56:18 2017 +0800
----------------------------------------------------------------------
docs/dev/table/sql.md | 22 +++++
.../flink/table/codegen/CodeGenerator.scala | 9 ++-
.../table/codegen/calls/BuiltInMethods.scala | 10 ++-
.../table/codegen/calls/CallGenerator.scala | 1 +
.../table/codegen/calls/ScalarOperators.scala | 12 +++
.../functions/sql/ScalarSqlFunctions.scala | 19 ++++-
.../table/functions/utils/MathFunctions.scala | 29 -------
.../runtime/functions/ScalarFunctions.scala | 85 ++++++++++++++++++++
.../flink/table/validate/FunctionCatalog.scala | 2 +
.../table/expressions/ScalarFunctionsTest.scala | 22 ++++-
10 files changed, 176 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index be586f1..11b7b0e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1575,6 +1575,28 @@ INITCAP(string)
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight text %}
+CONCAT(string1, string2,...)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the string that results from concatenating the arguments. Returns NULL if any argument is NULL. E.g. <code>CONCAT('AA', 'BB', 'CC')</code> returns <code>AABBCC</code>.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
+CONCAT_WS(separator, string1, string2,...)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the string that results from concatenating the arguments using a separator. The separator is added between the strings to be concatenated. Returns NULL if the separator is NULL. CONCAT_WS() does not skip empty strings. However, it does skip any NULL argument. E.g. <code>CONCAT_WS('~', 'AA', 'BB', '', 'CC')</code> returns <code>AA~BB~~CC</code>.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e7dc033..045fbdd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -40,8 +40,9 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.codegen.Indenter.toISC
-import org.apache.flink.table.codegen.calls.FunctionGenerator
+import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator}
import org.apache.flink.table.codegen.calls.ScalarOperators._
+import org.apache.flink.table.functions.sql.ScalarSqlFunctions
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
@@ -1560,6 +1561,12 @@ class CodeGenerator(
requireArray(array)
generateArrayElement(this, array)
+ case ScalarSqlFunctions.CONCAT =>
+ generateConcat(BuiltInMethods.CONCAT, operands)
+
+ case ScalarSqlFunctions.CONCAT_WS =>
+ generateConcat(BuiltInMethods.CONCAT_WS, operands)
+
// advanced scalar functions
case sqlOperator: SqlOperator =>
val callGen = FunctionGenerator.getCallGenerator(
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 40b1a59..b7da141 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -22,7 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.linq4j.tree.Types
import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.table.functions.utils.MathFunctions
+import org.apache.flink.table.runtime.functions.ScalarFunctions
object BuiltInMethods {
val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
@@ -31,7 +31,7 @@ object BuiltInMethods {
val POWER = Types.lookupMethod(classOf[Math], "pow", classOf[Double], classOf[Double])
val POWER_DEC = Types.lookupMethod(
- classOf[MathFunctions], "power", classOf[Double], classOf[JBigDecimal])
+ classOf[ScalarFunctions], "power", classOf[Double], classOf[JBigDecimal])
val POWER_DEC_DEC = Types.lookupMethod(
classOf[SqlFunctions], "power", classOf[JBigDecimal], classOf[JBigDecimal])
@@ -84,4 +84,10 @@ object BuiltInMethods {
val ROUND_LONG = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[Long], classOf[Int])
val ROUND_DEC = Types.lookupMethod(classOf[SqlFunctions], "sround", classOf[JBigDecimal],
classOf[Int])
+
+ val CONCAT = Types.lookupMethod(classOf[ScalarFunctions], "concat", classOf[Array[String]])
+ val CONCAT_WS =
+ Types.lookupMethod(
+ classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]])
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
index 1bc9fbb..f02741a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
@@ -64,6 +64,7 @@ object CallGenerator {
|""".stripMargin
} else{
s"""
+ |boolean $nullTerm = false;
|${operands.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = ${call(operands.map(_.resultTerm))};
|""".stripMargin
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index f34b0d0..af92df4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -17,6 +17,8 @@
*/
package org.apache.flink.table.codegen.calls
+import java.lang.reflect.Method
+
import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
import org.apache.calcite.util.BuiltInMethod
@@ -24,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils._
@@ -934,6 +937,15 @@ object ScalarOperators {
}
}
+ def generateConcat(
+ method: Method,
+ operands: Seq[GeneratedExpression]): GeneratedExpression = {
+
+ generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
+ (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
+ }
+ }
+
def generateMapGet(
codeGenerator: CodeGenerator,
map: GeneratedExpression,
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 84f2d21..d27efbe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.functions.sql
import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
-import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily}
/**
* All build-in scalar sql functions.
*/
object ScalarSqlFunctions {
+
val E = new SqlFunction(
"E",
SqlKind.OTHER_FUNCTION,
@@ -31,4 +32,20 @@ object ScalarSqlFunctions {
null,
OperandTypes.NILADIC,
SqlFunctionCategory.NUMERIC)
+
+ val CONCAT = new SqlFunction(
+ "CONCAT",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.VARCHAR_2000,
+ null,
+ OperandTypes.ONE_OR_MORE,
+ SqlFunctionCategory.STRING)
+
+ val CONCAT_WS = new SqlFunction(
+ "CONCAT_WS",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.VARCHAR_2000,
+ null,
+ OperandTypes.ONE_OR_MORE,
+ SqlFunctionCategory.STRING)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
deleted file mode 100644
index 64e4bc4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.functions.utils
-
-import java.math.{BigDecimal => JBigDecimal}
-
-class MathFunctions {}
-
-object MathFunctions {
- def power(a: Double, b: JBigDecimal): Double = {
- Math.pow(a, b.doubleValue())
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
new file mode 100644
index 0000000..865421f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.functions
+
+import scala.annotation.varargs
+import java.math.{BigDecimal => JBigDecimal}
+import java.lang.StringBuilder
+
+/**
+ * Built-in scalar runtime functions.
+ */
+class ScalarFunctions {}
+
+object ScalarFunctions {
+
+ def power(a: Double, b: JBigDecimal): Double = {
+ Math.pow(a, b.doubleValue())
+ }
+
+ /**
+ * Returns the string that results from concatenating the arguments.
+ * Returns NULL if any argument is NULL.
+ */
+ @varargs
+ def concat(args: String*): String = {
+ val sb = new StringBuilder
+ var i = 0
+ while (i < args.length) {
+ if (args(i) == null) {
+ return null
+ }
+ sb.append(args(i))
+ i += 1
+ }
+ sb.toString
+ }
+
+ /**
+ * Returns the string that results from concatenating the arguments and separator.
+ * Returns NULL if the separator is NULL.
+ *
+ * Note: CONCAT_WS() does not skip empty strings. However, it does skip any NULL values after
+ * the separator argument.
+ *
+ **/
+ @varargs
+ def concat_ws(separator: String, args: String*): String = {
+ if (null == separator) {
+ return null
+ }
+
+ val sb = new StringBuilder
+
+ var i = 0
+
+ var hasValueAppended = false
+
+ while (i < args.length) {
+ if (null != args(i)) {
+ if (hasValueAppended) {
+ sb.append(separator)
+ }
+ sb.append(args(i))
+ hasValueAppended = true
+ }
+ i = i + 1
+ }
+ sb.toString
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 6d3006d..df77441 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -394,6 +394,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.RAND,
SqlStdOperatorTable.RAND_INTEGER,
ScalarSqlFunctions.E,
+ ScalarSqlFunctions.CONCAT,
+ ScalarSqlFunctions.CONCAT_WS,
// EXTENSIONS
SqlStdOperatorTable.TUMBLE,
http://git-wip-us.apache.org/repos/asf/flink/blob/57c675ea/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index d4c2a45..be28134 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -325,6 +325,22 @@ class ScalarFunctionsTest extends ExpressionTestBase {
"false")
}
+ @Test
+ def testMultiConcat(): Unit = {
+ testSqlApi("CONCAT('xx', f33)","null")
+ testSqlApi("CONCAT('AA','BB','CC','---')","AABBCC---")
+ testSqlApi("CONCAT('x~x','b~b','c~~~~c','---')","x~xb~bc~~~~c---")
+ }
+
+ @Test
+ def testConcatWs(): Unit = {
+ testSqlApi("CONCAT_WS(f33, 'AA')", "null")
+ testSqlApi("concat_ws('~~~~','AA')", "AA")
+ testSqlApi("concat_ws('~','AA','BB')", "AA~BB")
+ testSqlApi("concat_ws('~',f33, 'AA','BB','',f33, 'CC')", "AA~BB~~CC")
+ testSqlApi("CONCAT_WS('~~~~','Flink', f33, 'xx', f33, f33)", "Flink~~~~xx")
+ }
+
// ----------------------------------------------------------------------------------------------
// Math functions
// ----------------------------------------------------------------------------------------------
@@ -1569,7 +1585,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
def testData = {
- val testData = new Row(33)
+ val testData = new Row(34)
testData.setField(0, "This is a test String.")
testData.setField(1, true)
testData.setField(2, 42.toByte)
@@ -1603,6 +1619,7 @@ class ScalarFunctionsTest extends ExpressionTestBase {
testData.setField(30, 1)
testData.setField(31, BigDecimal("-0.1231231321321321111").bigDecimal)
testData.setField(32, -1)
+ testData.setField(33, null)
testData
}
@@ -1640,7 +1657,8 @@ class ScalarFunctionsTest extends ExpressionTestBase {
Types.DOUBLE,
Types.INT,
Types.DECIMAL,
- Types.INT).asInstanceOf[TypeInformation[Any]]
+ Types.INT,
+ Types.STRING).asInstanceOf[TypeInformation[Any]]
}
}