You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/14 09:42:07 UTC
[flink] 01/02: [FLINK-9853] [table] Add HEX support for Table API &
SQL
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ab1d1dfb2ad872748833896d552e2d56a26a9a92
Author: xueyu <27...@qq.com>
AuthorDate: Sun Jul 15 20:01:15 2018 +0800
[FLINK-9853] [table] Add HEX support for Table API & SQL
This closes #6337.
---
docs/dev/table/functions.md | 39 +++++++++
.../flink/table/api/scala/expressionDsl.scala | 9 +++
.../flink/table/codegen/calls/BuiltInMethods.scala | 4 +
.../table/codegen/calls/FunctionGenerator.scala | 12 +++
.../flink/table/expressions/mathExpressions.scala | 18 +++++
.../table/functions/sql/ScalarSqlFunctions.scala | 9 +++
.../table/runtime/functions/ScalarFunctions.scala | 16 +++-
.../flink/table/validate/FunctionCatalog.scala | 2 +
.../table/expressions/ScalarFunctionsTest.scala | 93 ++++++++++++++++++++++
9 files changed, 199 insertions(+), 3 deletions(-)
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 00a9605..f637b4d 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -1393,6 +1393,19 @@ BIN(integer)
<p>E.g. <code>BIN(4)</code> returns '100' and <code>BIN(12)</code> returns '1100'.</p>
</td>
</tr>
+
+ <tr>
+ <td>
+{% highlight text %}
+HEX(numeric)
+HEX(string)
+ {% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a string representation of an integer <i>numeric</i> value or a <i>string</i> in hex format. Returns NULL if the argument is NULL.</p>
+ <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p>
+ </td>
+ </tr>
</tbody>
</table>
</div>
@@ -1805,6 +1818,19 @@ INTEGER.bin()
<p>E.g., <code>4.bin()</code> returns "100" and <code>12.bin()</code> returns "1100".</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+NUMERIC.hex()
+STRING.hex()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a string representation of an integer <i>NUMERIC</i> value or a <i>STRING</i> in hex format. Returns NULL if the argument is NULL.</p>
+ <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p>
+ </td>
+ </tr>
</tbody>
</table>
</div>
@@ -2217,6 +2243,19 @@ INTEGER.bin()
<p>E.g., <code>4.bin()</code> returns "100" and <code>12.bin()</code> returns "1100".</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+NUMERIC.hex()
+STRING.hex()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns a string representation of an integer <i>NUMERIC</i> value or a <i>STRING</i> in hex format. Returns NULL if the argument is NULL.</p>
+ <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", a string "hello,world" leads to "68656C6C6F2C776F726C64".</p>
+ </td>
+ </tr>
</tbody>
</table>
</div>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index d1bb06c..66e7544 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -406,6 +406,15 @@ trait ImplicitExpressionOperations {
*/
def bin() = Bin(expr)
+ /**
+ * Returns a string representation of an integer numeric value or a string in hex format. Returns
+ * null if numeric or string is null.
+ *
+ * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads
+ * to "68656c6c6f2c776f726c64".
+ */
+ def hex() = Hex(expr)
+
// String operations
/**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index f5ed9b3..942666a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.codegen.calls
+import java.lang.reflect.Method
import java.lang.{Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
@@ -135,4 +136,7 @@ object BuiltInMethods {
val FROMBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "fromBase64", classOf[String])
val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
+
+ val HEX_LONG: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[Long])
+ val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[String])
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 74b69d6..fd71126 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -455,6 +455,18 @@ object FunctionGenerator {
STRING_TYPE_INFO,
BuiltInMethods.BIN)
+ addSqlFunctionMethod(
+ ScalarSqlFunctions.HEX,
+ Seq(LONG_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethods.HEX_LONG)
+
+ addSqlFunctionMethod(
+ ScalarSqlFunctions.HEX,
+ Seq(STRING_TYPE_INFO),
+ STRING_TYPE_INFO,
+ BuiltInMethods.HEX_STRING)
+
// ----------------------------------------------------------------------------------------------
// Temporal functions
// ----------------------------------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index cf3efa9..13e005e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -422,3 +422,21 @@ case class Bin(child: Expression) extends UnaryExpression {
relBuilder.call(ScalarSqlFunctions.BIN, child.toRexNode)
}
}
+
+case class Hex(child: Expression) extends UnaryExpression {
+ override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (TypeCheckUtils.isIntegerFamily(child.resultType) ||
+ TypeCheckUtils.isString(child.resultType)) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(s"hex() requires an integer or string input but was '${child.resultType}'.")
+ }
+ }
+ override def toString: String = s"hex($child)"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(ScalarSqlFunctions.HEX, child.toRexNode)
+ }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 21793e3..a0b6c9c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -42,6 +42,15 @@ object ScalarSqlFunctions {
OperandTypes.family(SqlTypeFamily.INTEGER),
SqlFunctionCategory.NUMERIC)
+ val HEX = new SqlFunction(
+ "HEX",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+ InferTypes.RETURN_TYPE,
+ OperandTypes.or(OperandTypes.family(SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.STRING)),
+ SqlFunctionCategory.NUMERIC)
+
val CONCAT = new SqlFunction(
"CONCAT",
SqlKind.OTHER_FUNCTION,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 03ee62c..1881874 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -17,11 +17,12 @@
*/
package org.apache.flink.table.runtime.functions
-import scala.annotation.varargs
+import java.lang.{StringBuilder, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
-import java.lang.StringBuilder
-import org.apache.commons.codec.binary.Base64
+import org.apache.commons.codec.binary.{Base64, Hex}
+
+import scala.annotation.varargs
/**
* Built-in scalar runtime functions.
@@ -212,4 +213,13 @@ object ScalarFunctions {
*/
def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes())
+ /**
+ * Returns the hex string of a long argument.
+ */
+ def hex(x: Long): String = JLong.toHexString(x).toUpperCase()
+
+ /**
+ * Returns the hex string of a string argument.
+ */
+ def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase()
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index bca156c..a446401 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -239,6 +239,7 @@ object FunctionCatalog {
"rand" -> classOf[Rand],
"randInteger" -> classOf[RandInteger],
"bin" -> classOf[Bin],
+ "hex" -> classOf[Hex],
// temporal functions
"extract" -> classOf[Extract],
@@ -438,6 +439,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
ScalarSqlFunctions.CONCAT,
ScalarSqlFunctions.CONCAT_WS,
ScalarSqlFunctions.BIN,
+ ScalarSqlFunctions.HEX,
SqlStdOperatorTable.TIMESTAMP_ADD,
ScalarSqlFunctions.LOG,
ScalarSqlFunctions.LPAD,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 6f9a9ae..8e85b34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -393,6 +393,99 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
}
@Test
+ def testHex(): Unit = {
+ testAllApis(
+ 100.hex(),
+ "100.hex()",
+ "HEX(100)",
+ "64")
+
+ testAllApis(
+ 'f2.hex(),
+ "f2.hex()",
+ "HEX(f2)",
+ "2A")
+
+ testAllApis(
+ Null(Types.BYTE).hex(),
+ "hex(Null(BYTE))",
+ "HEX(CAST(NULL AS TINYINT))",
+ "null")
+
+ testAllApis(
+ 'f3.hex(),
+ "f3.hex()",
+ "HEX(f3)",
+ "2B")
+
+ testAllApis(
+ 'f4.hex(),
+ "f4.hex()",
+ "HEX(f4)",
+ "2C")
+
+ testAllApis(
+ 'f7.hex(),
+ "f7.hex()",
+ "HEX(f7)",
+ "3")
+
+ testAllApis(
+ 12.hex(),
+ "12.hex()",
+ "HEX(12)",
+ "C")
+
+ testAllApis(
+ 10.hex(),
+ "10.hex()",
+ "HEX(10)",
+ "A")
+
+ testAllApis(
+ 0.hex(),
+ "0.hex()",
+ "HEX(0)",
+ "0")
+
+ testAllApis(
+ "ö".hex(),
+ "'ö'.hex()",
+ "HEX('ö')",
+ "C3B6")
+
+ testAllApis(
+ 'f32.hex(),
+ "f32.hex()",
+ "HEX(f32)",
+ "FFFFFFFFFFFFFFFF")
+
+ testAllApis(
+ 'f0.hex(),
+ "f0.hex()",
+ "HEX(f0)",
+ "546869732069732061207465737420537472696E672E")
+
+ testAllApis(
+ 'f8.hex(),
+ "f8.hex()",
+ "HEX(f8)",
+ "20546869732069732061207465737420537472696E672E20")
+
+ testAllApis(
+ 'f23.hex(),
+ "f23.hex()",
+ "HEX(f23)",
+ "25546869732069732061207465737420537472696E672E")
+
+ testAllApis(
+ 'f24.hex(),
+ "f24.hex()",
+ "HEX(f24)",
+ "2A5F546869732069732061207465737420537472696E672E")
+ }
+
+ @Test
def testBin(): Unit = {
testAllApis(