You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/07/26 08:35:50 UTC
[flink] branch master updated: [FLINK-9916] Add FROM_BASE64
function for table/sql API
This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 724857c [FLINK-9916] Add FROM_BASE64 function for table/sql API
724857c is described below
commit 724857c3636e923037ea02131b05e03a8c7ca520
Author: yanghua <ya...@gmail.com>
AuthorDate: Mon Jul 23 23:38:23 2018 +0800
[FLINK-9916] Add FROM_BASE64 function for table/sql API
This closes #6397.
---
docs/dev/table/sql.md | 10 ++++++++
docs/dev/table/tableApi.md | 11 +++++++++
.../flink/table/api/scala/expressionDsl.scala | 5 ++++
.../flink/table/codegen/calls/BuiltInMethods.scala | 2 ++
.../table/codegen/calls/FunctionGenerator.scala | 6 +++++
.../table/expressions/stringExpressions.scala | 28 +++++++++++++++++++++-
.../table/functions/sql/ScalarSqlFunctions.scala | 11 +++++++++
.../table/runtime/functions/ScalarFunctions.scala | 8 +++++++
.../flink/table/validate/FunctionCatalog.scala | 2 ++
.../table/expressions/ScalarFunctionsTest.scala | 22 +++++++++++++++++
.../expressions/utils/ScalarTypesTestBase.scala | 6 +++--
11 files changed, 108 insertions(+), 3 deletions(-)
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 5b83e9d..1ed06f0 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1828,6 +1828,16 @@ RPAD(text string, len integer, pad string)
<p>Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code>, <code>RPAD('hi',1,'??')</code> returns <code>h</code>.</p>
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight text %}
+FROM_BASE64(text string)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the base string decoded with base64, if text is NULL, returns NULL. E.g. <code>FROM_BASE64('aGVsbG8gd29ybGQ=')</code> returns <code>hello world</code>.</p>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 2f65145..b1b8f60 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2463,6 +2463,17 @@ STRING.rpad(len INT, pad STRING)
<p>Returns a string right-padded with the given pad string to a length of len characters. If the string is longer than len, the return value is shortened to len characters. E.g. "hi".rpad(4, '??') returns "hi??", "hi".rpad(1, '??') returns "h".</p>
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight java %}
+STRING.fromBase64()
+{% endhighlight %}
+ </td>
+
+ <td>
+ <p>Returns the base string decoded with base64, if string is null, returns null. E.g. "aGVsbG8gd29ybGQ=".fromBase64() returns "hello world".</p>
+ </td>
+ </tr>
<tr>
<td>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index b0bc5b6..62c62b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -539,6 +539,11 @@ trait ImplicitExpressionOperations {
def overlay(newString: Expression, starting: Expression, length: Expression) =
Overlay(expr, newString, starting, length)
+ /**
+ * Returns the base string decoded with base64.
+ */
+ def fromBase64() = FromBase64(expr)
+
// Temporal operations
/**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index ea4f0fd..22298da 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -110,4 +110,6 @@ object BuiltInMethods {
classOf[String])
val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
+
+ val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index bd75617..d264cce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -146,6 +146,12 @@ object FunctionGenerator {
STRING_TYPE_INFO,
BuiltInMethod.OVERLAY.method)
+ addSqlFunctionMethod(
+ FROM_BASE64,
+ Seq(STRING_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethods.FROMBASE64)
+
// ----------------------------------------------------------------------------------------------
// Arithmetic functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index e69485f..87d251d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -61,7 +61,7 @@ case class InitCap(child: Expression) extends UnaryExpression {
if (child.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
- ValidationFailure(s"InitCap operator requires String input, " +
+ ValidationFailure(s"InitCap operator requires String input, " +
s"but $child is of type ${child.resultType}")
}
}
@@ -357,3 +357,29 @@ case class Rpad(text: Expression, len: Expression, pad: Expression)
relBuilder.call(ScalarSqlFunctions.RPAD, children.map(_.toRexNode))
}
}
+
+/**
+ * Returns the base string decoded with base64.
+ * Returns NULL If the input string is NULL.
+ */
+case class FromBase64(child: Expression) extends UnaryExpression with InputTypeSpec {
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+ override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (child.resultType == STRING_TYPE_INFO) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(s"FromBase64 operator requires String input, " +
+ s"but $child is of type ${child.resultType}")
+ }
+ }
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(ScalarSqlFunctions.FROM_BASE64, child.toRexNode)
+ }
+
+ override def toString: String = s"($child).fromBase64"
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 69f430b..1af1e68 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -157,4 +157,15 @@ object ScalarSqlFunctions {
OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING),
SqlFunctionCategory.TIMEDATE
)
+
+ val FROM_BASE64 = new SqlFunction(
+ "FROM_BASE64",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.cascade(
+ ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+ InferTypes.RETURN_TYPE,
+ OperandTypes.family(SqlTypeFamily.STRING),
+ SqlFunctionCategory.STRING
+ )
+
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 2e7c9f6..40f1ec3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,8 @@ import scala.annotation.varargs
import java.math.{BigDecimal => JBigDecimal}
import java.lang.StringBuilder
+import org.apache.commons.codec.binary.Base64
+
/**
* Built-in scalar runtime functions.
*/
@@ -182,4 +184,10 @@ object ScalarFunctions {
new String(data)
}
+
+ /**
+ * Returns the base string decoded with base64.
+ */
+ def fromBase64(str: String): String = new String(Base64.decodeBase64(str))
+
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 3184e00..b4f0424 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -202,6 +202,7 @@ object FunctionCatalog {
"concat_ws" -> classOf[ConcatWs],
"lpad" -> classOf[Lpad],
"rpad" -> classOf[Rpad],
+ "fromBase64" -> classOf[FromBase64],
// math functions
"plus" -> classOf[Plus],
@@ -443,6 +444,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
ScalarSqlFunctions.SHA384,
ScalarSqlFunctions.SHA512,
ScalarSqlFunctions.SHA2,
+ ScalarSqlFunctions.FROM_BASE64,
// EXTENSIONS
BasicOperatorTable.TUMBLE,
BasicOperatorTable.HOP,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 8b0c380..995762a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -450,6 +450,28 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
"1111111111111111111111111111111111111111111111111111111111111111")
}
+ @Test
+ def testFromBase64(): Unit = {
+ testAllApis(
+ 'f35.fromBase64(),
+ "f35.fromBase64()",
+ "from_base64(f35)",
+ "hello world")
+
+ testAllApis(
+ 'f35.fromBase64(),
+ "f35.fromBase64()",
+ "FROM_BASE64(f35)",
+ "hello world")
+
+ //null test
+ testAllApis(
+ 'f33.fromBase64(),
+ "f33.fromBase64()",
+ "FROM_BASE64(f33)",
+ "null")
+ }
+
// ----------------------------------------------------------------------------------------------
// Math functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
index 2a90a90..6ad59b1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
class ScalarTypesTestBase extends ExpressionTestBase {
def testData: Row = {
- val testData = new Row(35)
+ val testData = new Row(36)
testData.setField(0, "This is a test String.")
testData.setField(1, true)
testData.setField(2, 42.toByte)
@@ -64,6 +64,7 @@ class ScalarTypesTestBase extends ExpressionTestBase {
testData.setField(32, -1)
testData.setField(33, null)
testData.setField(34, 256)
+ testData.setField(35, "aGVsbG8gd29ybGQ=")
testData
}
@@ -103,6 +104,7 @@ class ScalarTypesTestBase extends ExpressionTestBase {
Types.DECIMAL,
Types.INT,
Types.STRING,
- Types.INT).asInstanceOf[TypeInformation[Any]]
+ Types.INT,
+ Types.STRING).asInstanceOf[TypeInformation[Any]]
}
}