You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/14 09:42:07 UTC

[flink] 01/02: [FLINK-9853] [table] Add HEX support for Table API & SQL

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab1d1dfb2ad872748833896d552e2d56a26a9a92
Author: xueyu <27...@qq.com>
AuthorDate: Sun Jul 15 20:01:15 2018 +0800

    [FLINK-9853] [table] Add HEX support for Table API & SQL
    
    This closes #6337.
---
 docs/dev/table/functions.md                        | 39 +++++++++
 .../flink/table/api/scala/expressionDsl.scala      |  9 +++
 .../flink/table/codegen/calls/BuiltInMethods.scala |  4 +
 .../table/codegen/calls/FunctionGenerator.scala    | 12 +++
 .../flink/table/expressions/mathExpressions.scala  | 18 +++++
 .../table/functions/sql/ScalarSqlFunctions.scala   |  9 +++
 .../table/runtime/functions/ScalarFunctions.scala  | 16 +++-
 .../flink/table/validate/FunctionCatalog.scala     |  2 +
 .../table/expressions/ScalarFunctionsTest.scala    | 93 ++++++++++++++++++++++
 9 files changed, 199 insertions(+), 3 deletions(-)

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 00a9605..f637b4d 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -1393,6 +1393,19 @@ BIN(integer)
         <p>E.g. <code>BIN(4)</code> returns '100' and <code>BIN(12)</code> returns '1100'.</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+{% highlight text %}
+HEX(numeric)
+HEX(string)
+      {% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a string representation of an integer <i>numeric</i> value or a <i>string</i> in hex format. Returns NULL if the argument is NULL.</p>
+        <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p>
+      </td>
+    </tr>
   </tbody>
 </table>
 </div>
@@ -1805,6 +1818,19 @@ INTEGER.bin()
       <p>E.g., <code>4.bin()</code> returns "100" and <code>12.bin()</code> returns "1100".</p>
     </td>
    </tr>
+
+    <tr>
+      <td>
+       {% highlight java %}
+NUMERIC.hex()
+STRING.hex()
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a string representation of an integer <i>NUMERIC</i> value or a <i>STRING</i> in hex format. Returns NULL if the argument is NULL.</p>
+      <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p>
+    </td>
+   </tr>
   </tbody>
 </table>
 </div>
@@ -2217,6 +2243,19 @@ INTEGER.bin()
       <p>E.g., <code>4.bin()</code> returns "100" and <code>12.bin()</code> returns "1100".</p>
     </td>
    </tr>
+
+    <tr>
+      <td>
+       {% highlight scala %}
+NUMERIC.hex()
+STRING.hex()
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a string representation of an integer <i>NUMERIC</i> value or a <i>STRING</i> in hex format. Returns NULL if the argument is NULL.</p>
+      <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p>
+    </td>
+   </tr>
   </tbody>
 </table>
 </div>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index d1bb06c..66e7544 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -406,6 +406,15 @@ trait ImplicitExpressionOperations {
     */
   def bin() = Bin(expr)
 
+  /**
+    * Returns a string representation of an integer numeric value or a string in upper-case hex
+    * format. Returns null if numeric or string is null.
+    *
+    * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads
+    * to "68656C6C6F2C776F726C64".
+    */
+  def hex() = Hex(expr)
+
   // String operations
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index f5ed9b3..942666a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.codegen.calls
 
+import java.lang.reflect.Method
 import java.lang.{Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 
@@ -135,4 +136,7 @@ object BuiltInMethods {
   val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
 
   val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
+
+  val HEX_LONG: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[Long])
+  val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[String])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 74b69d6..fd71126 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -455,6 +455,18 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethods.BIN)
 
+  addSqlFunctionMethod(
+    ScalarSqlFunctions.HEX,
+    Seq(LONG_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.HEX_LONG)
+
+  addSqlFunctionMethod(
+    ScalarSqlFunctions.HEX,
+    Seq(STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.HEX_STRING)
+
   // ----------------------------------------------------------------------------------------------
   // Temporal functions
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index cf3efa9..13e005e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -422,3 +422,21 @@ case class Bin(child: Expression) extends UnaryExpression {
     relBuilder.call(ScalarSqlFunctions.BIN, child.toRexNode)
   }
 }
+
+case class Hex(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeCheckUtils.isIntegerFamily(child.resultType) ||
+        TypeCheckUtils.isString(child.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"hex() requires an integer or string input but was '${child.resultType}'.")
+    }
+  }
+  override def toString: String = s"hex($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.HEX, child.toRexNode)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 21793e3..a0b6c9c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -42,6 +42,15 @@ object ScalarSqlFunctions {
     OperandTypes.family(SqlTypeFamily.INTEGER),
     SqlFunctionCategory.NUMERIC)
 
+  val HEX = new SqlFunction(
+    "HEX",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.or(OperandTypes.family(SqlTypeFamily.INTEGER),
+      OperandTypes.family(SqlTypeFamily.STRING)),
+    SqlFunctionCategory.NUMERIC)
+
   val CONCAT = new SqlFunction(
     "CONCAT",
     SqlKind.OTHER_FUNCTION,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 03ee62c..1881874 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -17,11 +17,12 @@
  */
 package org.apache.flink.table.runtime.functions
 
-import scala.annotation.varargs
+import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
-import java.lang.StringBuilder
 
-import org.apache.commons.codec.binary.Base64
+import org.apache.commons.codec.binary.{Base64, Hex}
+
+import scala.annotation.varargs
 
 /**
   * Built-in scalar runtime functions.
@@ -212,4 +213,13 @@ object ScalarFunctions {
     */
   def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes())
 
+  /**
+    * Returns the upper-case hex string of a long; negatives use the unsigned 64-bit form.
+    */
+  def hex(x: Long): String = JLong.toHexString(x).toUpperCase()
+
+  /**
+    * Returns the upper-case hex string of a string's UTF-8 bytes (independent of JVM default).
+    */
+  def hex(x: String): String = Hex.encodeHexString(x.getBytes("UTF-8")).toUpperCase()
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index bca156c..a446401 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -239,6 +239,7 @@ object FunctionCatalog {
     "rand" -> classOf[Rand],
     "randInteger" -> classOf[RandInteger],
     "bin" -> classOf[Bin],
+    "hex" -> classOf[Hex],
 
     // temporal functions
     "extract" -> classOf[Extract],
@@ -438,6 +439,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.CONCAT,
     ScalarSqlFunctions.CONCAT_WS,
     ScalarSqlFunctions.BIN,
+    ScalarSqlFunctions.HEX,
     SqlStdOperatorTable.TIMESTAMP_ADD,
     ScalarSqlFunctions.LOG,
     ScalarSqlFunctions.LPAD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 6f9a9ae..8e85b34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -393,6 +393,99 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testHex(): Unit = {
+    testAllApis(
+      100.hex(),
+      "100.hex()",
+      "HEX(100)",
+      "64")
+
+    testAllApis(
+      'f2.hex(),
+      "f2.hex()",
+      "HEX(f2)",
+      "2A")
+
+    testAllApis(
+      Null(Types.BYTE).hex(),
+      "hex(Null(BYTE))",
+      "HEX(CAST(NULL AS TINYINT))",
+      "null")
+
+    testAllApis(
+      'f3.hex(),
+      "f3.hex()",
+      "HEX(f3)",
+      "2B")
+
+    testAllApis(
+      'f4.hex(),
+      "f4.hex()",
+      "HEX(f4)",
+      "2C")
+
+    testAllApis(
+      'f7.hex(),
+      "f7.hex()",
+      "HEX(f7)",
+      "3")
+
+    testAllApis(
+      12.hex(),
+      "12.hex()",
+      "HEX(12)",
+      "C")
+
+    testAllApis(
+      10.hex(),
+      "10.hex()",
+      "HEX(10)",
+      "A")
+
+    testAllApis(
+      0.hex(),
+      "0.hex()",
+      "HEX(0)",
+      "0")
+
+    testAllApis(
+      "ö".hex(),
+      "'ö'.hex()",
+      "HEX('ö')",
+      "C3B6")
+
+    testAllApis(
+      'f32.hex(),
+      "f32.hex()",
+      "HEX(f32)",
+      "FFFFFFFFFFFFFFFF")
+
+    testAllApis(
+      'f0.hex(),
+      "f0.hex()",
+      "HEX(f0)",
+      "546869732069732061207465737420537472696E672E")
+
+    testAllApis(
+      'f8.hex(),
+      "f8.hex()",
+      "HEX(f8)",
+      "20546869732069732061207465737420537472696E672E20")
+
+    testAllApis(
+      'f23.hex(),
+      "f23.hex()",
+      "HEX(f23)",
+      "25546869732069732061207465737420537472696E672E")
+
+    testAllApis(
+      'f24.hex(),
+      "f24.hex()",
+      "HEX(f24)",
+      "2A5F546869732069732061207465737420537472696E672E")
+  }
+
+  @Test
   def testBin(): Unit = {
 
     testAllApis(