You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/08/25 01:38:22 UTC
[flink] branch master updated: [FLINK-10136] [table] Add REPEAT function in Table API and SQL
This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 505dca1 [FLINK-10136] [table] Add REPEAT function in Table API and SQL
505dca1 is described below
commit 505dca174128ebb3bf765778ee36d58f680d6a1e
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Aug 21 20:35:23 2018 +0800
[FLINK-10136] [table] Add REPEAT function in Table API and SQL
This closes #6597.
---
docs/dev/table/functions.md | 36 ++++++++++++++++++++
.../flink/table/api/scala/expressionDsl.scala | 5 +++
.../flink/table/codegen/calls/BuiltInMethods.scala | 6 ++++
.../table/codegen/calls/FunctionGenerator.scala | 6 ++++
.../table/expressions/stringExpressions.scala | 28 ++++++++++++++++
.../table/functions/sql/ScalarSqlFunctions.scala | 8 +++++
.../table/runtime/functions/ScalarFunctions.scala | 7 ++++
.../flink/table/validate/FunctionCatalog.scala | 2 ++
.../table/expressions/ScalarFunctionsTest.scala | 39 ++++++++++++++++++++++
.../table/expressions/SqlExpressionTest.scala | 3 ++
10 files changed, 140 insertions(+)
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 24d8d70..e41fe45 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2404,6 +2404,18 @@ RTRIM(string)
<tr>
<td>
{% highlight text %}
+REPEAT(string, integer)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a string that repeats the base <i>string</i> <i>integer</i> times.</p>
+ <p>E.g., <code>REPEAT('This is a test String.', 2)</code> returns "This is a test String.This is a test String.".</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ])
{% endhighlight %}
</td>
@@ -2614,6 +2626,18 @@ STRING.rtrim()
<p>E.g., <code>'This is a test String. '.rtrim()</code> returns "This is a test String.".</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a string that repeats the base <i>STRING</i> <i>INT</i> times.</p>
+ <p>E.g., <code>'This is a test String.'.repeat(2)</code> returns "This is a test String.This is a test String.".</p>
+ </td>
+ </tr>
<tr>
<td>
@@ -2833,6 +2857,18 @@ STRING.rtrim()
<tr>
<td>
{% highlight scala %}
+STRING.repeat(INT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a string that repeats the base <i>STRING</i> <i>INT</i> times.</p>
+ <p>E.g., <code>"This is a test String.".repeat(2)</code> returns "This is a test String.This is a test String.".</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
STRING1.overlay(STRING2, INT1)
STRING1.overlay(STRING2, INT1, INT2)
{% endhighlight %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 429c37f..cf8cd91 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -584,6 +584,11 @@ trait ImplicitExpressionOperations {
*/
def rtrim() = RTrim(expr)
+ /**
+ * Returns a string that repeats the base string n times.
+ */
+ def repeat(n: Expression) = Repeat(expr, n)
+
// Temporal operations
/**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 7eb91d3..899cb0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -141,4 +141,10 @@ object BuiltInMethods {
val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[String])
val UUID: Method = Types.lookupMethod(classOf[ScalarFunctions], "uuid")
+
+ val REPEAT: Method = Types.lookupMethod(
+ classOf[ScalarFunctions],
+ "repeat",
+ classOf[String],
+ classOf[Int])
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7c328c9..c7eb869 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -176,6 +176,12 @@ object FunctionGenerator {
STRING_TYPE_INFO,
BuiltInMethod.RTRIM.method)
+ addSqlFunctionMethod(
+ REPEAT,
+ Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethods.REPEAT)
+
// ----------------------------------------------------------------------------------------------
// Arithmetic functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index b2d7a3d..7079440 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -459,3 +459,31 @@ case class RTrim(child: Expression) extends UnaryExpression with InputTypeSpec {
override def toString = s"($child).rtrim"
}
+
+/**
+ * Returns a string that repeats the base string n times.
+ */
+case class Repeat(str: Expression, n: Expression) extends Expression with InputTypeSpec {
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+ Seq(STRING_TYPE_INFO, INT_TYPE_INFO)
+
+ override private[flink] def children: Seq[Expression] = Seq(str, n)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(ScalarSqlFunctions.REPEAT, str.toRexNode, n.toRexNode)
+ }
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (str.resultType == STRING_TYPE_INFO && n.resultType == INT_TYPE_INFO) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(s"Repeat operator requires (String, Int) input, " +
+ s"but ($str, $n) is of type (${str.resultType}, ${n.resultType})")
+ }
+ }
+
+ override def toString: String = s"($str).repeat($n)"
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index d419000..db67e39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -222,4 +222,12 @@ object ScalarSqlFunctions {
OperandTypes.STRING,
SqlFunctionCategory.STRING)
+ val REPEAT = new SqlFunction(
+ "REPEAT",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+ InferTypes.RETURN_TYPE,
+ OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER),
+ SqlFunctionCategory.STRING)
+
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index d92af7a..1aadf31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,7 @@ import java.lang.{StringBuilder, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import org.apache.commons.codec.binary.{Base64, Hex}
+import org.apache.commons.lang3.StringUtils
import scala.annotation.varargs
@@ -227,4 +228,10 @@ object ScalarFunctions {
* Returns an UUID string using Java utilities.
*/
def uuid(): String = java.util.UUID.randomUUID().toString
+
+ /**
+ * Returns a string that repeats the base string n times.
+ */
+ def repeat(base: String, n: Int): String = StringUtils.repeat(base, n)
+
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 67b8a0e..fe508aa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -207,6 +207,7 @@ object FunctionCatalog {
"uuid" -> classOf[UUID],
"ltrim" -> classOf[LTrim],
"rtrim" -> classOf[RTrim],
+ "repeat" -> classOf[Repeat],
// math functions
"plus" -> classOf[Plus],
@@ -459,6 +460,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
ScalarSqlFunctions.UUID,
ScalarSqlFunctions.LTRIM,
ScalarSqlFunctions.RTRIM,
+ ScalarSqlFunctions.REPEAT,
// EXTENSIONS
BasicOperatorTable.TUMBLE,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 27b8afb..5038254 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -680,6 +680,45 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"null")
}
+ @Test
+ def testRepeat(): Unit = {
+ testAllApis(
+ 'f0.repeat(1),
+ "f0.repeat(1)",
+ "REPEAT(f0, 1)",
+ "This is a test String.")
+
+ testAllApis(
+ 'f0.repeat(2),
+ "f0.repeat(2)",
+ "REPEAT(f0, 2)",
+ "This is a test String.This is a test String.")
+
+ testAllApis(
+ 'f0.repeat(0),
+ "f0.repeat(0)",
+ "REPEAT(f0, 0)",
+ "")
+
+ testAllApis(
+ 'f0.repeat(-1),
+ "f0.repeat(-1)",
+ "REPEAT(f0, -1)",
+ "")
+
+ testAllApis(
+ 'f33.repeat(2),
+ "f33.repeat(2)",
+ "REPEAT(f33, 2)",
+ "null")
+
+ testAllApis(
+ "".repeat(1),
+ "''.repeat(1)",
+ "REPEAT('', 2)",
+ "")
+ }
+
// ----------------------------------------------------------------------------------------------
// Math functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 94fabbe..4a79a61 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -151,6 +151,9 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("RPAD('hi',4,'??')", "hi??")
testSqlApi("FROM_BASE64('aGVsbG8gd29ybGQ=')", "hello world")
testSqlApi("TO_BASE64('hello world')", "aGVsbG8gd29ybGQ=")
+ testSqlApi(
+ "REPEAT('This is a test String.', 2)",
+ "This is a test String.This is a test String.")
}
@Test