You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/27 05:58:56 UTC

[GitHub] asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL

asfgit closed pull request #6576: [FLINK-10145][table] Add replace supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6576
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 11082941385..00465727399 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2448,6 +2448,18 @@ SUBSTRING(string FROM integer1 [ FOR integer2 ])
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight text %}
+REPLACE(string1, string2, string3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a new string in which all non-overlapping occurrences of <i>string2</i> in <i>string1</i> are replaced with <i>string3</i>.</p>
+        <p>E.g., <code>REPLACE('hello world', 'world', 'flink')</code> returns 'hello flink'; <code>REPLACE('ababab', 'abab', 'z')</code> returns 'zab'.</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight text %}
@@ -2688,6 +2700,18 @@ STRING.substring(INT1, INT2)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a new string in which all non-overlapping occurrences of <i>STRING2</i> in <i>STRING1</i> are replaced with <i>STRING3</i>.</p>
+        <p>E.g., <code>'hello world'.replace('world', 'flink')</code> returns 'hello flink'; <code>'ababab'.replace('abab', 'z')</code> returns 'zab'.</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight java %}
@@ -2927,6 +2951,18 @@ STRING.substring(INT1, INT2)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+STRING1.replace(STRING2, STRING3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a new string in which all non-overlapping occurrences of <i>STRING2</i> in <i>STRING1</i> are replaced with <i>STRING3</i>.</p>
+        <p>E.g., <code>"hello world".replace("world", "flink")</code> returns "hello flink"; <code>"ababab".replace("abab", "z")</code> returns "zab".</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight scala %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 14adb71bbab..3c726678b24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -471,6 +471,13 @@ trait ImplicitExpressionOperations {
     }
   }
 
+  /**
+    * Returns a new string in which all non-overlapping occurrences of the
+    * given search string are replaced with the given replacement string.
+    */
+  def replace(search: Expression, replacement: Expression) =
+    Replace(expr, search, replacement)
+
   /**
     * Returns the length of a string.
     */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index e32005213eb..bd05f9bd71e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -153,6 +153,12 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethods.REGEXP_REPLACE)
 
+  addSqlFunctionMethod(
+    REPLACE,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.REPLACE.method)
+
   addSqlFunctionMethod(
     FROM_BASE64,
     Seq(STRING_TYPE_INFO),
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 5734b369fdf..f5e6ad2fff3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -511,3 +511,27 @@ case class Repeat(str: Expression, n: Expression) extends Expression with InputT
 
   override def toString: String = s"($str).repeat($n)"
 }
+
+/**
+  * Returns the string `str` with all non-overlapping occurrences
+  * of `search` replaced with `replacement`. All three operands are
+  * expected to be strings, and the result is a string.
+  */
+case class Replace(
+    str: Expression,
+    search: Expression,
+    replacement: Expression) extends Expression with InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = str :: search :: replacement :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($str).replace($search, $replacement)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.REPLACE, children.map(_.toRexNode))
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 801d2e9ecc2..cd804c55aea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -193,6 +193,7 @@ object FunctionCatalog {
     "lowerCase" -> classOf[Lower],
     "similar" -> classOf[Similar],
     "substring" -> classOf[Substring],
+    "replace" -> classOf[Replace],
     "trim" -> classOf[Trim],
     "upper" -> classOf[Upper],
     "upperCase" -> classOf[Upper],
@@ -444,6 +445,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.RAND_INTEGER,
     ScalarSqlFunctions.CONCAT,
     ScalarSqlFunctions.CONCAT_WS,
+    SqlStdOperatorTable.REPLACE,
     ScalarSqlFunctions.BIN,
     ScalarSqlFunctions.HEX,
     SqlStdOperatorTable.TIMESTAMP_ADD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 7614f1992b9..26650772467 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -94,6 +94,45 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "his is a test String.")
   }
 
+  @Test
+  def testReplace(): Unit = {
+    testAllApis(
+      'f0.replace("This", "That"),
+      "f0.replace('This', 'That')",
+      "REPLACE(f0, 'This', 'That')",
+      "That is a test String.") // a single occurrence of a word is replaced
+
+    testAllApis(
+      'f0.replace(" ", "_"),
+      "f0.replace(' ', '_')",
+      "REPLACE(f0, ' ', '_')",
+      "This_is_a_test_String.") // every occurrence is replaced, not just the first
+
+    testAllApis(
+      'f0.replace("i", ""),
+      "f0.replace('i', '')",
+      "REPLACE(f0, 'i', '')",
+      "Ths s a test Strng.") // an empty replacement deletes the matches
+
+    testAllApis(
+      'f33.replace("i", ""),
+      "f33.replace('i', '')",
+      "REPLACE(f33, 'i', '')",
+      "null") // presumably f33 is a null string field in the test data - TODO confirm
+
+    testAllApis(
+      'f0.replace(Null(Types.STRING), ""),
+      "f0.replace(Null(STRING), '')",
+      "REPLACE(f0, NULLIF('', ''), '')",
+      "null") // a null search string yields null; NULLIF('', '') is a SQL null
+
+    testAllApis(
+      'f0.replace(" ", Null(Types.STRING)),
+      "f0.replace(' ', Null(STRING))",
+      "REPLACE(f0, ' ', NULLIF('', ''))",
+      "null") // a null replacement string yields null
+  }
+
   @Test
   def testTrim(): Unit = {
     testAllApis(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index b49922bc3fc..269a490aa16 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -155,6 +155,9 @@ class SqlExpressionTest extends ExpressionTestBase {
       "REPEAT('This is a test String.', 2)",
       "This is a test String.This is a test String.")
     testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
+    testSqlApi("REPLACE('hello world', 'world', 'flink')", "hello flink")
+    testSqlApi("REPLACE('ababab', 'abab', 'Z')", "Zab")
+    testSqlApi("REPLACE('ababab', 'a', '')", "bbb")
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services