You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xc...@apache.org on 2018/07/26 08:35:50 UTC

[flink] branch master updated: [FLINK-9916] Add FROM_BASE64 function for table/sql API

This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 724857c  [FLINK-9916] Add FROM_BASE64 function for table/sql API
724857c is described below

commit 724857c3636e923037ea02131b05e03a8c7ca520
Author: yanghua <ya...@gmail.com>
AuthorDate: Mon Jul 23 23:38:23 2018 +0800

    [FLINK-9916] Add FROM_BASE64 function for table/sql API
    
    This closes #6397.
---
 docs/dev/table/sql.md                              | 10 ++++++++
 docs/dev/table/tableApi.md                         | 11 +++++++++
 .../flink/table/api/scala/expressionDsl.scala      |  5 ++++
 .../flink/table/codegen/calls/BuiltInMethods.scala |  2 ++
 .../table/codegen/calls/FunctionGenerator.scala    |  6 +++++
 .../table/expressions/stringExpressions.scala      | 28 +++++++++++++++++++++-
 .../table/functions/sql/ScalarSqlFunctions.scala   | 11 +++++++++
 .../table/runtime/functions/ScalarFunctions.scala  |  8 +++++++
 .../flink/table/validate/FunctionCatalog.scala     |  2 ++
 .../table/expressions/ScalarFunctionsTest.scala    | 22 +++++++++++++++++
 .../expressions/utils/ScalarTypesTestBase.scala    |  6 +++--
 11 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 5b83e9d..1ed06f0 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1828,6 +1828,16 @@ RPAD(text string, len integer, pad string)
         <p>Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code>, <code>RPAD('hi',1,'??')</code> returns <code>h</code>.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+FROM_BASE64(text string)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the base64-decoded result of text; returns NULL if text is NULL. E.g. <code>FROM_BASE64('aGVsbG8gd29ybGQ=')</code> returns <code>hello world</code>.</p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 2f65145..b1b8f60 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2463,6 +2463,17 @@ STRING.rpad(len INT, pad STRING)
         <p>Returns a string right-padded with the given pad string to a length of len characters. If the string is longer than len, the return value is shortened to len characters. E.g. "hi".rpad(4, '??') returns "hi??",  "hi".rpad(1, '??') returns "h".</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.fromBase64()
+{% endhighlight %}
+      </td>
+
+      <td>
+        <p>Returns the base64-decoded result of the string; returns null if the string is null. E.g. "aGVsbG8gd29ybGQ=".fromBase64() returns "hello world".</p>
+      </td>
+    </tr>
 
     <tr>
       <td>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index b0bc5b6..62c62b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -539,6 +539,11 @@ trait ImplicitExpressionOperations {
   def overlay(newString: Expression, starting: Expression, length: Expression) =
     Overlay(expr, newString, starting, length)
 
+  /**
+    * Returns this string decoded from base64, as a FromBase64 expression over this expression.
+    */
+  def fromBase64() = FromBase64(expr)
+
   // Temporal operations
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index ea4f0fd..22298da 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -110,4 +110,6 @@ object BuiltInMethods {
     classOf[String])
 
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
+
+  val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index bd75617..d264cce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -146,6 +146,12 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethod.OVERLAY.method)
 
+  addSqlFunctionMethod(
+    FROM_BASE64,
+    Seq(STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.FROMBASE64)
+
   // ----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index e69485f..87d251d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -61,7 +61,7 @@ case class InitCap(child: Expression) extends UnaryExpression {
     if (child.resultType == STRING_TYPE_INFO) {
       ValidationSuccess
     } else {
-      ValidationFailure(s"InitCap operator requires String input, " + 
+      ValidationFailure(s"InitCap operator requires String input, " +
         s"but $child is of type ${child.resultType}")
     }
   }
@@ -357,3 +357,29 @@ case class Rpad(text: Expression, len: Expression, pad: Expression)
     relBuilder.call(ScalarSqlFunctions.RPAD, children.map(_.toRexNode))
   }
 }
+
+/**
+  * Expression that decodes its string child from base64 (FROM_BASE64 in SQL).
+  * Returns NULL if the input string is NULL.
+  */
+case class FromBase64(child: Expression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"FromBase64 operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.FROM_BASE64, child.toRexNode)
+  }
+
+  override def toString: String = s"($child).fromBase64"
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 69f430b..1af1e68 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -157,4 +157,15 @@ object ScalarSqlFunctions {
     OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING),
     SqlFunctionCategory.TIMEDATE
   )
+
+  val FROM_BASE64 = new SqlFunction(
+    "FROM_BASE64",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.STRING),
+    SqlFunctionCategory.STRING
+  )
+
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 2e7c9f6..40f1ec3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,8 @@ import scala.annotation.varargs
 import java.math.{BigDecimal => JBigDecimal}
 import java.lang.StringBuilder
 
+import org.apache.commons.codec.binary.Base64
+
 /**
   * Built-in scalar runtime functions.
   */
@@ -182,4 +184,10 @@ object ScalarFunctions {
 
     new String(data)
   }
+
+  /** Returns base64 `str` decoded to text; bytes are read as UTF-8, not the JVM default. */
+  def fromBase64(str: String): String = {
+    new String(Base64.decodeBase64(str), java.nio.charset.StandardCharsets.UTF_8)
+  }
+
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 3184e00..b4f0424 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -202,6 +202,7 @@ object FunctionCatalog {
     "concat_ws" -> classOf[ConcatWs],
     "lpad" -> classOf[Lpad],
     "rpad" -> classOf[Rpad],
+    "fromBase64" -> classOf[FromBase64],
 
     // math functions
     "plus" -> classOf[Plus],
@@ -443,6 +444,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.SHA384,
     ScalarSqlFunctions.SHA512,
     ScalarSqlFunctions.SHA2,
+    ScalarSqlFunctions.FROM_BASE64,
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
     BasicOperatorTable.HOP,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 8b0c380..995762a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -450,6 +450,28 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "1111111111111111111111111111111111111111111111111111111111111111")
   }
 
+  @Test
+  def testFromBase64(): Unit = {
+    testAllApis(
+      'f35.fromBase64(),
+      "f35.fromBase64()",
+      "from_base64(f35)",
+      "hello world")
+
+    testAllApis(
+      'f35.fromBase64(),
+      "f35.fromBase64()",
+      "FROM_BASE64(f35)",
+      "hello world")
+
+    // a null input field (f33) must yield null
+    testAllApis(
+      'f33.fromBase64(),
+      "f33.fromBase64()",
+      "FROM_BASE64(f33)",
+      "null")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Math functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
index 2a90a90..6ad59b1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
 class ScalarTypesTestBase extends ExpressionTestBase {
 
   def testData: Row = {
-    val testData = new Row(35)
+    val testData = new Row(36)
     testData.setField(0, "This is a test String.")
     testData.setField(1, true)
     testData.setField(2, 42.toByte)
@@ -64,6 +64,7 @@ class ScalarTypesTestBase extends ExpressionTestBase {
     testData.setField(32, -1)
     testData.setField(33, null)
     testData.setField(34, 256)
+    testData.setField(35, "aGVsbG8gd29ybGQ=")
     testData
   }
 
@@ -103,6 +104,7 @@ class ScalarTypesTestBase extends ExpressionTestBase {
       Types.DECIMAL,
       Types.INT,
       Types.STRING,
-      Types.INT).asInstanceOf[TypeInformation[Any]]
+      Types.INT,
+      Types.STRING).asInstanceOf[TypeInformation[Any]]
   }
 }