You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/08/25 01:38:22 UTC

[flink] branch master updated: [FLINK-10136] [table] Add REPEAT function in Table API and SQL

This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 505dca1  [FLINK-10136] [table] Add REPEAT function in Table API and SQL
505dca1 is described below

commit 505dca174128ebb3bf765778ee36d58f680d6a1e
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Aug 21 20:35:23 2018 +0800

    [FLINK-10136] [table] Add REPEAT function in Table API and SQL
    
    This closes #6597.
---
 docs/dev/table/functions.md                        | 36 ++++++++++++++++++++
 .../flink/table/api/scala/expressionDsl.scala      |  5 +++
 .../flink/table/codegen/calls/BuiltInMethods.scala |  6 ++++
 .../table/codegen/calls/FunctionGenerator.scala    |  6 ++++
 .../table/expressions/stringExpressions.scala      | 28 ++++++++++++++++
 .../table/functions/sql/ScalarSqlFunctions.scala   |  8 +++++
 .../table/runtime/functions/ScalarFunctions.scala  |  7 ++++
 .../flink/table/validate/FunctionCatalog.scala     |  2 ++
 .../table/expressions/ScalarFunctionsTest.scala    | 39 ++++++++++++++++++++++
 .../table/expressions/SqlExpressionTest.scala      |  3 ++
 10 files changed, 140 insertions(+)

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 24d8d70..e41fe45 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2404,6 +2404,18 @@ RTRIM(string)
     <tr>
       <td>
         {% highlight text %}
+REPEAT(string, integer)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string that repeats the base <i>string</i> <i>integer</i> times.</p> 
+        <p>E.g., <code>REPEAT('This is a test String.', 2)</code> returns "This is a test String.This is a test String.".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
 OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ])
 {% endhighlight %}
       </td>
@@ -2614,6 +2626,18 @@ STRING.rtrim()
         <p>E.g., <code>'This is a test String. '.rtrim()</code> returns "This is a test String.".</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string that repeats the base <i>STRING</i> <i>INT</i> times.</p> 
+        <p>E.g., <code>'This is a test String.'.repeat(2)</code> returns "This is a test String.This is a test String.".</p>
+      </td>
+    </tr>    
     
     <tr>
       <td>
@@ -2833,6 +2857,18 @@ STRING.rtrim()
     <tr>
       <td>
         {% highlight scala %}
+STRING.repeat(INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string that repeats the base <i>STRING</i> <i>INT</i> times.</p> 
+        <p>E.g., <code>"This is a test String.".repeat(2)</code> returns "This is a test String.This is a test String.".</p>
+      </td>
+    </tr> 
+
+    <tr>
+      <td>
+        {% highlight scala %}
 STRING1.overlay(STRING2, INT1)
 STRING1.overlay(STRING2, INT1, INT2)
 {% endhighlight %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 429c37f..cf8cd91 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -584,6 +584,11 @@ trait ImplicitExpressionOperations {
     */
   def rtrim() = RTrim(expr)
 
+  /**
+    * Returns a string that repeats the base string n times.
+    */
+  def repeat(n: Expression) = Repeat(expr, n)
+
   // Temporal operations
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 7eb91d3..899cb0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -141,4 +141,10 @@ object BuiltInMethods {
   val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[String])
 
   val UUID: Method = Types.lookupMethod(classOf[ScalarFunctions], "uuid")
+
+  val REPEAT: Method = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "repeat",
+    classOf[String],
+    classOf[Int])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7c328c9..c7eb869 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -176,6 +176,12 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethod.RTRIM.method)
 
+  addSqlFunctionMethod(
+    REPEAT,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.REPEAT)
+
   // ----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index b2d7a3d..7079440 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -459,3 +459,31 @@ case class RTrim(child: Expression) extends UnaryExpression with InputTypeSpec {
 
   override def toString = s"($child).rtrim"
 }
+
+/**
+  * Returns a string that repeats the base str n times.
+  */
+case class Repeat(str: Expression, n: Expression) extends Expression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO)
+
+  override private[flink] def children: Seq[Expression] = Seq(str, n)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.REPEAT, str.toRexNode, n.toRexNode)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && n.resultType == INT_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Repeat operator requires (String, Int) input, " +
+        s"but ($str, $n) is of type (${str.resultType}, ${n.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).repeat($n)"
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index d419000..db67e39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -222,4 +222,12 @@ object ScalarSqlFunctions {
     OperandTypes.STRING,
     SqlFunctionCategory.STRING)
 
+  val REPEAT = new SqlFunction(
+    "REPEAT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
+    SqlFunctionCategory.STRING)
+
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index d92af7a..1aadf31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,7 @@ import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.commons.codec.binary.{Base64, Hex}
+import org.apache.commons.lang3.StringUtils
 
 import scala.annotation.varargs
 
@@ -227,4 +228,10 @@ object ScalarFunctions {
     * Returns an UUID string using Java utilities.
     */
   def uuid(): String = java.util.UUID.randomUUID().toString
+
+  /**
+    * Returns a string that repeats the base string n times.
+    */
+  def repeat(base: String, n: Int): String = StringUtils.repeat(base, n)
+
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 67b8a0e..fe508aa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -207,6 +207,7 @@ object FunctionCatalog {
     "uuid" -> classOf[UUID],
     "ltrim" -> classOf[LTrim],
     "rtrim" -> classOf[RTrim],
+    "repeat" -> classOf[Repeat],
 
     // math functions
     "plus" -> classOf[Plus],
@@ -459,6 +460,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.UUID,
     ScalarSqlFunctions.LTRIM,
     ScalarSqlFunctions.RTRIM,
+    ScalarSqlFunctions.REPEAT,
 
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 27b8afb..5038254 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -680,6 +680,45 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "null")
   }
 
+  @Test
+  def testRepeat(): Unit = {
+    testAllApis(
+      'f0.repeat(1),
+      "f0.repeat(1)",
+      "REPEAT(f0, 1)",
+      "This is a test String.")
+
+    testAllApis(
+      'f0.repeat(2),
+      "f0.repeat(2)",
+      "REPEAT(f0, 2)",
+      "This is a test String.This is a test String.")
+
+    testAllApis(
+      'f0.repeat(0),
+      "f0.repeat(0)",
+      "REPEAT(f0, 0)",
+      "")
+
+    testAllApis(
+      'f0.repeat(-1),
+      "f0.repeat(-1)",
+      "REPEAT(f0, -1)",
+      "")
+
+    testAllApis(
+      'f33.repeat(2),
+      "f33.repeat(2)",
+      "REPEAT(f33, 2)",
+      "null")
+
+    testAllApis(
+      "".repeat(1),
+      "''.repeat(1)",
+      "REPEAT('', 2)",
+      "")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Math functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 94fabbe..4a79a61 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -151,6 +151,9 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("RPAD('hi',4,'??')", "hi??")
     testSqlApi("FROM_BASE64('aGVsbG8gd29ybGQ=')", "hello world")
     testSqlApi("TO_BASE64('hello world')", "aGVsbG8gd29ybGQ=")
+    testSqlApi(
+      "REPEAT('This is a test String.', 2)",
+      "This is a test String.This is a test String.")
   }
 
   @Test