You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/09 21:35:32 UTC

[GitHub] asfgit closed pull request #6448: [FLINK-9990] [table] Add regexp_extract supported in TableAPI and SQL

asfgit closed pull request #6448: [FLINK-9990] [table] Add regexp_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index f888d790deb..6491c81b32e 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2496,6 +2496,19 @@ REPLACE(string1, string2, string3)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight text %}
+REGEXP_EXTRACT(string1, string2[, integer])
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string from <i>string1</i> which extracted with a specified regular expression <i>string2</i> and a regex match group index <i>integer</i>.</p> 
+        <p><b>Note:</b> The regex match group index starts from 1 and 0 means matching the whole regex. In addition, The regex match group index should not exceed the number of the defined groups.</p> 
+        <p>E.g. <code>REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)"</code> returns "bar".</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight text %}
@@ -2748,6 +2761,19 @@ STRING1.replace(STRING2, STRING3)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+STRING1.regexpExtract(STRING2[, INTEGER1])
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string from <i>STRING1</i> which extracted with a specified regular expression <i>STRING2</i> and a regex match group index <i>INTEGER1</i>.</p>
+        <p><b>Note:</b> The regex match group index starts from 1 and 0 means matching the whole regex. In addition, The regex match group index should not exceed the number of the defined groups.</p> 
+        <p>E.g. <code>'foothebar'.regexpExtract('foo(.*?)(bar)', 2)"</code> returns "bar".</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight java %}
@@ -2999,6 +3025,19 @@ STRING1.replace(STRING2, STRING3)
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+STRING1.regexpExtract(STRING2[, INTEGER1])
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string from <i>STRING1</i> which extracted with a specified regular expression <i>STRING2</i> and a regex match group index <i>INTEGER1</i>.</p>
+        <p><b>Note:</b> The regex match group index starts from 1 and 0 means matching the whole regex. In addition, The regex match group index should not exceed the number of the defined groups.</p>
+        <p>E.g. <code>"foothebar".regexpExtract("foo(.*?)(bar)", 2)"</code> returns "bar".</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight scala %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index e638051a2bd..e1947c37acf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -584,6 +584,18 @@ trait ImplicitExpressionOperations {
   def regexpReplace(regex: Expression, replacement: Expression) =
     RegexpReplace(expr, regex, replacement)
 
+  /**
+    * Returns a string extracted with a specified regular expression and a regex match group index.
+    */
+  def regexpExtract(regex: Expression, extractIndex: Expression) =
+    RegexpExtract(expr, regex, extractIndex)
+
+  /**
+    * Returns a string extracted with a specified regular expression.
+    */
+  def regexpExtract(regex: Expression) =
+    RegexpExtract(expr, regex, null)
+
   /**
     * Returns the base string decoded with base64.
     */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 34be86e0400..7781b57825e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -143,6 +143,19 @@ object BuiltInMethods {
     classOf[String],
     classOf[String])
 
+  val REGEXP_EXTRACT = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "regexp_extract",
+    classOf[String],
+    classOf[String],
+    classOf[Integer])
+
+  val REGEXP_EXTRACT_WITHOUT_INDEX = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "regexp_extract",
+    classOf[String],
+    classOf[String])
+
   val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 
   val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 58c476a36e0..6404813702f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -159,6 +159,18 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethod.REPLACE.method)
 
+  addSqlFunctionMethod(
+    REGEXP_EXTRACT,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.REGEXP_EXTRACT)
+
+  addSqlFunctionMethod(
+    REGEXP_EXTRACT,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.REGEXP_EXTRACT_WITHOUT_INDEX)
+
   addSqlFunctionMethod(
     FROM_BASE64,
     Seq(STRING_TYPE_INFO),
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index d0ae246763f..d65de864184 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -382,6 +382,45 @@ case class RegexpReplace(str: Expression, regex: Expression, replacement: Expres
   override def toString: String = s"($str).regexp_replace($regex, $replacement)"
 }
 
+/**
+  * Returns a string extracted with a specified regular expression and a regex match group index.
+  */
+case class RegexpExtract(str: Expression, regex: Expression, extractIndex: Expression)
+  extends Expression with InputTypeSpec {
+  def this(str: Expression, regex: Expression) = this(str, regex, null)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = {
+    if (extractIndex == null) {
+      Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+    } else {
+      Seq(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO)
+    }
+  }
+
+  override private[flink] def children: Seq[Expression] = {
+    if (extractIndex == null) {
+      Seq(str, regex)
+    } else {
+      Seq(str, regex, extractIndex)
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.REGEXP_EXTRACT, children.map(_.toRexNode))
+  }
+
+  override def toString: String = s"($str).regexp_extract($regex, $extractIndex)"
+}
+
+object RegexpExtract {
+  def apply(str: Expression, regex: Expression): RegexpExtract = RegexpExtract(str, regex, null)
+}
+
 /**
   * Returns the base string decoded with base64.
   * Returns NULL If the input string is NULL.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index c7b68d046f5..0f594bbcd6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -205,6 +205,18 @@ object ScalarSqlFunctions {
     SqlFunctionCategory.STRING
   )
 
+  val REGEXP_EXTRACT = new SqlFunction(
+    "REGEXP_EXTRACT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.or(OperandTypes.STRING_STRING_INTEGER,
+      OperandTypes.STRING_STRING
+    ),
+    SqlFunctionCategory.STRING
+  )
+
   val FROM_BASE64 = new SqlFunction(
     "FROM_BASE64",
     SqlKind.OTHER_FUNCTION,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 0c469926246..1db4da2f899 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,7 @@ import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.util.regex.Matcher
+import java.util.regex.Pattern
 
 import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.commons.lang3.StringUtils
@@ -226,6 +227,31 @@ object ScalarFunctions {
     str.replaceAll(regex, Matcher.quoteReplacement(replacement))
   }
 
+  /**
+    * Returns a string extracted with a specified regular expression and a regex match group index.
+    */
+  def regexp_extract(str: String, regex: String, extractIndex: Integer): String = {
+    if (str == null || regex == null) {
+      return null
+    }
+
+    val m = Pattern.compile(regex).matcher(str)
+    if (m.find) {
+      val mr = m.toMatchResult
+      return mr.group(extractIndex)
+    }
+
+    null
+  }
+
+  /**
+    * Returns a string extracted with a specified regular expression and
+    * a optional regex match group index.
+    */
+  def regexp_extract(str: String, regex: String): String = {
+    regexp_extract(str, regex, 0)
+  }
+
   /**
     * Returns the base string decoded with base64.
     */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 218ffec0cda..26a20ff41a4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -203,6 +203,7 @@ object FunctionCatalog {
     "concat_ws" -> classOf[ConcatWs],
     "lpad" -> classOf[Lpad],
     "rpad" -> classOf[Rpad],
+    "regexpExtract" -> classOf[RegexpExtract],
     "fromBase64" -> classOf[FromBase64],
     "toBase64" -> classOf[ToBase64],
     "uuid" -> classOf[UUID],
@@ -462,6 +463,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.SHA384,
     ScalarSqlFunctions.SHA512,
     ScalarSqlFunctions.SHA2,
+    ScalarSqlFunctions.REGEXP_EXTRACT,
     ScalarSqlFunctions.FROM_BASE64,
     ScalarSqlFunctions.TO_BASE64,
     ScalarSqlFunctions.UUID,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 541ad62f94c..646661b9d2b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -647,6 +647,58 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "null")
   }
 
+  @Test
+  def testRegexpExtract(): Unit = {
+    testAllApis(
+      "foothebar".regexpExtract("foo(.*?)(bar)", 2),
+      "'foothebar'.regexpExtract('foo(.*?)(bar)', 2)",
+      "REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)",
+      "bar")
+
+    testAllApis(
+      "foothebar".regexpExtract("foo(.*?)(bar)", 0),
+      "'foothebar'.regexpExtract('foo(.*?)(bar)', 0)",
+      "REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 0)",
+      "foothebar")
+
+    testAllApis(
+      "foothebar".regexpExtract("foo(.*?)(bar)", 1),
+      "'foothebar'.regexpExtract('foo(.*?)(bar)', 1)",
+      "REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 1)",
+      "the")
+
+    testAllApis(
+      "foothebar".regexpExtract("foo([\\w]+)", 1),
+      "'foothebar'.regexpExtract('foo([\\w]+)', 1)",
+      "REGEXP_EXTRACT('foothebar', 'foo([\\w]+)', 1)",
+      "thebar")
+
+    testAllApis(
+      "foothebar".regexpExtract("foo([\\d]+)", 1),
+      "'foothebar'.regexpExtract('foo([\\d]+)', 1)",
+      "REGEXP_EXTRACT('foothebar', 'foo([\\d]+)', 1)",
+      "null")
+
+    testAllApis(
+      'f33.regexpExtract("foo(.*?)(bar)", 2),
+      "f33.regexpExtract('foo(.*?)(bar)', 2)",
+      "REGEXP_EXTRACT(f33, 'foo(.*?)(bar)', 2)",
+      "null")
+
+    testAllApis(
+      "foothebar".regexpExtract('f33, 2),
+      "'foothebar'.regexpExtract(f33, 2)",
+      "REGEXP_EXTRACT('foothebar', f33, 2)",
+      "null")
+
+    //test optional regex match group
+    testAllApis(
+      "foothebar".regexpExtract("foo(.*?)(bar)"),
+      "'foothebar'.regexpExtract('foo(.*?)(bar)')",
+      "REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)')",
+      "foothebar")
+  }
+
   @Test
   def testFromBase64(): Unit = {
     testAllApis(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 8949a2f1f74..93285d0d962 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -157,6 +157,7 @@ class SqlExpressionTest extends ExpressionTestBase {
       "This is a test String.This is a test String.")
     testSqlApi("REGEXP_REPLACE('foobar', 'oo|ar', '')", "fb")
     testSqlApi("REPLACE('hello world', 'world', 'flink')", "hello flink")
+    testSqlApi("REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)", "bar")
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services