You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/12 15:37:00 UTC

[carbondata] branch master updated: [CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c497142  [CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary
c497142 is described below

commit c4971422f283288491cf6e8eea65b35d3a6af091
Author: xubo245 <xu...@huawei.com>
AuthorDate: Fri May 31 20:33:25 2019 +0800

    [CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary
    
    Add UDF, Hex/Base64 SQL functions for binary
    
    This closes #3253
---
 .../testsuite/binary/TestBinaryDataType.scala      |  32 +++++
 .../SparkCarbonDataSourceBinaryTest.scala          | 140 +++++++++++++--------
 2 files changed, 117 insertions(+), 55 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 15e3ee9..1b73aba 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -65,6 +65,17 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
         }
         assert(flag)
 
+        sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray))
+        sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))
+
+        val udfHexResult = sql("SELECT decodeHex(binaryField) FROM binaryTable")
+        val unhexResult = sql("SELECT unhex(binaryField) FROM binaryTable")
+        checkAnswer(udfHexResult, unhexResult)
+
+        val udfBase64Result = sql("SELECT decodeBase64(binaryField) FROM binaryTable")
+        val unbase64Result = sql("SELECT unbase64(binaryField) FROM binaryTable")
+        checkAnswer(udfBase64Result, unbase64Result)
+
         checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
         try {
             val df = sql("SELECT * FROM binaryTable").collect()
@@ -614,6 +625,27 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
                | OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail')
              """.stripMargin)
 
+        val hexHiveResult = sql("SELECT hex(binaryField) FROM hivetable")
+        val hexCarbonResult = sql("SELECT hex(binaryField) FROM carbontable")
+        checkAnswer(hexHiveResult, hexCarbonResult)
+        hexCarbonResult.collect().foreach { each =>
+            val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
+            assert("\u0001history\u0002".equals(result)
+                    || "\u0001biology\u0002".equals(result)
+                    || "\u0001education\u0002".equals(result))
+        }
+
+        val base64HiveResult = sql("SELECT base64(binaryField) FROM hivetable")
+        val base64CarbonResult = sql("SELECT base64(binaryField) FROM carbontable")
+        checkAnswer(base64HiveResult, base64CarbonResult)
+        base64CarbonResult.collect().foreach { each =>
+            val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
+            assert("\u0001history\u0002".equals(result)
+                    || "\u0001biology\u0002".equals(result)
+                    || "\u0001education\u0002".equals(result))
+        }
+
+
         val hiveResult = sql("SELECT * FROM hivetable")
         val carbonResult = sql("SELECT * FROM carbontable")
         checkAnswer(hiveResult, carbonResult)
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
index bdfc9dd..d234576 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
@@ -17,16 +17,14 @@
 package org.apache.spark.sql.carbondata.datasource
 
 import java.io.File
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.sdk.util.BinaryUtil
+import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.commons.io.FileUtils
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.apache.spark.util.SparkUtil
-
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
@@ -257,9 +255,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
     test("insert into for hive and carbon, CTAS") {
         sql("DROP TABLE IF EXISTS hiveTable")
-        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS carbon_table")
         sql("DROP TABLE IF EXISTS hiveTable2")
-        sql("DROP TABLE IF EXISTS carbontable2")
+        sql("DROP TABLE IF EXISTS carbon_table2")
         sql(
             s"""
                | CREATE TABLE IF NOT EXISTS hivetable (
@@ -275,7 +273,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         sql(
             s"""
-               | CREATE TABLE IF NOT EXISTS carbontable (
+               | CREATE TABLE IF NOT EXISTS carbon_table (
                |    id int,
                |    label boolean,
                |    name string,
@@ -283,9 +281,28 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
                |    autoLabel boolean)
                | using carbon
              """.stripMargin)
-        sql("insert into carbontable values(1,true,'Bob','binary',false)")
-        sql("insert into carbontable values(2,false,'Xu','test',true)")
-        val carbonResult = sql("SELECT * FROM carbontable")
+        sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+        sql("insert into carbon_table values(2,false,'Xu','test',true)")
+
+        val hexHiveResult = sql("SELECT hex(image) FROM hivetable")
+        val hexCarbonResult = sql("SELECT hex(image) FROM carbon_table")
+        checkAnswer(hexHiveResult, hexCarbonResult)
+        hexCarbonResult.collect().foreach { each =>
+            val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
+            assert("binary".equals(result)
+                    || "test".equals(result))
+        }
+
+        val base64HiveResult = sql("SELECT base64(image) FROM hivetable")
+        val base64CarbonResult = sql("SELECT base64(image) FROM carbon_table")
+        checkAnswer(base64HiveResult, base64CarbonResult)
+        base64CarbonResult.collect().foreach { each =>
+            val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
+            assert("binary".equals(result)
+                    || "test".equals(result))
+        }
+
+        val carbonResult = sql("SELECT * FROM carbon_table")
         val hiveResult = sql("SELECT * FROM hivetable")
 
         assert(2 == carbonResult.collect().length)
@@ -301,9 +318,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
             }
         }
 
-        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
-        sql("CREATE TABLE carbontable2  USING CARBON AS SELECT * FROM hivetable")
-        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
+        sql("CREATE TABLE carbon_table2  USING CARBON AS SELECT * FROM hivetable")
+        val carbonResult2 = sql("SELECT * FROM carbon_table2")
         val hiveResult2 = sql("SELECT * FROM hivetable2")
         checkAnswer(hiveResult2, carbonResult2)
         checkAnswer(carbonResult, carbonResult2)
@@ -311,9 +328,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
         assert(2 == carbonResult2.collect().length)
         assert(2 == hiveResult2.collect().length)
 
-        sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
-        sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
-        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
+        sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
+        val carbonResult3 = sql("SELECT * FROM carbon_table2")
         val hiveResult3 = sql("SELECT * FROM hivetable2")
         checkAnswer(carbonResult3, hiveResult3)
         assert(4 == carbonResult3.collect().length)
@@ -322,9 +339,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
     test("insert into for parquet and carbon, CTAS") {
         sql("DROP TABLE IF EXISTS parquetTable")
-        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS carbon_table")
         sql("DROP TABLE IF EXISTS parquetTable2")
-        sql("DROP TABLE IF EXISTS carbontable2")
+        sql("DROP TABLE IF EXISTS carbon_table2")
         sql(
             s"""
                | CREATE TABLE IF NOT EXISTS parquettable (
@@ -340,7 +357,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         sql(
             s"""
-               | CREATE TABLE IF NOT EXISTS carbontable (
+               | CREATE TABLE IF NOT EXISTS carbon_table (
                |    id int,
                |    label boolean,
                |    name string,
@@ -348,9 +365,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
                |    autoLabel boolean)
                | using carbon
              """.stripMargin)
-        sql("insert into carbontable values(1,true,'Bob','binary',false)")
-        sql("insert into carbontable values(2,false,'Xu','test',true)")
-        val carbonResult = sql("SELECT * FROM carbontable")
+        sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+        sql("insert into carbon_table values(2,false,'Xu','test',true)")
+        val carbonResult = sql("SELECT * FROM carbon_table")
         val parquetResult = sql("SELECT * FROM parquettable")
 
         assert(2 == carbonResult.collect().length)
@@ -366,9 +383,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
             }
         }
 
-        sql("CREATE TABLE parquettable2 AS SELECT * FROM carbontable")
-        sql("CREATE TABLE carbontable2  USING CARBON AS SELECT * FROM parquettable")
-        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        sql("CREATE TABLE parquettable2 AS SELECT * FROM carbon_table")
+        sql("CREATE TABLE carbon_table2  USING CARBON AS SELECT * FROM parquettable")
+        val carbonResult2 = sql("SELECT * FROM carbon_table2")
         val parquetResult2 = sql("SELECT * FROM parquettable2")
         checkAnswer(parquetResult2, carbonResult2)
         checkAnswer(carbonResult, carbonResult2)
@@ -376,9 +393,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
         assert(2 == carbonResult2.collect().length)
         assert(2 == parquetResult2.collect().length)
 
-        sql("INSERT INTO parquettable2 SELECT * FROM carbontable")
-        sql("INSERT INTO carbontable2 SELECT * FROM parquettable")
-        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        sql("INSERT INTO parquettable2 SELECT * FROM carbon_table")
+        sql("INSERT INTO carbon_table2 SELECT * FROM parquettable")
+        val carbonResult3 = sql("SELECT * FROM carbon_table2")
         val parquetResult3 = sql("SELECT * FROM parquettable2")
         checkAnswer(carbonResult3, parquetResult3)
         assert(4 == carbonResult3.collect().length)
@@ -387,9 +404,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
     test("insert into carbon as select from hive after hive load data") {
         sql("DROP TABLE IF EXISTS hiveTable")
-        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS carbon_table")
         sql("DROP TABLE IF EXISTS hiveTable2")
-        sql("DROP TABLE IF EXISTS carbontable2")
+        sql("DROP TABLE IF EXISTS carbon_table2")
 
         sql(
             s"""
@@ -409,7 +426,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         sql(
             s"""
-               | CREATE TABLE IF NOT EXISTS carbontable (
+               | CREATE TABLE IF NOT EXISTS carbon_table (
                |    id int,
                |    label boolean,
                |    name string,
@@ -417,8 +434,20 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
                |    autoLabel boolean)
                | using carbon
              """.stripMargin)
-        sql("insert into carbontable select * from hivetable")
-        val carbonResult = sql("SELECT * FROM carbontable")
+        sql("insert into carbon_table select * from hivetable")
+
+        sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray))
+        sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))
+
+        val udfHexResult = sql("SELECT decodeHex(image) FROM carbon_table")
+        val unhexResult = sql("SELECT unhex(image) FROM carbon_table")
+        checkAnswer(udfHexResult, unhexResult)
+
+        val udfBase64Result = sql("SELECT decodeBase64(image) FROM carbon_table")
+        val unbase64Result = sql("SELECT unbase64(image) FROM carbon_table")
+        checkAnswer(udfBase64Result, unbase64Result)
+
+        val carbonResult = sql("SELECT * FROM carbon_table")
         val hiveResult = sql("SELECT * FROM hivetable")
 
         assert(3 == carbonResult.collect().length)
@@ -437,9 +466,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
             }
         }
 
-        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
-        sql("CREATE TABLE carbontable2  USING CARBON AS SELECT * FROM hivetable")
-        val carbonResult2 = sql("SELECT * FROM carbontable2")
+        sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
+        sql("CREATE TABLE carbon_table2  USING CARBON AS SELECT * FROM hivetable")
+        val carbonResult2 = sql("SELECT * FROM carbon_table2")
         val hiveResult2 = sql("SELECT * FROM hivetable2")
         checkAnswer(hiveResult2, carbonResult2)
         checkAnswer(carbonResult, carbonResult2)
@@ -447,18 +476,19 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
         assert(3 == carbonResult2.collect().length)
         assert(3 == hiveResult2.collect().length)
 
-        sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
-        sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
-        val carbonResult3 = sql("SELECT * FROM carbontable2")
+        sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
+        sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
+        val carbonResult3 = sql("SELECT * FROM carbon_table2")
         val hiveResult3 = sql("SELECT * FROM hivetable2")
         checkAnswer(carbonResult3, hiveResult3)
         assert(6 == carbonResult3.collect().length)
         assert(6 == hiveResult3.collect().length)
+
     }
 
     test("filter for hive and carbon") {
         sql("DROP TABLE IF EXISTS hiveTable")
-        sql("DROP TABLE IF EXISTS carbontable")
+        sql("DROP TABLE IF EXISTS carbon_table")
 
         sql(
             s"""
@@ -475,7 +505,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         sql(
             s"""
-               | CREATE TABLE IF NOT EXISTS carbontable (
+               | CREATE TABLE IF NOT EXISTS carbon_table (
                |    id int,
                |    label boolean,
                |    name string,
@@ -483,12 +513,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
                |    autoLabel boolean)
                | using carbon
              """.stripMargin)
-        sql("insert into carbontable values(1,true,'Bob','binary',false)")
-        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+        sql("insert into carbon_table values(2,false,'Xu','test',true)")
 
         // filter with equal
         val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)")
-        val carbonResult = sql("SELECT * FROM carbontable where image=cast('binary' as binary)")
+        val carbonResult = sql("SELECT * FROM carbon_table where image=cast('binary' as binary)")
 
         checkAnswer(hiveResult, carbonResult)
         assert(1 == carbonResult.collect().length)
@@ -499,13 +529,13 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         // filter with non string
         val exception = intercept[Exception] {
-            sql("SELECT * FROM carbontable where image=binary").collect()
+            sql("SELECT * FROM carbon_table where image=binary").collect()
         }
         assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))
 
         // filter with not equal
         val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)")
-        val carbonResult3 = sql("SELECT * FROM carbontable where image!=cast('binary' as binary)")
+        val carbonResult3 = sql("SELECT * FROM carbon_table where image!=cast('binary' as binary)")
         checkAnswer(hiveResult3, carbonResult3)
         assert(1 == carbonResult3.collect().length)
         carbonResult3.collect().foreach { each =>
@@ -515,7 +545,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         // filter with in
         val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))")
-        val carbonResult4 = sql("SELECT * FROM carbontable where image in (cast('binary' as binary))")
+        val carbonResult4 = sql("SELECT * FROM carbon_table where image in (cast('binary' as binary))")
         checkAnswer(hiveResult4, carbonResult4)
         assert(1 == carbonResult4.collect().length)
         carbonResult4.collect().foreach { each =>
@@ -525,7 +555,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
 
         // filter with not in
         val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))")
-        val carbonResult5 = sql("SELECT * FROM carbontable where image not in (cast('binary' as binary))")
+        val carbonResult5 = sql("SELECT * FROM carbon_table where image not in (cast('binary' as binary))")
         checkAnswer(hiveResult5, carbonResult5)
         assert(1 == carbonResult5.collect().length)
         carbonResult5.collect().foreach { each =>
@@ -535,12 +565,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
     }
 
     test("Spark DataSource don't support update, delete") {
-        sql("DROP TABLE IF EXISTS carbontable")
-        sql("DROP TABLE IF EXISTS carbontable2")
+        sql("DROP TABLE IF EXISTS carbon_table")
+        sql("DROP TABLE IF EXISTS carbon_table2")
 
         sql(
             s"""
-               | CREATE TABLE IF NOT EXISTS carbontable (
+               | CREATE TABLE IF NOT EXISTS carbon_table (
                |    id int,
                |    label boolean,
                |    name string,
@@ -548,10 +578,10 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
                |    autoLabel boolean)
                | using carbon
              """.stripMargin)
-        sql("insert into carbontable values(1,true,'Bob','binary',false)")
-        sql("insert into carbontable values(2,false,'Xu','test',true)")
+        sql("insert into carbon_table values(1,true,'Bob','binary',false)")
+        sql("insert into carbon_table values(2,false,'Xu','test',true)")
 
-        val carbonResult = sql("SELECT * FROM carbontable")
+        val carbonResult = sql("SELECT * FROM carbon_table")
 
         carbonResult.collect().foreach { each =>
             if (1 == each.get(0)) {
@@ -564,12 +594,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
         }
 
         var exception = intercept[Exception] {
-            sql("UPDATE carbontable SET binaryField = 'binary2' WHERE id = 1").show()
+            sql("UPDATE carbon_table SET binaryField = 'binary2' WHERE id = 1").show()
         }
         assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting"))
 
         exception = intercept[Exception] {
-            sql("DELETE FROM carbontable WHERE id = 1").show()
+            sql("DELETE FROM carbon_table WHERE id = 1").show()
         }
         assert(exception.getMessage.contains("Operation not allowed: DELETE FROM"))
     }