Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/12 14:18:56 UTC

[GitHub] asfgit closed pull request #6671: [FLINK-10281] [table] Fix string literal parsing in Table & SQL API and SQL Client

URL: https://github.com/apache/flink/pull/6671

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 604e9890708..a9fd94f10d5 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -204,6 +204,11 @@ Flink SQL uses a lexical policy for identifier (table, attribute, function names
 - After which, identifiers are matched case-sensitively.
 - Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. <code>"SELECT a AS `my field` FROM t"</code>).
 
+String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`). Duplicate a single quote for escaping (e.g., `SELECT 'It''s me.'`). Unicode characters are supported in string literals. If explicit unicode code points are required, use the following syntax:
+
+- Use the backslash (`\`) as escaping character (default): `SELECT U&'\263A'`
+- Use a custom escaping character: `SELECT U&'#263A' UESCAPE '#'`
+
 {% top %}
 
 Operations
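
For illustration, a minimal Scala sketch of the literal rules documented in the hunk above (the environment setup mirrors the test changes further below; the query strings themselves come from the documentation):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // A duplicated single quote escapes a quote inside the literal.
    tEnv.sqlQuery("SELECT 'It''s me.'")
    // Unicode code point with the default backslash escape
    // (the Scala "\\" puts a single backslash into the SQL text).
    tEnv.sqlQuery("SELECT U&'\\263A'")
    // Unicode code point with a custom escape character.
    tEnv.sqlQuery("SELECT U&'#263A' UESCAPE '#'")
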
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 8c4ba83c6dc..8462202572d 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -61,7 +61,7 @@ By default, the SQL Client will read its configuration from the environment file
 Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it:
 
 {% highlight sql %}
-SELECT 'Hello World'
+SELECT 'Hello World';
 {% endhighlight %}
 
 This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key.
@@ -71,19 +71,19 @@ The CLI supports **two modes** for maintaining and visualizing results.
 The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI:
 
 {% highlight text %}
-SET execution.result-mode=table
+SET execution.result-mode=table;
 {% endhighlight %}
 
 The **changelog mode** does not materialize results and visualizes the result stream that is produced by a [continuous query](streaming.html#dynamic-tables--continuous-queries) consisting of insertions (`+`) and retractions (`-`).
 
 {% highlight text %}
-SET execution.result-mode=changelog
+SET execution.result-mode=changelog;
 {% endhighlight %}
 
 You can use the following query to see both result modes in action:
 
 {% highlight sql %}
-SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name 
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
 {% endhighlight %}
 
 This query performs a bounded word count example.
@@ -494,13 +494,13 @@ Similar to table sources and sinks, views defined in a session environment file
 Views can also be created within a CLI session using the `CREATE VIEW` statement:
 
 {% highlight text %}
-CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
 {% endhighlight %}
 
 Views created within a CLI session can also be removed again using the `DROP VIEW` statement:
 
 {% highlight text %}
-DROP VIEW MyNewView
+DROP VIEW MyNewView;
 {% endhighlight %}
 
 <span class="label label-danger">Attention</span> The definition of views is limited to the mentioned syntax above. Defining a schema for views or escape whitespaces in table names will be supported in future versions.
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f8bcd3da1af..1a21a578570 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1650,8 +1650,11 @@ timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
 
 {% endhighlight %}
 
-Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The
-column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
+Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).
+
+The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
+
+Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
 
 If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
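
As a quick sketch of the two quoting styles in expression strings, assuming an existing Table named t (the alias names greeting and message are illustrative):

    // Single quotes; '' escapes a quote inside the literal.
    t.select("'It''s me.' as greeting")
    // Double quotes; "" escapes a quote (the surrounding Scala
    // string quotes are escaped with \" here).
    t.select("\"I \"\"like\"\" dogs.\" as message")
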
 
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 8030c7c75da..7867532e546 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -32,7 +32,6 @@
 import org.jline.reader.LineReaderBuilder;
 import org.jline.reader.MaskingCallback;
 import org.jline.reader.UserInterruptException;
-import org.jline.reader.impl.DefaultParser;
 import org.jline.terminal.Terminal;
 import org.jline.terminal.TerminalBuilder;
 import org.jline.utils.AttributedString;
@@ -94,13 +93,14 @@ public CliClient(SessionContext context, Executor executor) {
 		}
 
 		// initialize line lineReader
-		final DefaultParser parser = new DefaultParser();
-		parser.setEofOnEscapedNewLine(true); // allows for multi-line commands
 		lineReader = LineReaderBuilder.builder()
 			.terminal(terminal)
 			.appName(CliStrings.CLI_NAME)
-			.parser(parser)
+			.parser(new SqlMultiLineParser())
 			.build();
+		// this option is disabled for now for correct backslash escaping
+		// a "SELECT '\'" query should return a string with a backslash
+		lineReader.option(LineReader.Option.DISABLE_EVENT_EXPANSION, true);
 
 		// create prompt
 		prompt = new AttributedStringBuilder()
@@ -168,16 +168,19 @@ public void open() {
 			terminal.writer().append("\n");
 			terminal.flush();
 
-			String line;
+			final String line;
 			try {
 				line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
-			} catch (UserInterruptException | EndOfFileException | IOError e) {
-				// user cancelled application with Ctrl+C or kill
+			} catch (UserInterruptException e) {
+				// user cancelled line with Ctrl+C
+				continue;
+			} catch (EndOfFileException | IOError e) {
+				// user cancelled application with Ctrl+D or kill
 				break;
 			} catch (Throwable t) {
 				throw new SqlClientException("Could not read from command line.", t);
 			}
-			if (line == null || line.equals("")) {
+			if (line == null) {
 				continue;
 			}
 			final Optional<SqlCommandCall> cmdCall = parseCommand(line);
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index baa78986b11..d68f56b4fce 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -50,15 +50,15 @@ private CliStrings() {
 		.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
 		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
 		.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>'"))
-		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>'"))
+		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
 		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
+		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
 		.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
 		.style(AttributedStyle.DEFAULT.underline())
 		.append("\nHint")
 		.style(AttributedStyle.DEFAULT)
-		.append(": Use '\\' for multi-line commands.")
+		.append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
 		.toAttributedString();
 
 	public static final String MESSAGE_WELCOME;
@@ -104,7 +104,7 @@ private CliStrings() {
 			"   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ \n" +
 			"   |_|    |_|_|_| |_|_|\\_\\ |_____/ \\___\\_\\______|  \\_____|_|_|\\___|_| |_|\\__|\n" +
 			"          \n" +
-			"        Welcome! Enter HELP to list all available commands. QUIT to exit.\n\n";
+			"        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.\n\n";
 	}
 
 	public static final String MESSAGE_QUIT = "Exiting " + CliStrings.CLI_NAME + "...";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
new file mode 100644
index 00000000000..fb44a1e2ced
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.jline.reader.EOFError;
+import org.jline.reader.ParsedLine;
+import org.jline.reader.Parser;
+import org.jline.reader.impl.DefaultParser;
+
+import java.util.Collections;
+
+/**
+ * Multi-line parser for parsing an arbitrary number of SQL lines until a line ends with ';'.
+ */
+public class SqlMultiLineParser implements Parser {
+
+	private static final String EOF_CHARACTER = ";";
+	private static final String NEW_LINE_PROMPT = ""; // results in simple '>' output
+
+	@Override
+	public ParsedLine parse(String line, int cursor, ParseContext context) {
+		if (!line.trim().endsWith(EOF_CHARACTER)) {
+			throw new EOFError(
+				-1,
+				-1,
+				"New line without EOF character.",
+				NEW_LINE_PROMPT);
+		}
+		return new DefaultParser.ArgumentList(
+			line,
+			Collections.singletonList(line),
+			0,
+			0,
+			cursor);
+	}
+}
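
For context, a hypothetical standalone sketch (not part of the change) of how JLine drives this parser, using the same builder calls as CliClient above:

    import org.apache.flink.table.client.cli.SqlMultiLineParser
    import org.jline.reader.LineReaderBuilder
    import org.jline.terminal.TerminalBuilder

    val terminal = TerminalBuilder.builder().build()
    val reader = LineReaderBuilder.builder()
      .terminal(terminal)
      .appName("demo")
      .parser(new SqlMultiLineParser())
      .build()

    // parse() throws EOFError until the entered line ends with ';',
    // which makes readLine() keep prompting for continuation lines;
    // the returned string is the complete multi-line statement.
    val statement = reader.readLine("Flink SQL> ")
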
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 195812d1d1c..7d3478cb50c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -34,6 +34,7 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql.validate.SqlConformanceEnum
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 6cabe212213..0fdd8859cf4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -681,9 +681,7 @@ abstract class CodeGenerator(
         generateNonNullLiteral(resultType, decimalField)
 
       case VARCHAR | CHAR =>
-        val escapedValue = StringEscapeUtils.escapeJava(
-          StringEscapeUtils.unescapeJava(value.toString)
-        )
+        val escapedValue = StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
         generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
 
       case SYMBOL =>
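
The effect of this one-liner can be reproduced with plain commons-lang3; a sketch where value stands for the runtime value of a string literal:

    import org.apache.commons.lang3.StringEscapeUtils

    val value = "a\\nb" // runtime value: the four characters a, \, n, b

    // Old: unescapeJava first turns the backslash-n into a real newline,
    // so the generated Java source text a\nb compiles back to a newline
    // instead of the original backslash + n.
    StringEscapeUtils.escapeJava(StringEscapeUtils.unescapeJava(value))

    // New: escape exactly once; the generated source text a\\nb compiles
    // back to the original four characters.
    StringEscapeUtils.ESCAPE_JAVA.translate(value)
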
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index a9dbf19e425..f58e12cfea2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -120,12 +120,7 @@ class ExpressionReducer(config: TableConfig)
              SqlTypeName.MAP |
              SqlTypeName.MULTISET =>
           reducedValues.add(unreduced)
-        // after expression reduce, the literal string has to be escaped
-        case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
-          val escapeVarchar = StringEscapeUtils
-            .escapeJava(reduced.getField(reducedIdx).asInstanceOf[String])
-          reducedValues.add(rexBuilder.makeLiteral(escapeVarchar, unreduced.getType, true))
-          reducedIdx += 1
+
         case _ =>
           val reducedValue = reduced.getField(reducedIdx)
           // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
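
With the VARCHAR/CHAR case removed, reduced string constants take the default branch unchanged. A sketch of a query that exercises constant reduction, assuming the tEnv from the earlier sketch (whether the planner actually folds it is an optimizer detail; the result is the same either way):

    // The planner may reduce the concatenation to a single literal during
    // optimization; the reduced string is now handed to
    // RexBuilder.makeLiteral without being Java-escaped a second time.
    tEnv.sqlQuery("SELECT 'It''s' || ' me.'") // yields: It's me.
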
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index c0a577dccfd..633afb1020d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -185,13 +185,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
           }
       }
 
-  lazy val singleQuoteStringLiteral: Parser[Expression] =
-    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
-      str => Literal(str.substring(1, str.length - 1))
-    }
+  // string with single quotes such as 'It''s me.'
+  lazy val singleQuoteStringLiteral: Parser[Expression] = "'(?:''|[^'])*'".r ^^ {
+    str =>
+      val escaped = str.substring(1, str.length - 1).replace("''", "'")
+      Literal(escaped)
+  }
 
-  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
-    str => Literal(str.substring(1, str.length - 1))
+  // string with double quotes such as "I ""like"" dogs."
+  lazy val doubleQuoteStringLiteral: PackratParser[Expression] = "\"(?:\"\"|[^\"])*\"".r ^^ {
+    str =>
+      val escaped = str.substring(1, str.length - 1).replace("\"\"", "\"")
+      Literal(escaped)
   }
 
   lazy val boolLiteral: PackratParser[Expression] = (TRUE | FALSE) ^^ {
@@ -203,7 +208,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral
+    numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | boolLiteral
 
   lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
     sym => UnresolvedFieldReference(sym)
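
The quoting rule the new regexes implement can be checked in plain Scala, independent of Flink:

    val singleQuoted = "'(?:''|[^'])*'".r
    val matched = singleQuoted.findFirstIn("'It''s me.'").get
    // Strip the outer quotes and collapse doubled quotes, as the parser does:
    val value = matched.substring(1, matched.length - 1).replace("''", "'")
    // value == "It's me."
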
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 863dfc1aef2..419c12567f1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -104,11 +104,6 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
           SqlParserPos.ZERO)
         relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
 
-      case BasicTypeInfo.STRING_TYPE_INFO =>
-        relBuilder.getRexBuilder.makeLiteral(
-          StringEscapeUtils.escapeJava(value.asInstanceOf[String])
-        )
-
       case _ => relBuilder.literal(value)
     }
   }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala
index 0a60eae0e5f..bf568c7d33d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.{ExpressionTestBase, Func3}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.types.Row
@@ -88,6 +88,91 @@ class LiteralTest extends ExpressionTestBase {
       s"42 and $hello")
   }
 
+  @Test
+  def testStringLiterals(): Unit = {
+
+    // these tests use Java/Scala escaping for non-quoting unicode characters
+
+    testAllApis(
+      ">\n<",
+      "'>\n<'",
+      "'>\n<'",
+      ">\n<")
+
+    testAllApis(
+      ">\u263A<",
+      "'>\u263A<'",
+      "'>\u263A<'",
+      ">\u263A<")
+
+    testAllApis(
+      ">\u263A<",
+      "'>\u263A<'",
+      "'>\u263A<'",
+      ">\u263A<")
+
+    testAllApis(
+      ">\\<",
+      "'>\\<'",
+      "'>\\<'",
+      ">\\<")
+
+    testAllApis(
+      ">'<",
+      "'>''<'",
+      "'>''<'",
+      ">'<")
+
+    testAllApis(
+      " ",
+      "' '",
+      "' '",
+      " ")
+
+    testAllApis(
+      "",
+      "''",
+      "''",
+      "")
+
+    testAllApis(
+      ">foo([\\w]+)<",
+      "'>foo([\\w]+)<'",
+      "'>foo([\\w]+)<'",
+      ">foo([\\w]+)<")
+
+    testAllApis(
+      ">\\'\n<",
+      "\">\\'\n<\"",
+      "'>\\''\n<'",
+      ">\\'\n<")
+
+    testAllApis(
+      "It's me.",
+      "'It''s me.'",
+      "'It''s me.'",
+      "It's me.")
+
+    testTableApi(
+      """I "like" dogs.""",
+      """"I ""like"" dogs."""",
+      """I "like" dogs.""")
+
+    // these test use SQL for describing unicode characters
+
+    testSqlApi(
+      "U&'>\\263A<'", // default escape backslash
+      ">\u263A<")
+
+    testSqlApi(
+      "U&'>#263A<' UESCAPE '#'", // custom escape '#'
+      ">\u263A<")
+
+    testSqlApi(
+      """'>\\<'""",
+      ">\\\\<")
+  }
+
   def testData: Any = {
     val testData = new Row(4)
     testData.setField(0, "trUeX_value")
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 6aed9a83c1b..4f1eb6863af 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -355,9 +355,9 @@ class CalcITCase(
   }
 
   @Test
-  def testUdfWithUnicodeParameter(): Unit = {
+  def testFunctionWithUnicodeParameters(): Unit = {
     val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"), // uses Java/Scala escaping
       ("x\u0001y", "y\"z", "z\\\"\u0004z")
     )
 
@@ -371,20 +371,23 @@ class CalcITCase(
     tEnv.registerFunction("splitUDF0", splitUDF0)
     tEnv.registerFunction("splitUDF1", splitUDF1)
 
-    // user have to specify '\' with '\\' in SQL
-    val sqlQuery = "SELECT " +
-      "splitUDF0(a, '\u0001', 0) as a0, " +
-      "splitUDF1(a, '\u0001', 0) as a1, " +
-      "splitUDF0(b, '\"', 1) as b0, " +
-      "splitUDF1(b, '\"', 1) as b1, " +
-      "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
-      "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
+    // uses SQL escaping (be aware that even Scala multi-line strings parse backslash!)
+    val sqlQuery = s"""
+      |SELECT
+      |  splitUDF0(a, U&'${'\\'}0001', 0) AS a0,
+      |  splitUDF1(a, U&'${'\\'}0001', 0) AS a1,
+      |  splitUDF0(b, U&'"', 1) AS b0,
+      |  splitUDF1(b, U&'"', 1) AS b1,
+      |  splitUDF0(c, U&'${'\\'}${'\\'}"${'\\'}0004', 0) AS c0,
+      |  splitUDF1(c, U&'${'\\'}"#0004' UESCAPE '#', 0) AS c1
+      |FROM T1
+      |""".stripMargin
 
     val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
 
     tEnv.registerTable("T1", t1)
 
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
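
A note on the ${'\\'} trick in the hunk above, sketched in isolation (the mechanism is Scala's s interpolator, not anything Flink-specific):

    // The s interpolator applies Java-style escape processing to the literal
    // parts of an interpolated string, even inside triple quotes, so a bare
    // backslash such as \0001 would be consumed as an escape sequence.
    val q = s"""SELECT splitUDF0(a, U&'${'\\'}0001', 0) AS a0 FROM T1"""
    // ${'\\'} injects the backslash as an interpolated value, which escape
    // processing leaves alone; q now contains the SQL text U&'\0001'.
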
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 71a87d2c1ed..b161eeda43c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -550,30 +550,34 @@ class CalcITCase(
   }
 
   @Test
-  def testUDFWithUnicodeParameter(): Unit = {
+  def testFunctionWithUnicodeParameters(): Unit = {
     val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"), // uses Java/Scala escaping
       ("x\u0001y", "y\"z", "z\\\"\u0004z")
     )
+
     val env = ExecutionEnvironment.getExecutionEnvironment
+
     val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val splitUDF0 = new SplitUDF(deterministic = true)
     val splitUDF1 = new SplitUDF(deterministic = false)
+
+     // uses Java/Scala escaping
     val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-             .select(splitUDF0('a, "\u0001", 0) as 'a0,
-                     splitUDF1('a, "\u0001", 0) as 'a1,
-                     splitUDF0('b, "\"", 1) as 'b0,
-                     splitUDF1('b, "\"", 1) as 'b1,
-                     splitUDF0('c, "\\\"\u0004", 0) as 'c0,
-                     splitUDF1('c, "\\\"\u0004", 0) as 'c1
-             )
+      .select(
+        splitUDF0('a, "\u0001", 0) as 'a0,
+        splitUDF1('a, "\u0001", 0) as 'a1,
+        splitUDF0('b, "\"", 1) as 'b0,
+        splitUDF1('b, "\"", 1) as 'b1,
+        splitUDF0('c, "\\\"\u0004", 0) as 'c0,
+        splitUDF1('c, "\\\"\u0004", 0) as 'c1)
+
     val results = ds.collect()
-    val expected = List(
-      "a,a,d,d,e,e", "x,x,z,z,z,z"
-    ).mkString("\n")
+
+    val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
-
 }
 
 object CalcITCase {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index a2d9bb26c4c..f18705568b1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -781,45 +781,6 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  @Test
-  def testUdfWithUnicodeParameter(): Unit = {
-    val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
-      ("x\u0001y", "y\"z", "z\\\"\u0004z")
-    )
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val splitUDF0 = new SplitUDF(deterministic = true)
-    val splitUDF1 = new SplitUDF(deterministic = false)
-
-    tEnv.registerFunction("splitUDF0", splitUDF0)
-    tEnv.registerFunction("splitUDF1", splitUDF1)
-
-    // user have to specify '\' with '\\' in SQL
-    val sqlQuery = "SELECT " +
-      "splitUDF0(a, '\u0001', 0) as a0, " +
-      "splitUDF1(a, '\u0001', 0) as a1, " +
-      "splitUDF0(b, '\"', 1) as b0, " +
-      "splitUDF1(b, '\"', 1) as b1, " +
-      "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
-      "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
   @Test
   def testUDFWithLongVarargs(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index a20b6263448..6407ebdce22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -351,32 +351,4 @@ class CalcITCase extends AbstractTestBase {
       "{9=Comment#3}")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
-
-  @Test
-  def testUDFWithUnicodeParameter(): Unit = {
-    val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
-      ("x\u0001y", "y\"z", "z\\\"\u0004z")
-    )
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val splitUDF0 = new SplitUDF(deterministic = true)
-    val splitUDF1 = new SplitUDF(deterministic = false)
-    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-      .select(splitUDF0('a, "\u0001", 0) as 'a0,
-              splitUDF1('a, "\u0001", 0) as 'a1,
-              splitUDF0('b, "\"", 1) as 'b0,
-              splitUDF1('b, "\"", 1) as 'b1,
-              splitUDF0('c, "\\\"\u0004", 0) as 'c0,
-              splitUDF1('c, "\\\"\u0004", 0) as 'c1
-      )
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-    val expected = mutable.MutableList(
-      "a,a,d,d,e,e", "x,x,z,z,z,z"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
 }


 
