You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/09/15 07:27:55 UTC
[flink] branch master updated: [FLINK-9991] [table] Add
regexp_replace function to TableAPI and SQL
This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f03d15a [FLINK-9991] [table] Add regexp_replace function to TableAPI and SQL
f03d15a is described below
commit f03d15a08ad5df44a4bb742d3edfdce211bf9e48
Author: yanghua <ya...@gmail.com>
AuthorDate: Sun Jul 29 21:36:05 2018 +0800
[FLINK-9991] [table] Add regexp_replace function to TableAPI and SQL
This closes #6450.
---
docs/dev/table/functions.md | 36 ++++++++++++
.../flink/table/api/scala/expressionDsl.scala | 7 +++
.../flink/table/codegen/calls/BuiltInMethods.scala | 7 +++
.../table/codegen/calls/FunctionGenerator.scala | 6 ++
.../table/expressions/stringExpressions.scala | 24 ++++++++
.../table/functions/sql/ScalarSqlFunctions.scala | 10 ++++
.../table/runtime/functions/ScalarFunctions.scala | 14 +++++
.../flink/table/validate/FunctionCatalog.scala | 2 +
.../table/expressions/ScalarFunctionsTest.scala | 64 ++++++++++++++++++++++
.../table/expressions/SqlExpressionTest.scala | 1 +
10 files changed, 171 insertions(+)
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index e41fe45..85768ab 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2416,6 +2416,18 @@ REPEAT(string, integer)
<tr>
<td>
{% highlight text %}
+REGEXP_REPLACE(string1, string2, string3)
+{% endhighlight %}
+ </td>
+ <td>
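+ <p>Returns a string from <i>string1</i> with every substring that matches the Java regular expression <i>string2</i> replaced with <i>string3</i>. <i>string3</i> is inserted literally (<code>$</code> and <code>\</code> have no special meaning). Returns NULL if any argument is NULL.</p>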
+ <p>E.g., <code>REGEXP_REPLACE('foobar', 'oo|ar', '')</code> returns "fb".</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ])
{% endhighlight %}
</td>
@@ -2638,6 +2650,18 @@ STRING.repeat(INT)
<p>E.g., <code>'This is a test String.'.repeat(2)</code> returns "This is a test String.This is a test String.".</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+STRING1.regexpReplace(STRING2, STRING3)
+{% endhighlight %}
+ </td>
+ <td>
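+ <p>Returns a string from <i>STRING1</i> with every substring that matches the Java regular expression <i>STRING2</i> replaced with <i>STRING3</i>. <i>STRING3</i> is inserted literally (<code>$</code> and <code>\</code> have no special meaning). Returns NULL if any argument is NULL.</p>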
+ <p>E.g., <code>'foobar'.regexpReplace('oo|ar', '')</code> returns "fb".</p>
+ </td>
+ </tr>
<tr>
<td>
@@ -2869,6 +2893,18 @@ STRING.repeat(INT)
<tr>
<td>
{% highlight scala %}
+STRING1.regexpReplace(STRING2, STRING3)
+{% endhighlight %}
+ </td>
+ <td>
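+ <p>Returns a string from <i>STRING1</i> with every substring that matches the Java regular expression <i>STRING2</i> replaced with <i>STRING3</i>. <i>STRING3</i> is inserted literally (<code>$</code> and <code>\</code> have no special meaning). Returns NULL if any argument is NULL.</p>
+ <p>E.g., <code>"foobar".regexpReplace("oo|ar", "")</code> returns "fb".</p>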
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
STRING1.overlay(STRING2, INT1)
STRING1.overlay(STRING2, INT1, INT2)
{% endhighlight %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 126cc5f..626012c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -565,6 +565,13 @@ trait ImplicitExpressionOperations {
Overlay(expr, newString, starting, length)
/**
+   * Replaces, in this string expression, every substring matched by the regular expression
+   * `regex` with `replacement`.
+   *
+   * @param regex       expression yielding the regular expression to search for
+   * @param replacement expression yielding the text substituted for each match
+   * @return a [[RegexpReplace]] expression over this expression
+   */
+  def regexpReplace(regex: Expression, replacement: Expression): RegexpReplace =
+    RegexpReplace(expr, regex, replacement)
+
+ /**
* Returns the base string decoded with base64.
*/
def fromBase64() = FromBase64(expr)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 899cb0f..f15da16 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -133,6 +133,13 @@ object BuiltInMethods {
val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
+  // Reflective handle on ScalarFunctions.regexp_replace(String, String, String); it is
+  // registered with the FunctionGenerator as the runtime implementation of REGEXP_REPLACE.
+  val REGEXP_REPLACE = Types.lookupMethod(
+    classOf[ScalarFunctions], "regexp_replace", classOf[String], classOf[String], classOf[String])
+
val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index c7eb869..b6eb3e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -147,6 +147,12 @@ object FunctionGenerator {
BuiltInMethod.OVERLAY.method)
+ // REGEXP_REPLACE(STRING, STRING, STRING): STRING is mapped to the runtime method
+ // BuiltInMethods.REGEXP_REPLACE, i.e. ScalarFunctions.regexp_replace.
addSqlFunctionMethod(
+ REGEXP_REPLACE,
+ Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethods.REGEXP_REPLACE)
+
+ addSqlFunctionMethod(
FROM_BASE64,
Seq(STRING_TYPE_INFO),
STRING_TYPE_INFO,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 7079440..5734b36 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -359,6 +359,30 @@ case class Rpad(text: Expression, len: Expression, pad: Expression)
}
/**
+ * Returns a string in which every substring of `str` that matches the regular expression
+ * `regex` is replaced with `replacement`. The replacement is inserted literally, and the
+ * result is null if any argument is null.
+ *
+ * @param str         the input string
+ * @param regex       the regular expression to search for
+ * @param replacement the text substituted for each match
+ */
+case class RegexpReplace(str: Expression, regex: Expression, replacement: Expression)
+  extends Expression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  // All three operands must be strings.
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+
+  override private[flink] def children: Seq[Expression] = Seq(str, regex, replacement)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.REGEXP_REPLACE, children.map(_.toRexNode))
+  }
+
+  // Render with the Table API method name ("regexpReplace", as registered in the
+  // FunctionCatalog), not the SQL-style name, so the string form matches the API syntax.
+  override def toString: String = s"($str).regexpReplace($regex, $replacement)"
+}
+
+/**
* Returns the base string decoded with base64.
* Returns NULL If the input string is NULL.
*/
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index db67e39..249eb8a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -187,6 +187,16 @@ object ScalarSqlFunctions {
SqlFunctionCategory.TIMEDATE
)
+  // REGEXP_REPLACE(string, regex, replacement) returning VARCHAR. TO_NULLABLE makes the
+  // derived return type nullable whenever an operand is nullable.
+  val REGEXP_REPLACE = new SqlFunction(
+    "REGEXP_REPLACE",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR),
+      SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.STRING_STRING_STRING,
+    SqlFunctionCategory.STRING)
+
val FROM_BASE64 = new SqlFunction(
"FROM_BASE64",
SqlKind.OTHER_FUNCTION,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index fa6f020..001fb98 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.functions
import java.lang.{StringBuilder, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets
+import java.util.regex.Matcher
import org.apache.commons.codec.binary.{Base64, Hex}
import org.apache.commons.lang3.StringUtils
@@ -205,6 +206,19 @@ object ScalarFunctions {
new String(data)
}
+
+  /**
+   * Returns `str` with every substring that matches the regular expression `regex`
+   * replaced by `replacement`.
+   *
+   * The replacement is passed through [[Matcher.quoteReplacement]], so `$` and `\` in it
+   * are inserted literally rather than being treated as group references or escapes.
+   *
+   * @param str         the input string
+   * @param regex       a java.util.regex regular expression
+   * @param replacement the literal replacement text
+   * @return the rewritten string, or null if any argument is null
+   * @throws java.util.regex.PatternSyntaxException if `regex` is not a valid pattern
+   */
+  def regexp_replace(str: String, regex: String, replacement: String): String =
+    if (str == null || regex == null || replacement == null) {
+      null
+    } else {
+      str.replaceAll(regex, Matcher.quoteReplacement(replacement))
+    }
+
/**
* Returns the base string decoded with base64.
*/
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index fe508aa..c60f979 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -208,6 +208,7 @@ object FunctionCatalog {
"ltrim" -> classOf[LTrim],
"rtrim" -> classOf[RTrim],
"repeat" -> classOf[Repeat],
+ "regexpReplace" -> classOf[RegexpReplace],
// math functions
"plus" -> classOf[Plus],
@@ -461,6 +462,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
ScalarSqlFunctions.LTRIM,
ScalarSqlFunctions.RTRIM,
ScalarSqlFunctions.REPEAT,
+ ScalarSqlFunctions.REGEXP_REPLACE,
// EXTENSIONS
BasicOperatorTable.TUMBLE,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index fbd9b02..e8708cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -551,6 +551,70 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
@Test
+  def testRegexpReplace(): Unit = {
+
+    // Alternation: every non-overlapping match is replaced.
+    testAllApis(
+      "foobar".regexpReplace("oo|ar", "abc"),
+      "'foobar'.regexpReplace('oo|ar', 'abc')",
+      "regexp_replace('foobar', 'oo|ar', 'abc')",
+      "fabcbabc")
+
+    // Anchored pattern: only the leading 'f' is removed.
+    testAllApis(
+      "foofar".regexpReplace("^f", ""),
+      "'foofar'.regexpReplace('^f', '')",
+      "regexp_replace('foofar', '^f', '')",
+      "oofar")
+
+    // A pattern matching the whole input yields the empty string.
+    testAllApis(
+      "foobar".regexpReplace("^f*.*r$", ""),
+      "'foobar'.regexpReplace('^f*.*r$', '')",
+      "regexp_replace('foobar', '^f*.*r$', '')",
+      "")
+
+    // Character class: all digits are removed. Every API variant uses the same input so
+    // the SQL path actually exercises the digit removal.
+    testAllApis(
+      "foo1bar2".regexpReplace("\\d", ""),
+      "'foo1bar2'.regexpReplace('\\d', '')",
+      "regexp_replace('foo1bar2', '\\d', '')",
+      "foobar")
+
+    testAllApis(
+      "foobar".regexpReplace("\\w", ""),
+      "'foobar'.regexpReplace('\\w', '')",
+      "regexp_replace('foobar', '\\w', '')",
+      "")
+
+    // '$' and '\' in the replacement are inserted literally.
+    testAllApis(
+      "fooobar".regexpReplace("oo", "$"),
+      "'fooobar'.regexpReplace('oo', '$')",
+      "regexp_replace('fooobar', 'oo', '$')",
+      "f$obar")
+
+    testAllApis(
+      "foobar".regexpReplace("oo", "\\"),
+      "'foobar'.regexpReplace('oo', '\\')",
+      "regexp_replace('foobar', 'oo', '\\')",
+      "f\\bar")
+
+    // A NULL in any argument yields NULL.
+    testAllApis(
+      'f33.regexpReplace("oo|ar", ""),
+      "f33.regexpReplace('oo|ar', '')",
+      "REGEXP_REPLACE(f33, 'oo|ar', '')",
+      "null")
+
+    testAllApis(
+      "foobar".regexpReplace('f33, ""),
+      "'foobar'.regexpReplace(f33, '')",
+      "REGEXP_REPLACE('foobar', f33, '')",
+      "null")
+
+    testAllApis(
+      "foobar".regexpReplace("oo|ar", 'f33),
+      "'foobar'.regexpReplace('oo|ar', f33)",
+      "REGEXP_REPLACE('foobar', 'oo|ar', f33)",
+      "null")
+  }
+
+ @Test
def testFromBase64(): Unit = {
testAllApis(
'f35.fromBase64(),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 4a79a61..b49922b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -154,6 +154,7 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi(
"REPEAT('This is a test String.', 2)",
"This is a test String.This is a test String.")
+ testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
}
@Test