You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2017/12/26 06:22:04 UTC
flink git commit: [FLINK-8301][table] Support Unicode in codegen for
TableAPI && SQL
Repository: flink
Updated Branches:
refs/heads/master 15a0dc4ae -> edf10c714
[FLINK-8301][table] Support Unicode in codegen for TableAPI && SQL
This closes #5203
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edf10c71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edf10c71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edf10c71
Branch: refs/heads/master
Commit: edf10c714fb1fe7a58e96a4c7006d0df48954b79
Parents: 15a0dc4
Author: Xpray <le...@gmail.com>
Authored: Mon Dec 25 10:51:02 2017 +0800
Committer: 金竹 <ji...@alibaba-inc.com>
Committed: Tue Dec 26 07:50:25 2017 +0800
----------------------------------------------------------------------
.../flink/table/codegen/CodeGenerator.scala | 4 +-
.../flink/table/codegen/ExpressionReducer.scala | 7 ++++
.../flink/table/expressions/literals.scala | 8 +++-
.../utils/userDefinedScalarFunctions.scala | 13 ++++++
.../table/runtime/batch/sql/CalcITCase.scala | 37 +++++++++++++++++
.../table/runtime/batch/table/CalcITCase.scala | 31 ++++++++++++--
.../table/runtime/stream/sql/SqlITCase.scala | 43 ++++++++++++++++++--
.../table/runtime/stream/table/CalcITCase.scala | 30 +++++++++++++-
8 files changed, 163 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e94fe38..40ea5b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -692,7 +692,9 @@ abstract class CodeGenerator(
generateNonNullLiteral(resultType, decimalField)
case VARCHAR | CHAR =>
- val escapedValue = StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
+ val escapedValue = StringEscapeUtils.escapeJava(
+ StringEscapeUtils.unescapeJava(value.toString)
+ )
generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
case SYMBOL =>
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 9696ced..a9dbf19 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -23,6 +23,7 @@ import java.util
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.commons.lang3.StringEscapeUtils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -119,6 +120,12 @@ class ExpressionReducer(config: TableConfig)
SqlTypeName.MAP |
SqlTypeName.MULTISET =>
reducedValues.add(unreduced)
+ // after expression reduce, the literal string has to be escaped
+ case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
+ val escapeVarchar = StringEscapeUtils
+ .escapeJava(reduced.getField(reducedIdx).asInstanceOf[String])
+ reducedValues.add(rexBuilder.makeLiteral(escapeVarchar, unreduced.getType, true))
+ reducedIdx += 1
case _ =>
val reducedValue = reduced.getField(reducedIdx)
// RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index e6905ef..863dfc1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -27,10 +27,11 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-
import java.sql.{Date, Time, Timestamp}
import java.util.{Calendar, TimeZone}
+import org.apache.commons.lang3.StringEscapeUtils
+
object Literal {
private[flink] val UTC = TimeZone.getTimeZone("UTC")
@@ -103,6 +104,11 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
SqlParserPos.ZERO)
relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+ case BasicTypeInfo.STRING_TYPE_INFO =>
+ relBuilder.getRexBuilder.makeLiteral(
+ StringEscapeUtils.escapeJava(value.asInstanceOf[String])
+ )
+
case _ => relBuilder.literal(value)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 3f6ebbd..32e5d71 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.utils
import java.sql.{Date, Time, Timestamp}
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
@@ -288,3 +289,15 @@ object Func19 extends ScalarFunction {
Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
}
+
+class SplitUDF(deterministic: Boolean) extends ScalarFunction {
+ def eval(x: String, sep: String, index: Int): String = {
+ val splits = StringUtils.splitByWholeSeparator(x, sep)
+ if (splits.length > index) {
+ splits(index)
+ } else {
+ null
+ }
+ }
+ override def isDeterministic: Boolean = deterministic
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 3529b5f..6aed9a8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.expressions.utils.SplitUDF
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.runtime.batch.table.OldHashCode
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
@@ -352,6 +353,42 @@ class CalcITCase(
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test
+ def testUdfWithUnicodeParameter(): Unit = {
+ val data = List(
+ ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+ ("x\u0001y", "y\"z", "z\\\"\u0004z")
+ )
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val splitUDF0 = new SplitUDF(deterministic = true)
+ val splitUDF1 = new SplitUDF(deterministic = false)
+
+ tEnv.registerFunction("splitUDF0", splitUDF0)
+ tEnv.registerFunction("splitUDF1", splitUDF1)
+
+ // users have to specify '\' with '\\' in SQL
+ val sqlQuery = "SELECT " +
+ "splitUDF0(a, '\u0001', 0) as a0, " +
+ "splitUDF1(a, '\u0001', 0) as a1, " +
+ "splitUDF0(b, '\"', 1) as b0, " +
+ "splitUDF1(b, '\"', 1) as b1, " +
+ "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
+ "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+ val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
}
object MyHashCode extends ScalarFunction {
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 5ef7e31..74f0560 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -27,11 +27,10 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.Types._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, RichFunc3}
+import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
import org.apache.flink.types.Row
@@ -541,6 +540,32 @@ class CalcITCase(
"default-nosharp,Sunny-nosharp,kevin2-nosharp"
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ @Test
+ def testUDFWithUnicodeParameter(): Unit = {
+ val data = List(
+ ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+ ("x\u0001y", "y\"z", "z\\\"\u0004z")
+ )
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val splitUDF0 = new SplitUDF(deterministic = true)
+ val splitUDF1 = new SplitUDF(deterministic = false)
+ val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .select(splitUDF0('a, "\u0001", 0) as 'a0,
+ splitUDF1('a, "\u0001", 0) as 'a1,
+ splitUDF0('b, "\"", 1) as 'b0,
+ splitUDF1('b, "\"", 1) as 'b1,
+ splitUDF0('c, "\\\"\u0004", 0) as 'c0,
+ splitUDF1('c, "\\\"\u0004", 0) as 'c1
+ )
+ val results = ds.collect()
+ val expected = List(
+ "a,a,d,d,e,e", "x,x,z,z,z,z"
+ ).mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
}
object CalcITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 3acdd58..18b45a3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,8 +18,6 @@
package org.apache.flink.table.runtime.stream.sql
-import java.util
-
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
@@ -27,12 +25,11 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.SplitUDF
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.apache.flink.table.utils.MemoryTableSinkUtil
-
-import scala.collection.JavaConverters._
import org.junit.Assert._
import org.junit._
@@ -481,4 +478,42 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
}
+ @Test
+ def testUdfWithUnicodeParameter(): Unit = {
+ val data = List(
+ ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+ ("x\u0001y", "y\"z", "z\\\"\u0004z")
+ )
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val splitUDF0 = new SplitUDF(deterministic = true)
+ val splitUDF1 = new SplitUDF(deterministic = false)
+
+ tEnv.registerFunction("splitUDF0", splitUDF0)
+ tEnv.registerFunction("splitUDF1", splitUDF1)
+
+ // users have to specify '\' with '\\' in SQL
+ val sqlQuery = "SELECT " +
+ "splitUDF0(a, '\u0001', 0) as a0, " +
+ "splitUDF1(a, '\u0001', 0) as a1, " +
+ "splitUDF0(b, '\"', 1) as b0, " +
+ "splitUDF1(b, '\"', 1) as b1, " +
+ "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
+ "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 03dd6db..46788f5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2}
+import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
import org.apache.flink.types.Row
@@ -352,4 +352,32 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
"{9=Comment#3}")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testUDFWithUnicodeParameter(): Unit = {
+ val data = List(
+ ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+ ("x\u0001y", "y\"z", "z\\\"\u0004z")
+ )
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val splitUDF0 = new SplitUDF(deterministic = true)
+ val splitUDF1 = new SplitUDF(deterministic = false)
+ val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .select(splitUDF0('a, "\u0001", 0) as 'a0,
+ splitUDF1('a, "\u0001", 0) as 'a1,
+ splitUDF0('b, "\"", 1) as 'b0,
+ splitUDF1('b, "\"", 1) as 'b1,
+ splitUDF0('c, "\\\"\u0004", 0) as 'c0,
+ splitUDF1('c, "\\\"\u0004", 0) as 'c1
+ )
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = mutable.MutableList(
+ "a,a,d,d,e,e", "x,x,z,z,z,z"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}