You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/09 10:05:30 UTC

flink git commit: [FLINK-3497] [table] Add POSITION/OVERLAY functions

Repository: flink
Updated Branches:
  refs/heads/master bdd3c0d94 -> 95b673f0b


[FLINK-3497] [table] Add POSITION/OVERLAY functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95b673f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95b673f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95b673f0

Branch: refs/heads/master
Commit: 95b673f0bbb302f6f2ed274183941d063fc1feca
Parents: bdd3c0d
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 9 11:56:12 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Sep 9 12:04:42 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 46 ++++++++++++
 .../flink/api/scala/table/expressionDsl.scala   | 54 ++++++++++----
 .../table/codegen/calls/ScalarFunctions.scala   | 18 +++++
 .../table/expressions/stringExpressions.scala   | 77 ++++++++++++++++----
 .../api/table/validate/FunctionCatalog.scala    |  4 +-
 .../table/expressions/ScalarFunctionsTest.scala | 30 ++++++++
 6 files changed, 199 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95b673f0/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 9272ea3..59998f0 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1460,6 +1460,29 @@ STRING.similar(STRING)
     <tr>
       <td>
         {% highlight java %}
+STRING.position(STRING)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the position (starting at 1) of a string in another string. Returns 0 if the string could not be found. E.g. <code>'a'.position('bbbbba')</code> leads to 6.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.overlay(STRING, INT)
+STRING.overlay(STRING, INT, INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Replaces a substring of a string with another string, starting at a position (starting at 1). An optional length specifies how many characters should be removed; if it is omitted, the length of the replacement string is used. E.g. <code>'xxxxxtest'.overlay('xxxx', 6)</code> leads to "xxxxxxxxx", <code>'xxxxxtest'.overlay('xxxx', 6, 2)</code> leads to "xxxxxxxxxst".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
 STRING.toDate()
 {% endhighlight %}
       </td>
@@ -1822,6 +1845,29 @@ STRING.similar(STRING)
     <tr>
       <td>
         {% highlight scala %}
+STRING.position(STRING)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the position (starting at 1) of a string in another string. Returns 0 if the string could not be found. E.g. <code>"a".position("bbbbba")</code> leads to 6.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+STRING.overlay(STRING, INT)
+STRING.overlay(STRING, INT, INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Replaces a substring of a string with another string, starting at a position (starting at 1). An optional length specifies how many characters should be removed; if it is omitted, the length of the replacement string is used. E.g. <code>"xxxxxtest".overlay("xxxx", 6)</code> leads to "xxxxxxxxx", <code>"xxxxxtest".overlay("xxxx", 6, 2)</code> leads to "xxxxxxxxxst".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 STRING.toDate
 {% endhighlight %}
       </td>

http://git-wip-us.apache.org/repos/asf/flink/blob/95b673f0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 942b07e..449f9e2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -157,7 +157,7 @@ trait ImplicitExpressionOperations {
     * @return substring
     */
   def substring(beginIndex: Expression, length: Expression) =
-    SubString(expr, beginIndex, length)
+    Substring(expr, beginIndex, length)
 
   /**
     * Creates a substring of the given string beginning at the given index to the end.
@@ -166,14 +166,14 @@ trait ImplicitExpressionOperations {
     * @return substring
     */
   def substring(beginIndex: Expression) =
-    new SubString(expr, beginIndex)
+    new Substring(expr, beginIndex)
 
   /**
     * Removes leading and/or trailing characters from the given string.
     *
     * @param removeLeading if true, remove leading characters (default: true)
     * @param removeTrailing if true, remove trailing characters (default: true)
-    * @param character String containing the character (default: " ")
+    * @param character string containing the character (default: " ")
     * @return trimmed string
     */
   def trim(
@@ -192,56 +192,80 @@ trait ImplicitExpressionOperations {
   }
 
   /**
-    * Returns the length of a String.
+    * Returns the length of a string.
     */
   def charLength() = CharLength(expr)
 
   /**
-    * Returns all of the characters in a String in upper case using the rules of
+    * Returns all of the characters in a string in upper case using the rules of
     * the default locale.
     */
   def upperCase() = Upper(expr)
 
   /**
-    * Returns all of the characters in a String in lower case using the rules of
+    * Returns all of the characters in a string in lower case using the rules of
     * the default locale.
     */
   def lowerCase() = Lower(expr)
 
   /**
-    * Converts the initial letter of each word in a String to uppercase.
-    * Assumes a String containing only [A-Za-z0-9], everything else is treated as whitespace.
+    * Converts the initial letter of each word in a string to uppercase.
+    * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.
     */
   def initCap() = InitCap(expr)
 
   /**
-    * Returns true, if a String matches the specified LIKE pattern.
+    * Returns true, if a string matches the specified LIKE pattern.
     *
-    * e.g. "Jo_n%" matches all Strings that start with "Jo(arbitrary letter)n"
+    * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
     */
   def like(pattern: Expression) = Like(expr, pattern)
 
   /**
-    * Returns true, if a String matches the specified SQL regex pattern.
+    * Returns true, if a string matches the specified SQL regex pattern.
     *
-    * e.g. "A+" matches all Strings that consist of at least one A
+    * e.g. "A+" matches all strings that consist of at least one A
     */
   def similar(pattern: Expression) = Similar(expr, pattern)
 
+  /**
+    * Returns the 1-based position of this string (the needle) in the `haystack` string.
+    * Returns 0 if this string could not be found in `haystack`.
+    *
+    * e.g. "a".position("bbbbba") leads to 6
+    */
+  def position(haystack: Expression) = Position(expr, haystack)
+
+  /**
+    * Replaces a substring of this string with `newString`, starting at 1-based `starting`.
+    * The number of replaced characters defaults to the character length of `newString`.
+    * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
+    */
+  def overlay(newString: Expression, starting: Expression) = new Overlay(expr, newString, starting)
+
+  /**
+    * Replaces a substring of this string with `newString`, starting at 1-based `starting`.
+    * `length` specifies how many characters of this string are removed and replaced.
+    *
+    * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
+    */
+  def overlay(newString: Expression, starting: Expression, length: Expression) =
+    Overlay(expr, newString, starting, length)
+
   // Temporal operations
 
   /**
-    * Parses a date String in the form "yy-mm-dd" to a SQL Date.
+    * Parses a date string in the form "yy-mm-dd" to a SQL Date.
     */
   def toDate = Cast(expr, SqlTimeTypeInfo.DATE)
 
   /**
-    * Parses a time String in the form "hh:mm:ss" to a SQL Time.
+    * Parses a time string in the form "hh:mm:ss" to a SQL Time.
     */
   def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
 
   /**
-    * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
+    * Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp.
     */
   def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/95b673f0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index 24e8290..d8bd4c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -113,6 +113,24 @@ object ScalarFunctions {
     Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
     BuiltInMethod.SIMILAR.method)
 
+  addSqlFunctionMethod(
+    POSITION,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
+    INT_TYPE_INFO,
+    BuiltInMethod.POSITION.method)
+
+  addSqlFunctionMethod(
+    OVERLAY,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.OVERLAY.method)
+
+  addSqlFunctionMethod(
+    OVERLAY,
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.OVERLAY.method)
+
   // ----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // ----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/95b673f0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
index aee7d67..a692c9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -144,9 +144,9 @@ case class Similar(str: Expression, pattern: Expression) extends BinaryExpressio
 }
 
 /**
-  * Returns subString of `str` from `begin`(inclusive) for `length`.
+  * Returns substring of `str` from `begin`(inclusive) for `length`.
   */
-case class SubString(
+case class Substring(
     str: Expression,
     begin: Expression,
     length: Expression) extends Expression with InputTypeSpec {
@@ -160,7 +160,7 @@ case class SubString(
   override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
     Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
 
-  override def toString: String = s"$str.subString($begin, $length)"
+  override def toString: String = s"($str).substring($begin, $length)"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.SUBSTRING, children.map(_.toRexNode))
@@ -193,7 +193,7 @@ case class Trim(
     }
   }
 
-  override def toString: String = s"trim($trimMode, $trimString, $str)"
+  override def toString: String = s"($str).trim($trimMode, $trimString)"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.TRIM, children.map(_.toRexNode))
@@ -210,21 +210,70 @@ object TrimConstants {
 /**
   * Returns str with all characters changed to uppercase.
   */
-case class Upper(child: Expression) extends UnaryExpression {
+case class Upper(child: Expression) extends UnaryExpression with InputTypeSpec {
+
   override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
 
-  override private[flink] def validateInput(): ExprValidationResult = {
-    if (child.resultType == STRING_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Upper operator requires String input, " +
-        s"but $child is of type ${child.resultType}")
-    }
-  }
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO)
 
-  override def toString: String = s"($child).toUpperCase()"
+  override def toString: String = s"($child).upperCase()"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     relBuilder.call(SqlStdOperatorTable.UPPER, child.toRexNode)
   }
 }
+
+/**
+  * Returns the 1-based position of string needle in string haystack, or 0 if it is absent.
+  */
+case class Position(needle: Expression, haystack: Expression)
+    extends Expression with InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = Seq(needle, haystack)
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($needle).position($haystack)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.POSITION, needle.toRexNode, haystack.toRexNode)
+  }
+}
+
+/**
+  * Replaces a substring of `str`, starting at 1-based index `starting`, with `replacement`.
+  * `position` is the number of characters replaced (default: char length of `replacement`).
+  */
+case class Overlay(
+    str: Expression,
+    replacement: Expression,
+    starting: Expression,
+    position: Expression) // NOTE(review): a length (char count), not an index
+  extends Expression with InputTypeSpec {
+
+  def this(str: Expression, replacement: Expression, starting: Expression) =
+    this(str, replacement, starting, CharLength(replacement)) // default: replace |replacement| chars
+
+  override private[flink] def children: Seq[Expression] =
+    Seq(str, replacement, starting, position)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString: String = s"($str).overlay($replacement, $starting, $position)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(
+      SqlStdOperatorTable.OVERLAY,
+      str.toRexNode,
+      replacement.toRexNode,
+      starting.toRexNode,
+      position.toRexNode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95b673f0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 9808672..ef49356 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -136,9 +136,11 @@ object FunctionCatalog {
     "like" -> classOf[Like],
     "lowerCase" -> classOf[Lower],
     "similar" -> classOf[Similar],
-    "subString" -> classOf[SubString],
+    "substring" -> classOf[Substring],
     "trim" -> classOf[Trim],
     "upperCase" -> classOf[Upper],
+    "position" -> classOf[Position],
+    "overlay" -> classOf[Overlay],
 
     // math functions
     "abs" -> classOf[Abs],

http://git-wip-us.apache.org/repos/asf/flink/blob/95b673f0/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 516bfca..2d630ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -34,6 +34,36 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   @Test
+  def testOverlay(): Unit = {
+    testAllApis(
+      "xxxxxtest".overlay("xxxx", 6),
+      "'xxxxxtest'.overlay('xxxx', 6)",
+      "OVERLAY('xxxxxtest' PLACING 'xxxx' FROM 6)",
+      "xxxxxxxxx")
+
+    testAllApis(
+      "xxxxxtest".overlay("xxxx", 6, 2),
+      "'xxxxxtest'.overlay('xxxx', 6, 2)",
+      "OVERLAY('xxxxxtest' PLACING 'xxxx' FROM 6 FOR 2)",
+      "xxxxxxxxxst")
+  }
+
+  @Test
+  def testPosition(): Unit = {
+    testAllApis(
+      "test".position("xxxtest"),
+      "'test'.position('xxxtest')",
+      "POSITION('test' IN 'xxxtest')",
+      "4")
+
+    testAllApis(
+      "testx".position("xxxtest"),
+      "'testx'.position('xxxtest')",
+      "POSITION('testx' IN 'xxxtest')",
+      "0")
+  }
+
+  @Test
   def testSubstring(): Unit = {
     testAllApis(
       'f0.substring(2),