You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/09/15 07:27:55 UTC

[flink] branch master updated: [FLINK-9991] [table] Add regexp_replace function to TableAPI and SQL

This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f03d15a  [FLINK-9991] [table] Add regexp_replace function to TableAPI and SQL
f03d15a is described below

commit f03d15a08ad5df44a4bb742d3edfdce211bf9e48
Author: yanghua <ya...@gmail.com>
AuthorDate: Sun Jul 29 21:36:05 2018 +0800

    [FLINK-9991] [table] Add regexp_replace function to TableAPI and SQL
    
    This closes #6450.
---
 docs/dev/table/functions.md                        | 36 ++++++++++++
 .../flink/table/api/scala/expressionDsl.scala      |  7 +++
 .../flink/table/codegen/calls/BuiltInMethods.scala |  7 +++
 .../table/codegen/calls/FunctionGenerator.scala    |  6 ++
 .../table/expressions/stringExpressions.scala      | 24 ++++++++
 .../table/functions/sql/ScalarSqlFunctions.scala   | 10 ++++
 .../table/runtime/functions/ScalarFunctions.scala  | 14 +++++
 .../flink/table/validate/FunctionCatalog.scala     |  2 +
 .../table/expressions/ScalarFunctionsTest.scala    | 64 ++++++++++++++++++++++
 .../table/expressions/SqlExpressionTest.scala      |  1 +
 10 files changed, 171 insertions(+)

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index e41fe45..85768ab 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2416,6 +2416,18 @@ REPEAT(string, integer)
     <tr>
       <td>
         {% highlight text %}
+REGEXP_REPLACE(string1, string2, string3)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string from <i>string1</i> in which every substring that matches the regular expression <i>string2</i> is replaced with <i>string3</i>.</p>
+        <p>E.g., <code>REGEXP_REPLACE('foobar', 'oo|ar', '')</code> returns "fb".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
 OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ])
 {% endhighlight %}
       </td>
@@ -2638,6 +2650,18 @@ STRING.repeat(INT)
         <p>E.g., <code>'This is a test String.'.repeat(2)</code> returns "This is a test String.This is a test String.".</p>
       </td>
     </tr>    
+
+    <tr>
+      <td>
+        {% highlight java %}
+STRING1.regexpReplace(STRING2, STRING3)
+{% endhighlight %}
+      </td>
+       <td>
+         <p>Returns a string from <i>STRING1</i> in which every substring that matches the regular expression <i>STRING2</i> is replaced with <i>STRING3</i>.</p>
+         <p>E.g., <code>'foobar'.regexpReplace('oo|ar', '')</code> returns "fb".</p>
+      </td>
+    </tr>
     
     <tr>
       <td>
@@ -2869,6 +2893,18 @@ STRING.repeat(INT)
     <tr>
       <td>
         {% highlight scala %}
+STRING1.regexpReplace(STRING2, STRING3)
+{% endhighlight %}
+      </td>
+       <td>
+         <p>Returns a string from <i>STRING1</i> in which every substring that matches the regular expression <i>STRING2</i> is replaced with <i>STRING3</i>.</p>
+         <p>E.g., <code>"foobar".regexpReplace("oo|ar", "")</code> returns "fb".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 STRING1.overlay(STRING2, INT1)
 STRING1.overlay(STRING2, INT1, INT2)
 {% endhighlight %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 126cc5f..626012c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -565,6 +565,13 @@ trait ImplicitExpressionOperations {
     Overlay(expr, newString, starting, length)
 
   /**
+    * Returns a string in which every substring that matches the regular expression `regex`
+    * is replaced with `replacement`.
+    */
+  def regexpReplace(regex: Expression, replacement: Expression) =
+    RegexpReplace(expr, regex, replacement)
+
+  /**
     * Returns the base string decoded with base64.
     */
   def fromBase64() = FromBase64(expr)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 899cb0f..f15da16 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -133,6 +133,13 @@ object BuiltInMethods {
 
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
 
+  val REGEXP_REPLACE = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "regexp_replace",
+    classOf[String],
+    classOf[String],
+    classOf[String])
+
   val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 
   val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index c7eb869..b6eb3e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -147,6 +147,12 @@ object FunctionGenerator {
     BuiltInMethod.OVERLAY.method)
 
   addSqlFunctionMethod(
+    REGEXP_REPLACE,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.REGEXP_REPLACE)
+
+  addSqlFunctionMethod(
     FROM_BASE64,
     Seq(STRING_TYPE_INFO),
     STRING_TYPE_INFO,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 7079440..5734b36 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -359,6 +359,30 @@ case class Rpad(text: Expression, len: Expression, pad: Expression)
 }
 
 /**
+  * Returns a string in which every substring that matches the regular expression `regex`
+  * is replaced with `replacement`.
+  */
+case class RegexpReplace(str: Expression, regex: Expression, replacement: Expression)
+  extends Expression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[Expression] = Seq(str, regex, replacement)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.REGEXP_REPLACE, children.map(_.toRexNode))
+  }
+
+  override def toString: String = s"($str).regexp_replace($regex, $replacement)"
+}
+
+/**
   * Returns the base string decoded with base64.
   * Returns NULL If the input string is NULL.
   */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index db67e39..249eb8a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -187,6 +187,16 @@ object ScalarSqlFunctions {
     SqlFunctionCategory.TIMEDATE
   )
 
+  val REGEXP_REPLACE = new SqlFunction(
+    "REGEXP_REPLACE",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.STRING_STRING_STRING,
+    SqlFunctionCategory.STRING
+  )
+
   val FROM_BASE64 = new SqlFunction(
     "FROM_BASE64",
     SqlKind.OTHER_FUNCTION,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index fa6f020..001fb98 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.functions
 import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
+import java.util.regex.Matcher
 
 import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.commons.lang3.StringUtils
@@ -205,6 +206,19 @@ object ScalarFunctions {
     new String(data)
   }
 
+
+  /**
+    * Replaces every substring of str that matches regex with the literal replacement string
+    * ('$' and '\' are not interpreted). Returns null if any argument is null.
+    */
+  def regexp_replace(str: String, regex: String, replacement: String): String = {
+    if (str == null || regex == null || replacement == null) {
+      null
+    } else {
+      str.replaceAll(regex, Matcher.quoteReplacement(replacement))
+    }
+  }
+
   /**
     * Returns the base string decoded with base64.
     */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index fe508aa..c60f979 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -208,6 +208,7 @@ object FunctionCatalog {
     "ltrim" -> classOf[LTrim],
     "rtrim" -> classOf[RTrim],
     "repeat" -> classOf[Repeat],
+    "regexpReplace" -> classOf[RegexpReplace],
 
     // math functions
     "plus" -> classOf[Plus],
@@ -461,6 +462,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.LTRIM,
     ScalarSqlFunctions.RTRIM,
     ScalarSqlFunctions.REPEAT,
+    ScalarSqlFunctions.REGEXP_REPLACE,
 
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index fbd9b02..e8708cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -551,6 +551,70 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testRegexpReplace(): Unit = {
+    // every case is checked via the Scala Table API, the string expression API, and SQL
+    testAllApis(
+      "foobar".regexpReplace("oo|ar", "abc"),
+      "'foobar'.regexpReplace('oo|ar', 'abc')",
+      "regexp_replace('foobar', 'oo|ar', 'abc')",
+      "fabcbabc")
+
+    testAllApis(
+      "foofar".regexpReplace("^f", ""),
+      "'foofar'.regexpReplace('^f', '')",
+      "regexp_replace('foofar', '^f', '')",
+      "oofar")
+
+    testAllApis(
+      "foobar".regexpReplace("^f*.*r$", ""),
+      "'foobar'.regexpReplace('^f*.*r$', '')",
+      "regexp_replace('foobar', '^f*.*r$', '')",
+      "")
+
+    testAllApis(
+      "foo1bar2".regexpReplace("\\d", ""),
+      "'foo1bar2'.regexpReplace('\\d', '')",
+      "regexp_replace('foo1bar2', '\\d', '')",
+      "foobar")
+
+    testAllApis(
+      "foobar".regexpReplace("\\w", ""),
+      "'foobar'.regexpReplace('\\w', '')",
+      "regexp_replace('foobar', '\\w', '')",
+      "")
+    // the next two cases verify that '$' and '\' in the replacement are taken literally
+    testAllApis(
+      "fooobar".regexpReplace("oo", "$"),
+      "'fooobar'.regexpReplace('oo', '$')",
+      "regexp_replace('fooobar', 'oo', '$')",
+      "f$obar")
+
+    testAllApis(
+      "foobar".regexpReplace("oo", "\\"),
+      "'foobar'.regexpReplace('oo', '\\')",
+      "regexp_replace('foobar', 'oo', '\\')",
+      "f\\bar")
+    // f33 is presumably a null-valued string field: null in any argument position yields null
+    testAllApis(
+      'f33.regexpReplace("oo|ar", ""),
+      "f33.regexpReplace('oo|ar', '')",
+      "REGEXP_REPLACE(f33, 'oo|ar', '')",
+      "null")
+
+    testAllApis(
+      "foobar".regexpReplace('f33, ""),
+      "'foobar'.regexpReplace(f33, '')",
+      "REGEXP_REPLACE('foobar', f33, '')",
+      "null")
+
+    testAllApis(
+      "foobar".regexpReplace("oo|ar", 'f33),
+      "'foobar'.regexpReplace('oo|ar', f33)",
+      "REGEXP_REPLACE('foobar', 'oo|ar', f33)",
+      "null")
+  }
+
+  @Test
   def testFromBase64(): Unit = {
     testAllApis(
       'f35.fromBase64(),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 4a79a61..b49922b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -154,6 +154,7 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi(
       "REPEAT('This is a test String.', 2)",
       "This is a test String.This is a test String.")
+    testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
   }
 
   @Test