You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/09/27 05:56:52 UTC
[flink] branch master updated: [FLINK-10145] [table] Add replace
function in Table API and SQL
This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 24af70f [FLINK-10145] [table] Add replace function in Table API and SQL
24af70f is described below
commit 24af70fdecbbb66e8555df7aca35a92a2f1aa7ac
Author: Guibo Pan <gu...@gmail.com>
AuthorDate: Sun Aug 19 01:09:38 2018 +0800
[FLINK-10145] [table] Add replace function in Table API and SQL
This closes #6576.
---
docs/dev/table/functions.md | 36 ++++++++++++++++++++++
.../flink/table/api/scala/expressionDsl.scala | 7 +++++
.../table/codegen/calls/FunctionGenerator.scala | 6 ++++
.../table/expressions/stringExpressions.scala | 24 +++++++++++++++
.../flink/table/validate/FunctionCatalog.scala | 2 ++
.../table/expressions/ScalarFunctionsTest.scala | 33 ++++++++++++++++++++
.../table/expressions/SqlExpressionTest.scala | 1 +
7 files changed, 109 insertions(+)
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 1108294..99a21b1 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2451,6 +2451,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ])
<tr>
<td>
{% highlight text %}
+REPLACE(string1, string2, string3)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a new string in which all non-overlapping occurrences of <i>string2</i> in <i>string1</i> are replaced with <i>string3</i>, scanning from left to right.</p>
+ <p>E.g., <code>REPLACE('hello world', 'world', 'flink')</code> returns 'hello flink'; <code>REPLACE('ababab', 'abab', 'z')</code> returns 'zab'.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
INITCAP(string)
{% endhighlight %}
</td>
@@ -2691,6 +2703,18 @@ STRING.substring(INT1, INT2)
<tr>
<td>
{% highlight java %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a new string in which all non-overlapping occurrences of <i>STRING2</i> in <i>STRING1</i> are replaced with <i>STRING3</i>, scanning from left to right.</p>
+ <p>E.g., <code>'hello world'.replace('world', 'flink')</code> returns 'hello flink'; <code>'ababab'.replace('abab', 'z')</code> returns 'zab'.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
STRING.initCap()
{% endhighlight %}
</td>
@@ -2930,6 +2954,18 @@ STRING.substring(INT1, INT2)
<tr>
<td>
{% highlight scala %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+ </td>
+ <td>
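+ <p>Returns a new string in which all non-overlapping occurrences of <i>STRING2</i> in <i>STRING1</i> are replaced with <i>STRING3</i>, scanning from left to right.</p>
+ <p>E.g., <code>"hello world".replace("world", "flink")</code> returns "hello flink"; <code>"ababab".replace("abab", "z")</code> returns "zab". Note that on a plain Scala string literal, <code>replace</code> resolves to <code>java.lang.String.replace</code> rather than this function; in a query, apply it to an expression such as a field reference (e.g., <code>'name.replace("world", "flink")</code>).</p>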
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
STRING.initCap()
{% endhighlight %}
</td>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 14adb71..9a39bf5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -472,6 +472,13 @@ trait ImplicitExpressionOperations {
}
/**
+ * Returns a new string which replaces all the occurrences of the search target
+ * with the replacement string (non-overlapping).
+ *
+ * @param search the substring to look for in this expression; the resulting [[Replace]]
+ *               expects all of its operands to be STRING expressions
+ * @param replacement the string substituted for each occurrence of `search`
+ * @return a [[Replace]] expression built from this expression, `search` and `replacement`
+ */
+ def replace(search: Expression, replacement: Expression) =
+ Replace(expr, search, replacement)
+
+ /**
 * Returns the length of a string.
 */
def charLength() = CharLength(expr)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index e320052..bd05f9b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -154,6 +154,12 @@ object FunctionGenerator {
BuiltInMethods.REGEXP_REPLACE)
+ // REPLACE(string, search, replacement): three STRING operands, STRING result.
+ // Note: unlike REGEXP_REPLACE above (BuiltInMethods), this uses BuiltInMethod.REPLACE
+ // (no trailing 's') -- presumably Calcite's runtime implementation; confirm the import.
addSqlFunctionMethod(
+ REPLACE,
+ Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethod.REPLACE.method)
+
+ addSqlFunctionMethod(
FROM_BASE64,
Seq(STRING_TYPE_INFO),
STRING_TYPE_INFO,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 5734b36..d0ae246 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT
override def toString: String = s"($str).repeat($n)"
}
+
+/**
+ * Returns a new string in which all non-overlapping occurrences of the search target,
+ * scanned from left to right, are replaced with the replacement string.
+ *
+ * All three operands are expected to be STRING expressions and the result is a STRING.
+ * The auxiliary two-argument constructor omits the replacement; every occurrence of
+ * `search` is then removed, i.e. it is equivalent to `Replace(str, search, "")`.
+ *
+ * @param str the string expression to search in
+ * @param search the substring to look for
+ * @param replacement the string substituted for each occurrence of `search`
+ */
+case class Replace(
+    str: Expression,
+    search: Expression,
+    replacement: Expression)
+  extends Expression with InputTypeSpec {
+
+  // The replacement defaults to the empty string, so the search target is deleted.
+  // (The former default, CharLength(str), was an INT and could never satisfy the
+  // STRING-only expectedTypes below.)
+  def this(str: Expression, search: Expression) = this(str, search, Literal(""))
+
+  override private[flink] def children: Seq[Expression] = str :: search :: replacement :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($str).replace($search, $replacement)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode))
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 801d2e9..cd804c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -193,6 +193,7 @@ object FunctionCatalog {
"lowerCase" -> classOf[Lower],
"similar" -> classOf[Similar],
"substring" -> classOf[Substring],
+ "replace" -> classOf[Replace],
"trim" -> classOf[Trim],
"upper" -> classOf[Upper],
"upperCase" -> classOf[Upper],
@@ -444,6 +445,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.RAND_INTEGER,
ScalarSqlFunctions.CONCAT,
ScalarSqlFunctions.CONCAT_WS,
+ SqlStdOperatorTable.REPLACE,
ScalarSqlFunctions.BIN,
ScalarSqlFunctions.HEX,
SqlStdOperatorTable.TIMESTAMP_ADD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 7614f19..1fa81b0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -95,6 +95,39 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
@Test
+  def testReplace(): Unit = {
+    testAllApis(
+      'f0.replace(" ", "_"),
+      "f0.replace(' ', '_')",
+      "REPLACE(f0, ' ', '_')",
+      "This_is_a_test_String.")
+
+    testAllApis(
+      'f0.replace("i", ""),
+      "f0.replace('i', '')",
+      "REPLACE(f0, 'i', '')",
+      "Ths s a test Strng.")
+
+    // Multi-character search target.
+    testAllApis(
+      'f0.replace("is", "XX"),
+      "f0.replace('is', 'XX')",
+      "REPLACE(f0, 'is', 'XX')",
+      "ThXX XX a test String.")
+
+    // Documented non-overlapping, left-to-right semantics: the first "abab" is consumed,
+    // leaving a trailing "ab" that no longer matches. Literal(...) is required on the
+    // Scala side; a plain "ababab".replace(...) would call java.lang.String.replace.
+    testAllApis(
+      Literal("ababab").replace("abab", "z"),
+      "'ababab'.replace('abab', 'z')",
+      "REPLACE('ababab', 'abab', 'z')",
+      "zab")
+
+    // A NULL input string yields NULL.
+    testAllApis(
+      'f33.replace("i", ""),
+      "f33.replace('i', '')",
+      "REPLACE(f33, 'i', '')",
+      "null")
+
+    // A NULL search target yields NULL.
+    testAllApis(
+      'f0.replace(Null(Types.STRING), ""),
+      "f0.replace(Null(STRING), '')",
+      "REPLACE(f0, NULLIF('', ''), '')",
+      "null")
+
+    // A NULL replacement yields NULL.
+    testAllApis(
+      'f0.replace(" ", Null(Types.STRING)),
+      "f0.replace(' ', Null(STRING))",
+      "REPLACE(f0, ' ', NULLIF('', ''))",
+      "null")
+  }
+
+ @Test
def testTrim(): Unit = {
testAllApis(
'f8.trim(),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index b49922b..ab6d7d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -155,6 +155,7 @@ class SqlExpressionTest extends ExpressionTestBase {
"REPEAT('This is a test String.', 2)",
"This is a test String.This is a test String.")
testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
+ testSqlApi("REPLACE('hello world', 'world', 'flink')", "hello flink")
}
@Test