Posted to commits@flink.apache.org by tw...@apache.org on 2018/09/12 14:16:57 UTC

[flink] branch master updated: [FLINK-10281] [table] Fix string literal escaping throughout Table & SQL API

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c87b943  [FLINK-10281] [table] Fix string literal escaping throughout Table & SQL API
c87b943 is described below

commit c87b9433e3225602e56497f782a610668eac8bf9
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Sep 7 13:32:05 2018 +0200

    [FLINK-10281] [table] Fix string literal escaping throughout Table & SQL API
    
    This commit fixes string literal escaping throughout the Table & SQL API.
    Previously, quotes could not be escaped properly in the Table API, and SQL
    and the SQL Client were not standards-compliant.
    
    Due to FLINK-8301, backslashes were interpreted as escape characters in all
    SQL literals, although they should only act as such in SQL `U&'\1234'`
    literals. For the Table API, the new logic relies on Java/Scala escaping
    and uses doubled quotes for escaping quotes within expression strings. For
    SQL, we rely on unicode string literals with or without the UESCAPE clause.
    The SQL Client previously used backslashes for escaping new lines; it now
    allows unescaped new lines and uses ';' for statement finalization, similar
    to other SQL clients.
    
    This closes #6671.
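
For illustration, the new escaping rules in both APIs (a sketch; `tab` and
`MyTable` are assumed to be registered elsewhere):

    // Table API expression string: escape a quote by doubling it
    tab.select("'It''s me.' as greeting")

    // SQL: explicit unicode code point via a unicode string literal
    // (the Scala source escape "\\" puts a single backslash into the query)
    tEnv.sqlQuery("SELECT U&'\\263A' FROM MyTable")
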
---
 docs/dev/table/sql.md                              |  5 ++
 docs/dev/table/sqlClient.md                        | 12 +--
 docs/dev/table/tableApi.md                         |  7 +-
 .../apache/flink/table/client/cli/CliClient.java   | 19 +++--
 .../apache/flink/table/client/cli/CliStrings.java  | 10 +--
 .../flink/table/client/cli/SqlMultiLineParser.java | 52 +++++++++++++
 .../apache/flink/table/codegen/CodeGenerator.scala |  4 +-
 .../flink/table/codegen/ExpressionReducer.scala    |  7 +-
 .../flink/table/expressions/ExpressionParser.scala | 19 +++--
 .../apache/flink/table/expressions/literals.scala  |  5 --
 .../flink/table/expressions/LiteralTest.scala      | 87 +++++++++++++++++++++-
 .../flink/table/runtime/batch/sql/CalcITCase.scala | 25 ++++---
 .../table/runtime/batch/table/CalcITCase.scala     | 30 ++++----
 .../flink/table/runtime/stream/sql/SqlITCase.scala | 39 ----------
 .../table/runtime/stream/table/CalcITCase.scala    | 28 -------
 15 files changed, 215 insertions(+), 134 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 604e989..a9fd94f 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -204,6 +204,11 @@ Flink SQL uses a lexical policy for identifier (table, attribute, function names
 - After which, identifiers are matched case-sensitively.
 - Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. <code>"SELECT a AS `my field` FROM t"</code>).
 
+String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`). Duplicate a single quote for escaping (e.g., `SELECT 'It''s me.'`). Unicode characters are supported in string literals. If explicit unicode code points are required, use the following syntax:
+
+- Use the backslash (`\`) as escaping character (default): `SELECT U&'\263A'`
+- Use a custom escaping character: `SELECT U&'#263A' UESCAPE '#'`
+
 {% top %}
 
 Operations
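
For reference, the three literal forms documented above should all work against
a table environment (a sketch; `tEnv` is assumed):

    tEnv.sqlQuery("SELECT 'It''s me.'")            // doubled single quote
    tEnv.sqlQuery("SELECT U&'\\263A'")             // default escape backslash
    tEnv.sqlQuery("SELECT U&'#263A' UESCAPE '#'")  // custom escape character
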
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 296d638..5224842 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -61,7 +61,7 @@ By default, the SQL Client will read its configuration from the environment file
 Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it:
 
 {% highlight sql %}
-SELECT 'Hello World'
+SELECT 'Hello World';
 {% endhighlight %}
 
 This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key.
@@ -71,19 +71,19 @@ The CLI supports **two modes** for maintaining and visualizing results.
 The **table mode** materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI:
 
 {% highlight text %}
-SET execution.result-mode=table
+SET execution.result-mode=table;
 {% endhighlight %}
 
 The **changelog mode** does not materialize results and visualizes the result stream that is produced by a [continuous query](streaming.html#dynamic-tables--continuous-queries) consisting of insertions (`+`) and retractions (`-`).
 
 {% highlight text %}
-SET execution.result-mode=changelog
+SET execution.result-mode=changelog;
 {% endhighlight %}
 
 You can use the following query to see both result modes in action:
 
 {% highlight sql %}
-SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name 
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
 {% endhighlight %}
 
 This query performs a bounded word count example.
@@ -494,13 +494,13 @@ Similar to table sources and sinks, views defined in a session environment file
 Views can also be created within a CLI session using the `CREATE VIEW` statement:
 
 {% highlight text %}
-CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource
+CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
 {% endhighlight %}
 
 Views created within a CLI session can also be removed again using the `DROP VIEW` statement:
 
 {% highlight text %}
-DROP VIEW MyNewView
+DROP VIEW MyNewView;
 {% endhighlight %}
 
 <span class="label label-danger">Attention</span> The definition of views is limited to the syntax mentioned above. Defining a schema for views or escaping whitespace in table names will be supported in future versions.
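
With the statement finalization above, a query may now span multiple lines in
the CLI and is only submitted once a line ends with ';' (a sketch; `NameTable`
is assumed, and the continuation prompt comes from the new SqlMultiLineParser
below):

    Flink SQL> SELECT name, COUNT(*) AS cnt
    > FROM NameTable
    > GROUP BY name;
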
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f8bcd3d..1a21a57 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1650,8 +1650,11 @@ timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
 
 {% endhighlight %}
 
-Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The
-column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.
+Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).
+
+The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
+
+Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
 
 If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API, decimals can be defined by `BigDecimal("123456")`; in Java, by appending a "p" for precise values, e.g. `123456p`.
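
A short sketch of the new quoting rules in expression strings (`tab` and the
field `name` are assumed):

    // single- and double-quoted literals with doubled-quote escaping;
    // the Java/Scala source escaping (\") is applied on top
    tab.select("name, 'It''s me.' as a, \"I \"\"like\"\" dogs.\" as b")
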
 
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 8030c7c..7867532 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -32,7 +32,6 @@ import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
 import org.jline.reader.MaskingCallback;
 import org.jline.reader.UserInterruptException;
-import org.jline.reader.impl.DefaultParser;
 import org.jline.terminal.Terminal;
 import org.jline.terminal.TerminalBuilder;
 import org.jline.utils.AttributedString;
@@ -94,13 +93,14 @@ public class CliClient {
 		}
 
 		// initialize line lineReader
-		final DefaultParser parser = new DefaultParser();
-		parser.setEofOnEscapedNewLine(true); // allows for multi-line commands
 		lineReader = LineReaderBuilder.builder()
 			.terminal(terminal)
 			.appName(CliStrings.CLI_NAME)
-			.parser(parser)
+			.parser(new SqlMultiLineParser())
 			.build();
+		// disable JLine's event expansion for correct backslash escaping:
+		// a "SELECT '\'" query should return a string with a backslash
+		lineReader.option(LineReader.Option.DISABLE_EVENT_EXPANSION, true);
 
 		// create prompt
 		prompt = new AttributedStringBuilder()
@@ -168,16 +168,19 @@ public class CliClient {
 			terminal.writer().append("\n");
 			terminal.flush();
 
-			String line;
+			final String line;
 			try {
 				line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
-			} catch (UserInterruptException | EndOfFileException | IOError e) {
-				// user cancelled application with Ctrl+C or kill
+			} catch (UserInterruptException e) {
+				// user cancelled line with Ctrl+C
+				continue;
+			} catch (EndOfFileException | IOError e) {
+				// user cancelled application with Ctrl+D or kill
 				break;
 			} catch (Throwable t) {
 				throw new SqlClientException("Could not read from command line.", t);
 			}
-			if (line == null || line.equals("")) {
+			if (line == null) {
 				continue;
 			}
 			final Optional<SqlCommandCall> cmdCall = parseCommand(line);
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index baa7898..d68f56b 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -50,15 +50,15 @@ public final class CliStrings {
 		.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
 		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
 		.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>'"))
-		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>'"))
+		.append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+		.append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
 		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
+		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
 		.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
 		.style(AttributedStyle.DEFAULT.underline())
 		.append("\nHint")
 		.style(AttributedStyle.DEFAULT)
-		.append(": Use '\\' for multi-line commands.")
+		.append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
 		.toAttributedString();
 
 	public static final String MESSAGE_WELCOME;
@@ -104,7 +104,7 @@ public final class CliStrings {
 			"   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ \n" +
 			"   |_|    |_|_|_| |_|_|\\_\\ |_____/ \\___\\_\\______|  \\_____|_|_|\\___|_| |_|\\__|\n" +
 			"          \n" +
-			"        Welcome! Enter HELP to list all available commands. QUIT to exit.\n\n";
+			"        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.\n\n";
 	}
 
 	public static final String MESSAGE_QUIT = "Exiting " + CliStrings.CLI_NAME + "...";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
new file mode 100644
index 0000000..fb44a1e
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlMultiLineParser.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.jline.reader.EOFError;
+import org.jline.reader.ParsedLine;
+import org.jline.reader.Parser;
+import org.jline.reader.impl.DefaultParser;
+
+import java.util.Collections;
+
+/**
+ * Multi-line parser for parsing an arbitrary number of SQL lines until a line ends with ';'.
+ */
+public class SqlMultiLineParser implements Parser {
+
+	private static final String EOF_CHARACTER = ";";
+	private static final String NEW_LINE_PROMPT = ""; // results in simple '>' output
+
+	@Override
+	public ParsedLine parse(String line, int cursor, ParseContext context) {
+		if (!line.trim().endsWith(EOF_CHARACTER)) {
+			throw new EOFError(
+				-1,
+				-1,
+				"New line without EOF character.",
+				NEW_LINE_PROMPT);
+		}
+		return new DefaultParser.ArgumentList(
+			line,
+			Collections.singletonList(line),
+			0,
+			0,
+			cursor);
+	}
+}
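
How the parser plugs into JLine (a sketch; mirrors the CliClient change above,
prompt text assumed):

    // readLine() keeps prompting for continuation lines, via the EOFError
    // thrown by parse(), until the accumulated input ends with ';'
    val reader = org.jline.reader.LineReaderBuilder.builder()
      .parser(new SqlMultiLineParser())
      .build()
    val statement = reader.readLine("Flink SQL> ")
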
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 6cabe21..0fdd885 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -681,9 +681,7 @@ abstract class CodeGenerator(
         generateNonNullLiteral(resultType, decimalField)
 
       case VARCHAR | CHAR =>
-        val escapedValue = StringEscapeUtils.escapeJava(
-          StringEscapeUtils.unescapeJava(value.toString)
-        )
+        val escapedValue = StringEscapeUtils.ESCAPE_JAVA.translate(value.toString)
         generateNonNullLiteral(resultType, "\"" + escapedValue + "\"")
 
       case SYMBOL =>
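
The effect of the one-pass escaping (a sketch, assuming commons-lang3's
StringEscapeUtils as used by the code generator):

    import org.apache.commons.lang3.StringEscapeUtils
    // ">\<" (one backslash) becomes ">\\<" (two characters), safe to embed
    // between quotes in generated Java code; the old unescape/escape
    // round-trip would have interpreted the user's backslash first
    val escaped = StringEscapeUtils.ESCAPE_JAVA.translate(">\\<")
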
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index a9dbf19..f58e12c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -120,12 +120,7 @@ class ExpressionReducer(config: TableConfig)
              SqlTypeName.MAP |
              SqlTypeName.MULTISET =>
           reducedValues.add(unreduced)
-        // after expression reduce, the literal string has to be escaped
-        case SqlTypeName.VARCHAR | SqlTypeName.CHAR =>
-          val escapeVarchar = StringEscapeUtils
-            .escapeJava(reduced.getField(reducedIdx).asInstanceOf[String])
-          reducedValues.add(rexBuilder.makeLiteral(escapeVarchar, unreduced.getType, true))
-          reducedIdx += 1
+
         case _ =>
           val reducedValue = reduced.getField(reducedIdx)
           // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index c0a577d..633afb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -185,13 +185,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
           }
       }
 
-  lazy val singleQuoteStringLiteral: Parser[Expression] =
-    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
-      str => Literal(str.substring(1, str.length - 1))
-    }
+  // string with single quotes such as 'It''s me.'
+  lazy val singleQuoteStringLiteral: Parser[Expression] = "'(?:''|[^'])*'".r ^^ {
+    str =>
+      val escaped = str.substring(1, str.length - 1).replace("''", "'")
+      Literal(escaped)
+  }
 
-  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
-    str => Literal(str.substring(1, str.length - 1))
+  // string with double quotes such as "I ""like"" dogs."
+  lazy val doubleQuoteStringLiteral: PackratParser[Expression] = "\"(?:\"\"|[^\"])*\"".r ^^ {
+    str =>
+      val escaped = str.substring(1, str.length - 1).replace("\"\"", "\"")
+      Literal(escaped)
   }
 
   lazy val boolLiteral: PackratParser[Expression] = (TRUE | FALSE) ^^ {
@@ -203,7 +208,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   }
 
   lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral
+    numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | boolLiteral
 
   lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
     sym => UnresolvedFieldReference(sym)
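
The quote-doubling grammar can be sanity-checked in isolation (a sketch):

    val singleQuoted = "'(?:''|[^'])*'".r
    singleQuoted.findFirstIn("'It''s me.'")
      .map(s => s.substring(1, s.length - 1).replace("''", "'"))
    // => Some(It's me.)
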
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 863dfc1..419c125 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -104,11 +104,6 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
           SqlParserPos.ZERO)
         relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
 
-      case BasicTypeInfo.STRING_TYPE_INFO =>
-        relBuilder.getRexBuilder.makeLiteral(
-          StringEscapeUtils.escapeJava(value.asInstanceOf[String])
-        )
-
       case _ => relBuilder.literal(value)
     }
   }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala
index 0a60eae..bf568c7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/LiteralTest.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.{ExpressionTestBase, Func3}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.types.Row
@@ -88,6 +88,91 @@ class LiteralTest extends ExpressionTestBase {
       s"42 and $hello")
   }
 
+  @Test
+  def testStringLiterals(): Unit = {
+
+    // these tests use Java/Scala escaping for non-quoting unicode characters
+
+    testAllApis(
+      ">\n<",
+      "'>\n<'",
+      "'>\n<'",
+      ">\n<")
+
+    testAllApis(
+      ">\u263A<",
+      "'>\u263A<'",
+      "'>\u263A<'",
+      ">\u263A<")
+
+    testAllApis(
+      ">\\<",
+      "'>\\<'",
+      "'>\\<'",
+      ">\\<")
+
+    testAllApis(
+      ">'<",
+      "'>''<'",
+      "'>''<'",
+      ">'<")
+
+    testAllApis(
+      " ",
+      "' '",
+      "' '",
+      " ")
+
+    testAllApis(
+      "",
+      "''",
+      "''",
+      "")
+
+    testAllApis(
+      ">foo([\\w]+)<",
+      "'>foo([\\w]+)<'",
+      "'>foo([\\w]+)<'",
+      ">foo([\\w]+)<")
+
+    testAllApis(
+      ">\\'\n<",
+      "\">\\'\n<\"",
+      "'>\\''\n<'",
+      ">\\'\n<")
+
+    testAllApis(
+      "It's me.",
+      "'It''s me.'",
+      "'It''s me.'",
+      "It's me.")
+
+    testTableApi(
+      """I "like" dogs.""",
+      """"I ""like"" dogs."""",
+      """I "like" dogs.""")
+
+    // these tests use SQL escaping for describing unicode characters
+
+    testSqlApi(
+      "U&'>\\263A<'", // default escape backslash
+      ">\u263A<")
+
+    testSqlApi(
+      "U&'>#263A<' UESCAPE '#'", // custom escape '#'
+      ">\u263A<")
+
+    testSqlApi(
+      """'>\\<'""",
+      ">\\\\<")
+  }
+
   def testData: Any = {
     val testData = new Row(4)
     testData.setField(0, "trUeX_value")
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 6aed9a8..4f1eb68 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -355,9 +355,9 @@ class CalcITCase(
   }
 
   @Test
-  def testUdfWithUnicodeParameter(): Unit = {
+  def testFunctionWithUnicodeParameters(): Unit = {
     val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"), // uses Java/Scala escaping
       ("x\u0001y", "y\"z", "z\\\"\u0004z")
     )
 
@@ -371,20 +371,23 @@ class CalcITCase(
     tEnv.registerFunction("splitUDF0", splitUDF0)
     tEnv.registerFunction("splitUDF1", splitUDF1)
 
-    // user have to specify '\' with '\\' in SQL
-    val sqlQuery = "SELECT " +
-      "splitUDF0(a, '\u0001', 0) as a0, " +
-      "splitUDF1(a, '\u0001', 0) as a1, " +
-      "splitUDF0(b, '\"', 1) as b0, " +
-      "splitUDF1(b, '\"', 1) as b1, " +
-      "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
-      "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
+    // uses SQL escaping (note that Scala's s-interpolator processes backslash escapes even in multi-line strings!)
+    val sqlQuery = s"""
+      |SELECT
+      |  splitUDF0(a, U&'${'\\'}0001', 0) AS a0,
+      |  splitUDF1(a, U&'${'\\'}0001', 0) AS a1,
+      |  splitUDF0(b, U&'"', 1) AS b0,
+      |  splitUDF1(b, U&'"', 1) AS b1,
+      |  splitUDF0(c, U&'${'\\'}${'\\'}"${'\\'}0004', 0) AS c0,
+      |  splitUDF1(c, U&'${'\\'}"#0004' UESCAPE '#', 0) AS c1
+      |FROM T1
+      |""".stripMargin
 
     val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
 
     tEnv.registerTable("T1", t1)
 
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
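
Why the query builds backslashes via `${'\\'}` (a sketch): the s-interpolator
applies escape processing even to triple-quoted strings, so interpolating a
char literal is the reliable way to emit a verbatim backslash:

    val q = s"""U&'${'\\'}263A'"""
    // q == "U&'\263A'" (exactly one backslash reaches the SQL parser)
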
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 71a87d2..b161eed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -550,30 +550,34 @@ class CalcITCase(
   }
 
   @Test
-  def testUDFWithUnicodeParameter(): Unit = {
+  def testFunctionWithUnicodeParameters(): Unit = {
     val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"), // uses Java/Scala escaping
       ("x\u0001y", "y\"z", "z\\\"\u0004z")
     )
+
     val env = ExecutionEnvironment.getExecutionEnvironment
+
     val tEnv = TableEnvironment.getTableEnvironment(env)
+
     val splitUDF0 = new SplitUDF(deterministic = true)
     val splitUDF1 = new SplitUDF(deterministic = false)
+
+    // uses Java/Scala escaping
     val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-             .select(splitUDF0('a, "\u0001", 0) as 'a0,
-                     splitUDF1('a, "\u0001", 0) as 'a1,
-                     splitUDF0('b, "\"", 1) as 'b0,
-                     splitUDF1('b, "\"", 1) as 'b1,
-                     splitUDF0('c, "\\\"\u0004", 0) as 'c0,
-                     splitUDF1('c, "\\\"\u0004", 0) as 'c1
-             )
+      .select(
+        splitUDF0('a, "\u0001", 0) as 'a0,
+        splitUDF1('a, "\u0001", 0) as 'a1,
+        splitUDF0('b, "\"", 1) as 'b0,
+        splitUDF1('b, "\"", 1) as 'b1,
+        splitUDF0('c, "\\\"\u0004", 0) as 'c0,
+        splitUDF1('c, "\\\"\u0004", 0) as 'c1)
+
     val results = ds.collect()
-    val expected = List(
-      "a,a,d,d,e,e", "x,x,z,z,z,z"
-    ).mkString("\n")
+
+    val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
-
 }
 
 object CalcITCase {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index a2d9bb2..f187055 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -782,45 +782,6 @@ class SqlITCase extends StreamingWithStateTestBase {
   }
 
   @Test
-  def testUdfWithUnicodeParameter(): Unit = {
-    val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
-      ("x\u0001y", "y\"z", "z\\\"\u0004z")
-    )
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val splitUDF0 = new SplitUDF(deterministic = true)
-    val splitUDF1 = new SplitUDF(deterministic = false)
-
-    tEnv.registerFunction("splitUDF0", splitUDF0)
-    tEnv.registerFunction("splitUDF1", splitUDF1)
-
-    // user have to specify '\' with '\\' in SQL
-    val sqlQuery = "SELECT " +
-      "splitUDF0(a, '\u0001', 0) as a0, " +
-      "splitUDF1(a, '\u0001', 0) as a1, " +
-      "splitUDF0(b, '\"', 1) as b0, " +
-      "splitUDF1(b, '\"', 1) as b1, " +
-      "splitUDF0(c, '\\\\\"\u0004', 0) as c0, " +
-      "splitUDF1(c, '\\\\\"\u0004', 0) as c1 from T1"
-
-    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-
-    tEnv.registerTable("T1", t1)
-
-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
   def testUDFWithLongVarargs(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index a20b626..6407ebd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -351,32 +351,4 @@ class CalcITCase extends AbstractTestBase {
       "{9=Comment#3}")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
-
-  @Test
-  def testUDFWithUnicodeParameter(): Unit = {
-    val data = List(
-      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
-      ("x\u0001y", "y\"z", "z\\\"\u0004z")
-    )
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val splitUDF0 = new SplitUDF(deterministic = true)
-    val splitUDF1 = new SplitUDF(deterministic = false)
-    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
-      .select(splitUDF0('a, "\u0001", 0) as 'a0,
-              splitUDF1('a, "\u0001", 0) as 'a1,
-              splitUDF0('b, "\"", 1) as 'b0,
-              splitUDF1('b, "\"", 1) as 'b1,
-              splitUDF0('c, "\\\"\u0004", 0) as 'c0,
-              splitUDF1('c, "\\\"\u0004", 0) as 'c1
-      )
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-    val expected = mutable.MutableList(
-      "a,a,d,d,e,e", "x,x,z,z,z,z"
-    )
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
 }