You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/09/27 05:56:52 UTC

[flink] branch master updated: [FLINK-10145] [table] Add replace function in Table API and SQL

This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 24af70f  [FLINK-10145] [table] Add replace function in Table API and SQL
24af70f is described below

commit 24af70fdecbbb66e8555df7aca35a92a2f1aa7ac
Author: Guibo Pan <gu...@gmail.com>
AuthorDate: Sun Aug 19 01:09:38 2018 +0800

    [FLINK-10145] [table] Add replace function in Table API and SQL
    
    This closes #6576.
---
 docs/dev/table/functions.md                        | 36 ++++++++++++++++++++++
 .../flink/table/api/scala/expressionDsl.scala      |  7 +++++
 .../table/codegen/calls/FunctionGenerator.scala    |  6 ++++
 .../table/expressions/stringExpressions.scala      | 24 +++++++++++++++
 .../flink/table/validate/FunctionCatalog.scala     |  2 ++
 .../table/expressions/ScalarFunctionsTest.scala    | 33 ++++++++++++++++++++
 .../table/expressions/SqlExpressionTest.scala      |  1 +
 7 files changed, 109 insertions(+)

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 1108294..99a21b1 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2451,6 +2451,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ])
     <tr>
       <td>
         {% highlight text %}
+REPLACE(string1, string2, string3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a new string in which all (non-overlapping) occurrences of <i>string2</i> in <i>string1</i> are replaced with <i>string3</i>.</p>
+        <p>E.g., <code>REPLACE('hello world', 'world', 'flink')</code> returns 'hello flink'; <code>REPLACE('ababab', 'abab', 'z')</code> returns 'zab'.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
 INITCAP(string)
 {% endhighlight %}
       </td>
@@ -2691,6 +2703,18 @@ STRING.substring(INT1, INT2)
     <tr>
       <td>
         {% highlight java %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a new string in which all (non-overlapping) occurrences of <i>STRING2</i> in <i>STRING1</i> are replaced with <i>STRING3</i>.</p>
+        <p>E.g., <code>'hello world'.replace('world', 'flink')</code> returns 'hello flink'; <code>'ababab'.replace('abab', 'z')</code> returns 'zab'.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
 STRING.initCap()
 {% endhighlight %}
       </td>
@@ -2930,6 +2954,18 @@ STRING.substring(INT1, INT2)
     <tr>
       <td>
         {% highlight scala %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a new string in which all (non-overlapping) occurrences of <i>STRING2</i> in <i>STRING1</i> are replaced with <i>STRING3</i>.</p>
+        <p>E.g., <code>"hello world".replace("world", "flink")</code> returns "hello flink"; <code>"ababab".replace("abab", "z")</code> returns "zab".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 STRING.initCap()
 {% endhighlight %}
       </td>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 14adb71..9a39bf5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -472,6 +472,13 @@ trait ImplicitExpressionOperations {
   }
 
   /**
+    * Returns a new string in which all (non-overlapping) occurrences of `search`
+    * in this string are replaced with `replacement`, matching from left to right.
+    */
+  def replace(search: Expression, replacement: Expression) =
+    Replace(expr, search, replacement)
+
+  /**
     * Returns the length of a string.
     */
   def charLength() = CharLength(expr)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index e320052..bd05f9b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -154,6 +154,12 @@ object FunctionGenerator {
     BuiltInMethods.REGEXP_REPLACE)
 
   addSqlFunctionMethod(
+    REPLACE,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.REPLACE.method) // presumably Calcite's BuiltInMethod
+
+  addSqlFunctionMethod(
     FROM_BASE64,
     Seq(STRING_TYPE_INFO),
     STRING_TYPE_INFO,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 5734b36..d0ae246 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT
 
   override def toString: String = s"($str).repeat($n)"
 }
+
+/**
+  * Returns a new string in which all occurrences of `search` in `str` are replaced
+  * with `replacement`. Matches are found left to right and never overlap, so
+  * replacing "abab" in "ababab" with "z" yields "zab".
+  * Evaluates to null if any of the three arguments is null.
+  */
+case class Replace(str: Expression,
+  search: Expression,
+  replacement: Expression) extends Expression with InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = str :: search :: replacement :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($str).replace($search, $replacement)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode))
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 801d2e9..cd804c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -193,6 +193,7 @@ object FunctionCatalog {
     "lowerCase" -> classOf[Lower],
     "similar" -> classOf[Similar],
     "substring" -> classOf[Substring],
+    "replace" -> classOf[Replace], // STRING1.replace(STRING2, STRING3)
     "trim" -> classOf[Trim],
     "upper" -> classOf[Upper],
     "upperCase" -> classOf[Upper],
@@ -444,6 +445,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.RAND_INTEGER,
     ScalarSqlFunctions.CONCAT,
     ScalarSqlFunctions.CONCAT_WS,
+    SqlStdOperatorTable.REPLACE, // REPLACE(string1, string2, string3)
     ScalarSqlFunctions.BIN,
     ScalarSqlFunctions.HEX,
     SqlStdOperatorTable.TIMESTAMP_ADD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 7614f19..1fa81b0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -95,6 +95,39 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testReplace(): Unit = {
+    testAllApis( // every space is replaced
+      'f0.replace(" ", "_"),
+      "f0.replace(' ', '_')",
+      "REPLACE(f0, ' ', '_')",
+      "This_is_a_test_String.")
+
+    testAllApis( // an empty replacement deletes every match
+      'f0.replace("i", ""),
+      "f0.replace('i', '')",
+      "REPLACE(f0, 'i', '')",
+      "Ths s a test Strng.")
+
+    testAllApis( // null input string -> null (assumes f33 is a null STRING field; confirm)
+      'f33.replace("i", ""),
+      "f33.replace('i', '')",
+      "REPLACE(f33, 'i', '')",
+      "null")
+
+    testAllApis( // null search string -> null
+      'f0.replace(Null(Types.STRING), ""),
+      "f0.replace(Null(STRING), '')",
+      "REPLACE(f0, NULLIF('', ''), '')", // NULLIF('', '') yields a NULL string
+      "null")
+
+    testAllApis( // null replacement string -> null
+      'f0.replace(" ", Null(Types.STRING)),
+      "f0.replace(' ', Null(STRING))",
+      "REPLACE(f0, ' ', NULLIF('', ''))",
+      "null")
+  }
+
+  @Test
   def testTrim(): Unit = {
     testAllApis(
       'f8.trim(),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index b49922b..ab6d7d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -155,6 +155,7 @@ class SqlExpressionTest extends ExpressionTestBase {
       "REPEAT('This is a test String.', 2)",
       "This is a test String.This is a test String.")
     testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
+    testSqlApi("REPLACE('hello world', 'world', 'flink')", "hello flink") // docs example
   }
 
   @Test