You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/07/26 08:36:56 UTC

[GitHub] xccui closed pull request #6397: [FLINK-9916] Add FROM_BASE64 function for table/sql API

xccui closed pull request #6397: [FLINK-9916] Add FROM_BASE64 function for table/sql API
URL: https://github.com/apache/flink/pull/6397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 57e0ba5b781..0097b413809 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1828,6 +1828,16 @@ RPAD(text string, len integer, pad string)
         <p>Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code>, <code>RPAD('hi',1,'??')</code> returns <code>h</code>.</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+FROM_BASE64(text string)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the base64-decoded result of string text; returns null if text is null. E.g. <code>FROM_BASE64('aGVsbG8gd29ybGQ=')</code> returns <code>hello world</code>.</p>
+      </td>
+    </tr>
 
   </tbody>
 </table>
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 2f651454903..b1b8f6082d8 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2463,6 +2463,17 @@ STRING.rpad(len INT, pad STRING)
         <p>Returns a string right-padded with the given pad string to a length of len characters. If the string is longer than len, the return value is shortened to len characters. E.g. "hi".rpad(4, '??') returns "hi??",  "hi".rpad(1, '??') returns "h".</p>
       </td>
     </tr>
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.fromBase64()
+{% endhighlight %}
+      </td>
+
+      <td>
+        <p>Returns the base64-decoded result of the string; returns null if the string is null. E.g. "aGVsbG8gd29ybGQ=".fromBase64() returns "hello world".</p>
+      </td>
+    </tr>
 
     <tr>
       <td>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index b0bc5b66c33..62c62b13296 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -539,6 +539,11 @@ trait ImplicitExpressionOperations {
   def overlay(newString: Expression, starting: Expression, length: Expression) =
     Overlay(expr, newString, starting, length)
 
+  /**
+    * Decodes this base64-encoded string expression; returns null if the input is null.
+    */
+  def fromBase64() = FromBase64(expr)
+
   // Temporal operations
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index ea4f0fd2f92..22298dadfd7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -110,4 +110,6 @@ object BuiltInMethods {
     classOf[String])
 
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
+  // Reflective handle to ScalarFunctions.fromBase64(String), the runtime body of FROM_BASE64.
+  val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index bd75617f474..fd8f8f69bea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -146,6 +146,13 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethod.OVERLAY.method)
 
+  // FROM_BASE64(STRING) -> STRING, dispatched to ScalarFunctions.fromBase64 at runtime
+  addSqlFunctionMethod(
+    FROM_BASE64,
+    Seq(STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.FROMBASE64)
+
   // ----------------------------------------------------------------------------------------------
   // Arithmetic functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index e69485f0ad9..a242e7c4f4e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -357,3 +357,25 @@ case class Rpad(text: Expression, len: Expression, pad: Expression)
     relBuilder.call(ScalarSqlFunctions.RPAD, children.map(_.toRexNode))
   }
 }
+
+/**
+  * Decodes a base64-encoded STRING operand into a STRING; backs `fromBase64()` and FROM_BASE64.
+  */
+case class FromBase64(child: Expression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
+
+  override private[flink] def validateInput(): ValidationResult = child.resultType match {
+    case STRING_TYPE_INFO => ValidationSuccess
+    case _ =>
+      ValidationFailure(s"FromBase64 operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    relBuilder.call(ScalarSqlFunctions.FROM_BASE64, children.map(_.toRexNode))
+
+  override def toString: String = s"($child).fromBase64"
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 69f430b399e..1af1e68b2d3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -157,4 +157,15 @@ object ScalarSqlFunctions {
     OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", OperandTypes.DATETIME, OperandTypes.STRING),
     SqlFunctionCategory.TIMEDATE
   )
+
+  // FROM_BASE64(string) -> VARCHAR; TO_NULLABLE propagates operand nullability to the result.
+  val FROM_BASE64 = new SqlFunction(
+    "FROM_BASE64",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.STRING),
+    SqlFunctionCategory.STRING
+  )
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 2e7c9f6825f..40f1ec3b746 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -21,6 +21,8 @@ import scala.annotation.varargs
 import java.math.{BigDecimal => JBigDecimal}
 import java.lang.StringBuilder
 
+import org.apache.commons.codec.binary.Base64
+
 /**
   * Built-in scalar runtime functions.
   */
@@ -182,4 +184,10 @@ object ScalarFunctions {
 
     new String(data)
   }
+
+  /**
+    * Returns the string decoded from base64; decoded bytes are always interpreted as UTF-8.
+    */
+  def fromBase64(str: String): String =
+    new String(Base64.decodeBase64(str), java.nio.charset.StandardCharsets.UTF_8)
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 3184e0001ea..b4f04240706 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -202,6 +202,7 @@ object FunctionCatalog {
     "concat_ws" -> classOf[ConcatWs],
     "lpad" -> classOf[Lpad],
     "rpad" -> classOf[Rpad],
+    "fromBase64" -> classOf[FromBase64],
 
     // math functions
     "plus" -> classOf[Plus],
@@ -443,6 +444,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.SHA384,
     ScalarSqlFunctions.SHA512,
     ScalarSqlFunctions.SHA2,
+    ScalarSqlFunctions.FROM_BASE64,
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
     BasicOperatorTable.HOP,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 8b0c3808f22..995762a6814 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -450,6 +450,28 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "1111111111111111111111111111111111111111111111111111111111111111")
   }
 
+  @Test
+  def testFromBase64(): Unit = {
+    testAllApis(
+      'f35.fromBase64(),
+      "f35.fromBase64()",
+      "FROM_BASE64(f35)",
+      "hello world")
+
+    // multi-byte UTF-8 payload (two CJK characters); also checks the lower-case SQL name
+    testAllApis(
+      "5L2g5aW9".fromBase64(),
+      "'5L2g5aW9'.fromBase64()",
+      "from_base64('5L2g5aW9')",
+      "\u4f60\u597d")
+
+    testAllApis(
+      'f33.fromBase64(),
+      "f33.fromBase64()",
+      "FROM_BASE64(f33)",
+      "null")
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Math functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
index 2a90a9046aa..6ad59b17b73 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.types.Row
 class ScalarTypesTestBase extends ExpressionTestBase {
 
   def testData: Row = {
-    val testData = new Row(35)
+    val testData = new Row(36)
     testData.setField(0, "This is a test String.")
     testData.setField(1, true)
     testData.setField(2, 42.toByte)
@@ -64,6 +64,7 @@ class ScalarTypesTestBase extends ExpressionTestBase {
     testData.setField(32, -1)
     testData.setField(33, null)
     testData.setField(34, 256)
+    testData.setField(35, "aGVsbG8gd29ybGQ=")
     testData
   }
 
@@ -103,6 +104,7 @@ class ScalarTypesTestBase extends ExpressionTestBase {
       Types.DECIMAL,
       Types.INT,
       Types.STRING,
-      Types.INT).asInstanceOf[TypeInformation[Any]]
+      Types.INT,
+      Types.STRING).asInstanceOf[TypeInformation[Any]]
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services