You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2017/12/26 06:22:04 UTC

flink git commit: [FLINK-8301][table] Support Unicode in codegen for TableAPI && SQL

Repository: flink
Updated Branches:
  refs/heads/master 15a0dc4ae -> edf10c714


[FLINK-8301][table] Support Unicode in codegen for TableAPI && SQL

This closes #5203


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edf10c71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edf10c71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edf10c71

Branch: refs/heads/master
Commit: edf10c714fb1fe7a58e96a4c7006d0df48954b79
Parents: 15a0dc4
Author: Xpray <le...@gmail.com>
Authored: Mon Dec 25 10:51:02 2017 +0800
Committer: 金竹 <ji...@alibaba-inc.com>
Committed: Tue Dec 26 07:50:25 2017 +0800

----------------------------------------------------------------------
 .../flink/table/codegen/CodeGenerator.scala     |  4 +-
 .../flink/table/codegen/ExpressionReducer.scala |  7 ++++
 .../flink/table/expressions/literals.scala      |  8 +++-
 .../utils/userDefinedScalarFunctions.scala      | 13 ++++++
 .../table/runtime/batch/sql/CalcITCase.scala    | 37 +++++++++++++++++
 .../table/runtime/batch/table/CalcITCase.scala  | 31 ++++++++++++--
 .../table/runtime/stream/sql/SqlITCase.scala    | 43 ++++++++++++++++++--
 .../table/runtime/stream/table/CalcITCase.scala | 30 +++++++++++++-
 8 files changed, 163 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e94fe38..40ea5b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -692,7 +692,9 @@ abstract class CodeGenerator(
         generateNonNullLiteral(resultType, decimalField)
 
       case VARCHAR | CHAR =>
-        val escapedValue = StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
+        val escapedValue = StringEscapeUtils.escapeJava(
+          StringEscapeUtils.unescapeJava(value.toString)
+        )
         generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
 
       case SYMBOL =>

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 9696ced..a9dbf19 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -23,6 +23,7 @@ import java.util
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -119,6 +120,12 @@ class ExpressionReducer(config: TableConfig)
              SqlTypeName.MAP |
              SqlTypeName.MULTISET =>
           reducedValues.add(unreduced)
+        // after expression reduction, the literal string has to be escaped
+        case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
+          val escapeVarchar = StringEscapeUtils
+            .escapeJava(reduced.getField(reducedIdx).asInstanceOf[String])
+          reducedValues.add(rexBuilder.makeLiteral(escapeVarchar, unreduced.getType, true))
+          reducedIdx += 1
         case _ =>
           val reducedValue = reduced.getField(reducedIdx)
           // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index e6905ef..863dfc1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -27,10 +27,11 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-
 import java.sql.{Date, Time, Timestamp}
 import java.util.{Calendar, TimeZone}
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 object Literal {
   private[flink] val UTC = TimeZone.getTimeZone("UTC")
 
@@ -103,6 +104,11 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
           SqlParserPos.ZERO)
         relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
 
+      case BasicTypeInfo.STRING_TYPE_INFO =>
+        relBuilder.getRexBuilder.makeLiteral(
+          StringEscapeUtils.escapeJava(value.asInstanceOf[String])
+        )
+
       case _ => relBuilder.literal(value)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 3f6ebbd..32e5d71 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.utils
 
 import java.sql.{Date, Time, Timestamp}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
@@ -288,3 +289,15 @@ object Func19 extends ScalarFunction {
     Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
 
 }
+
+class SplitUDF(deterministic: Boolean) extends ScalarFunction {
+  def eval(x: String, sep: String, index: Int): String = {
+    val splits = StringUtils.splitByWholeSeparator(x, sep)
+    if (splits.length > index) {
+      splits(index)
+    } else {
+      null
+    }
+  }
+  override def isDeterministic: Boolean = deterministic
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 3529b5f..6aed9a8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.expressions.utils.SplitUDF
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.runtime.batch.table.OldHashCode
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
@@ -352,6 +353,42 @@ class CalcITCase(
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testUdfWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val splitUDF0 = new SplitUDF(deterministic = true)
+    val splitUDF1 = new SplitUDF(deterministic = false)
+
+    tEnv.registerFunction("splitUDF0", splitUDF0)
+    tEnv.registerFunction("splitUDF1", splitUDF1)
+
+    // users have to specify '\' as '\\' in SQL
+    val sqlQuery = "SELECT " +
+      "splitUDF0(a, '\u0001', 0) as a0, " +
+      "splitUDF1(a, '\u0001', 0) as a1, " +
+      "splitUDF0(b, '\"', 1) as b0, " +
+      "splitUDF1(b, '\"', 1) as b1, " +
+      "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
+      "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+    val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object MyHashCode extends ScalarFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 5ef7e31..74f0560 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -27,11 +27,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.Types._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, RichFunc3}
+import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase, UserDefinedFunctionTestUtils}
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
 import org.apache.flink.types.Row
@@ -541,6 +540,32 @@ class CalcITCase(
       "default-nosharp,Sunny-nosharp,kevin2-nosharp"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testUDFWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val splitUDF0 = new SplitUDF(deterministic = true)
+    val splitUDF1 = new SplitUDF(deterministic = false)
+    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+             .select(splitUDF0('a, "\u0001", 0) as 'a0,
+                     splitUDF1('a, "\u0001", 0) as 'a1,
+                     splitUDF0('b, "\"", 1) as 'b0,
+                     splitUDF1('b, "\"", 1) as 'b1,
+                     splitUDF0('c, "\\\"\u0004", 0) as 'c0,
+                     splitUDF1('c, "\\\"\u0004", 0) as 'c1
+             )
+    val results = ds.collect()
+    val expected = List(
+      "a,a,d,d,e,e", "x,x,z,z,z,z"
+    ).mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
 }
 
 object CalcITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 3acdd58..18b45a3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
-import java.util
-
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
@@ -27,12 +25,11 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.SplitUDF
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.apache.flink.table.utils.MemoryTableSinkUtil
-
-import scala.collection.JavaConverters._
 import org.junit.Assert._
 import org.junit._
 
@@ -481,4 +478,42 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUdfWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val splitUDF0 = new SplitUDF(deterministic = true)
+    val splitUDF1 = new SplitUDF(deterministic = false)
+
+    tEnv.registerFunction("splitUDF0", splitUDF0)
+    tEnv.registerFunction("splitUDF1", splitUDF1)
+
+    // users have to specify '\' as '\\' in SQL
+    val sqlQuery = "SELECT " +
+      "splitUDF0(a, '\u0001', 0) as a0, " +
+      "splitUDF1(a, '\u0001', 0) as a1, " +
+      "splitUDF0(b, '\"', 1) as b0, " +
+      "splitUDF1(b, '\"', 1) as b1, " +
+      "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
+      "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/edf10c71/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 03dd6db..46788f5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2}
+import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils
 import org.apache.flink.types.Row
@@ -352,4 +352,32 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
       "{9=Comment#3}")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUDFWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val splitUDF0 = new SplitUDF(deterministic = true)
+    val splitUDF1 = new SplitUDF(deterministic = false)
+    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+      .select(splitUDF0('a, "\u0001", 0) as 'a0,
+              splitUDF1('a, "\u0001", 0) as 'a1,
+              splitUDF0('b, "\"", 1) as 'b0,
+              splitUDF1('b, "\"", 1) as 'b1,
+              splitUDF0('c, "\\\"\u0004", 0) as 'c0,
+              splitUDF1('c, "\\\"\u0004", 0) as 'c1
+      )
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = mutable.MutableList(
+      "a,a,d,d,e,e", "x,x,z,z,z,z"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }