You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/12 15:37:00 UTC
[carbondata] branch master updated: [CARBONDATA-3410] Add UDF,
Hex/Base64 SQL functions for binary
This is an automated email from the ASF dual-hosted git repository.
kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c497142 [CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary
c497142 is described below
commit c4971422f283288491cf6e8eea65b35d3a6af091
Author: xubo245 <xu...@huawei.com>
AuthorDate: Fri May 31 20:33:25 2019 +0800
[CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary
Add UDF, Hex/Base64 SQL functions for binary
This closes #3253
---
.../testsuite/binary/TestBinaryDataType.scala | 32 +++++
.../SparkCarbonDataSourceBinaryTest.scala | 140 +++++++++++++--------
2 files changed, 117 insertions(+), 55 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 15e3ee9..1b73aba 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -65,6 +65,17 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
}
assert(flag)
+ sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray))
+ sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))
+
+ val udfHexResult = sql("SELECT decodeHex(binaryField) FROM binaryTable")
+ val unhexResult = sql("SELECT unhex(binaryField) FROM binaryTable")
+ checkAnswer(udfHexResult, unhexResult)
+
+ val udfBase64Result = sql("SELECT decodeBase64(binaryField) FROM binaryTable")
+ val unbase64Result = sql("SELECT unbase64(binaryField) FROM binaryTable")
+ checkAnswer(udfBase64Result, unbase64Result)
+
checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
try {
val df = sql("SELECT * FROM binaryTable").collect()
@@ -614,6 +625,27 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
| OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail')
""".stripMargin)
+ val hexHiveResult = sql("SELECT hex(binaryField) FROM hivetable")
+ val hexCarbonResult = sql("SELECT hex(binaryField) FROM carbontable")
+ checkAnswer(hexHiveResult, hexCarbonResult)
+ hexCarbonResult.collect().foreach { each =>
+ val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
+ assert("\u0001history\u0002".equals(result)
+ || "\u0001biology\u0002".equals(result)
+ || "\u0001education\u0002".equals(result))
+ }
+
+ val base64HiveResult = sql("SELECT base64(binaryField) FROM hivetable")
+ val base64CarbonResult = sql("SELECT base64(binaryField) FROM carbontable")
+ checkAnswer(base64HiveResult, base64CarbonResult)
+ base64CarbonResult.collect().foreach { each =>
+ val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
+ assert("\u0001history\u0002".equals(result)
+ || "\u0001biology\u0002".equals(result)
+ || "\u0001education\u0002".equals(result))
+ }
+
+
val hiveResult = sql("SELECT * FROM hivetable")
val carbonResult = sql("SELECT * FROM carbontable")
checkAnswer(hiveResult, carbonResult)
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
index bdfc9dd..d234576 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
@@ -17,16 +17,14 @@
package org.apache.spark.sql.carbondata.datasource
import java.io.File
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.util.BinaryUtil
+import org.apache.commons.codec.binary.{Base64, Hex}
import org.apache.commons.io.FileUtils
-
import org.apache.spark.sql.Row
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.util.SparkUtil
-
import org.scalatest.{BeforeAndAfterAll, FunSuite}
class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
@@ -257,9 +255,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
test("insert into for hive and carbon, CTAS") {
sql("DROP TABLE IF EXISTS hiveTable")
- sql("DROP TABLE IF EXISTS carbontable")
+ sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS hiveTable2")
- sql("DROP TABLE IF EXISTS carbontable2")
+ sql("DROP TABLE IF EXISTS carbon_table2")
sql(
s"""
| CREATE TABLE IF NOT EXISTS hivetable (
@@ -275,7 +273,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
sql(
s"""
- | CREATE TABLE IF NOT EXISTS carbontable (
+ | CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
@@ -283,9 +281,28 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
| autoLabel boolean)
| using carbon
""".stripMargin)
- sql("insert into carbontable values(1,true,'Bob','binary',false)")
- sql("insert into carbontable values(2,false,'Xu','test',true)")
- val carbonResult = sql("SELECT * FROM carbontable")
+ sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+ sql("insert into carbon_table values(2,false,'Xu','test',true)")
+
+ val hexHiveResult = sql("SELECT hex(image) FROM hivetable")
+ val hexCarbonResult = sql("SELECT hex(image) FROM carbon_table")
+ checkAnswer(hexHiveResult, hexCarbonResult)
+ hexCarbonResult.collect().foreach { each =>
+ val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
+ assert("binary".equals(result)
+ || "test".equals(result))
+ }
+
+ val base64HiveResult = sql("SELECT base64(image) FROM hivetable")
+ val base64CarbonResult = sql("SELECT base64(image) FROM carbon_table")
+ checkAnswer(base64HiveResult, base64CarbonResult)
+ base64CarbonResult.collect().foreach { each =>
+ val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
+ assert("binary".equals(result)
+ || "test".equals(result))
+ }
+
+ val carbonResult = sql("SELECT * FROM carbon_table")
val hiveResult = sql("SELECT * FROM hivetable")
assert(2 == carbonResult.collect().length)
@@ -301,9 +318,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
}
- sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
- sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM hivetable")
- val carbonResult2 = sql("SELECT * FROM carbontable2")
+ sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
+ sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable")
+ val carbonResult2 = sql("SELECT * FROM carbon_table2")
val hiveResult2 = sql("SELECT * FROM hivetable2")
checkAnswer(hiveResult2, carbonResult2)
checkAnswer(carbonResult, carbonResult2)
@@ -311,9 +328,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
assert(2 == carbonResult2.collect().length)
assert(2 == hiveResult2.collect().length)
- sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
- sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
- val carbonResult3 = sql("SELECT * FROM carbontable2")
+ sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
+ sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
+ val carbonResult3 = sql("SELECT * FROM carbon_table2")
val hiveResult3 = sql("SELECT * FROM hivetable2")
checkAnswer(carbonResult3, hiveResult3)
assert(4 == carbonResult3.collect().length)
@@ -322,9 +339,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
test("insert into for parquet and carbon, CTAS") {
sql("DROP TABLE IF EXISTS parquetTable")
- sql("DROP TABLE IF EXISTS carbontable")
+ sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS parquetTable2")
- sql("DROP TABLE IF EXISTS carbontable2")
+ sql("DROP TABLE IF EXISTS carbon_table2")
sql(
s"""
| CREATE TABLE IF NOT EXISTS parquettable (
@@ -340,7 +357,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
sql(
s"""
- | CREATE TABLE IF NOT EXISTS carbontable (
+ | CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
@@ -348,9 +365,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
| autoLabel boolean)
| using carbon
""".stripMargin)
- sql("insert into carbontable values(1,true,'Bob','binary',false)")
- sql("insert into carbontable values(2,false,'Xu','test',true)")
- val carbonResult = sql("SELECT * FROM carbontable")
+ sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+ sql("insert into carbon_table values(2,false,'Xu','test',true)")
+ val carbonResult = sql("SELECT * FROM carbon_table")
val parquetResult = sql("SELECT * FROM parquettable")
assert(2 == carbonResult.collect().length)
@@ -366,9 +383,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
}
- sql("CREATE TABLE parquettable2 AS SELECT * FROM carbontable")
- sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM parquettable")
- val carbonResult2 = sql("SELECT * FROM carbontable2")
+ sql("CREATE TABLE parquettable2 AS SELECT * FROM carbon_table")
+ sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM parquettable")
+ val carbonResult2 = sql("SELECT * FROM carbon_table2")
val parquetResult2 = sql("SELECT * FROM parquettable2")
checkAnswer(parquetResult2, carbonResult2)
checkAnswer(carbonResult, carbonResult2)
@@ -376,9 +393,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
assert(2 == carbonResult2.collect().length)
assert(2 == parquetResult2.collect().length)
- sql("INSERT INTO parquettable2 SELECT * FROM carbontable")
- sql("INSERT INTO carbontable2 SELECT * FROM parquettable")
- val carbonResult3 = sql("SELECT * FROM carbontable2")
+ sql("INSERT INTO parquettable2 SELECT * FROM carbon_table")
+ sql("INSERT INTO carbon_table2 SELECT * FROM parquettable")
+ val carbonResult3 = sql("SELECT * FROM carbon_table2")
val parquetResult3 = sql("SELECT * FROM parquettable2")
checkAnswer(carbonResult3, parquetResult3)
assert(4 == carbonResult3.collect().length)
@@ -387,9 +404,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
test("insert into carbon as select from hive after hive load data") {
sql("DROP TABLE IF EXISTS hiveTable")
- sql("DROP TABLE IF EXISTS carbontable")
+ sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS hiveTable2")
- sql("DROP TABLE IF EXISTS carbontable2")
+ sql("DROP TABLE IF EXISTS carbon_table2")
sql(
s"""
@@ -409,7 +426,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
sql(
s"""
- | CREATE TABLE IF NOT EXISTS carbontable (
+ | CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
@@ -417,8 +434,20 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
| autoLabel boolean)
| using carbon
""".stripMargin)
- sql("insert into carbontable select * from hivetable")
- val carbonResult = sql("SELECT * FROM carbontable")
+ sql("insert into carbon_table select * from hivetable")
+
+ sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray))
+ sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))
+
+ val udfHexResult = sql("SELECT decodeHex(image) FROM carbon_table")
+ val unhexResult = sql("SELECT unhex(image) FROM carbon_table")
+ checkAnswer(udfHexResult, unhexResult)
+
+ val udfBase64Result = sql("SELECT decodeBase64(image) FROM carbon_table")
+ val unbase64Result = sql("SELECT unbase64(image) FROM carbon_table")
+ checkAnswer(udfBase64Result, unbase64Result)
+
+ val carbonResult = sql("SELECT * FROM carbon_table")
val hiveResult = sql("SELECT * FROM hivetable")
assert(3 == carbonResult.collect().length)
@@ -437,9 +466,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
}
- sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
- sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM hivetable")
- val carbonResult2 = sql("SELECT * FROM carbontable2")
+ sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
+ sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable")
+ val carbonResult2 = sql("SELECT * FROM carbon_table2")
val hiveResult2 = sql("SELECT * FROM hivetable2")
checkAnswer(hiveResult2, carbonResult2)
checkAnswer(carbonResult, carbonResult2)
@@ -447,18 +476,19 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
assert(3 == carbonResult2.collect().length)
assert(3 == hiveResult2.collect().length)
- sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
- sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
- val carbonResult3 = sql("SELECT * FROM carbontable2")
+ sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
+ sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
+ val carbonResult3 = sql("SELECT * FROM carbon_table2")
val hiveResult3 = sql("SELECT * FROM hivetable2")
checkAnswer(carbonResult3, hiveResult3)
assert(6 == carbonResult3.collect().length)
assert(6 == hiveResult3.collect().length)
+
}
test("filter for hive and carbon") {
sql("DROP TABLE IF EXISTS hiveTable")
- sql("DROP TABLE IF EXISTS carbontable")
+ sql("DROP TABLE IF EXISTS carbon_table")
sql(
s"""
@@ -475,7 +505,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
sql(
s"""
- | CREATE TABLE IF NOT EXISTS carbontable (
+ | CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
@@ -483,12 +513,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
| autoLabel boolean)
| using carbon
""".stripMargin)
- sql("insert into carbontable values(1,true,'Bob','binary',false)")
- sql("insert into carbontable values(2,false,'Xu','test',true)")
+ sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+ sql("insert into carbon_table values(2,false,'Xu','test',true)")
// filter with equal
val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)")
- val carbonResult = sql("SELECT * FROM carbontable where image=cast('binary' as binary)")
+ val carbonResult = sql("SELECT * FROM carbon_table where image=cast('binary' as binary)")
checkAnswer(hiveResult, carbonResult)
assert(1 == carbonResult.collect().length)
@@ -499,13 +529,13 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
// filter with non string
val exception = intercept[Exception] {
- sql("SELECT * FROM carbontable where image=binary").collect()
+ sql("SELECT * FROM carbon_table where image=binary").collect()
}
assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))
// filter with not equal
val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)")
- val carbonResult3 = sql("SELECT * FROM carbontable where image!=cast('binary' as binary)")
+ val carbonResult3 = sql("SELECT * FROM carbon_table where image!=cast('binary' as binary)")
checkAnswer(hiveResult3, carbonResult3)
assert(1 == carbonResult3.collect().length)
carbonResult3.collect().foreach { each =>
@@ -515,7 +545,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
// filter with in
val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))")
- val carbonResult4 = sql("SELECT * FROM carbontable where image in (cast('binary' as binary))")
+ val carbonResult4 = sql("SELECT * FROM carbon_table where image in (cast('binary' as binary))")
checkAnswer(hiveResult4, carbonResult4)
assert(1 == carbonResult4.collect().length)
carbonResult4.collect().foreach { each =>
@@ -525,7 +555,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
// filter with not in
val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))")
- val carbonResult5 = sql("SELECT * FROM carbontable where image not in (cast('binary' as binary))")
+ val carbonResult5 = sql("SELECT * FROM carbon_table where image not in (cast('binary' as binary))")
checkAnswer(hiveResult5, carbonResult5)
assert(1 == carbonResult5.collect().length)
carbonResult5.collect().foreach { each =>
@@ -535,12 +565,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
test("Spark DataSource don't support update, delete") {
- sql("DROP TABLE IF EXISTS carbontable")
- sql("DROP TABLE IF EXISTS carbontable2")
+ sql("DROP TABLE IF EXISTS carbon_table")
+ sql("DROP TABLE IF EXISTS carbon_table2")
sql(
s"""
- | CREATE TABLE IF NOT EXISTS carbontable (
+ | CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
@@ -548,10 +578,10 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
| autoLabel boolean)
| using carbon
""".stripMargin)
- sql("insert into carbontable values(1,true,'Bob','binary',false)")
- sql("insert into carbontable values(2,false,'Xu','test',true)")
+ sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+ sql("insert into carbon_table values(2,false,'Xu','test',true)")
- val carbonResult = sql("SELECT * FROM carbontable")
+ val carbonResult = sql("SELECT * FROM carbon_table")
carbonResult.collect().foreach { each =>
if (1 == each.get(0)) {
@@ -564,12 +594,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
var exception = intercept[Exception] {
- sql("UPDATE carbontable SET binaryField = 'binary2' WHERE id = 1").show()
+ sql("UPDATE carbon_table SET binaryField = 'binary2' WHERE id = 1").show()
}
assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting"))
exception = intercept[Exception] {
- sql("DELETE FROM carbontable WHERE id = 1").show()
+ sql("DELETE FROM carbon_table WHERE id = 1").show()
}
assert(exception.getMessage.contains("Operation not allowed: DELETE FROM"))
}