Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/12 07:07:43 UTC

[carbondata] branch master updated: [CARBONDATA-3408] CarbonSession partition support binary data type

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a113ce  [CARBONDATA-3408] CarbonSession partition support binary data type
0a113ce is described below

commit 0a113ce065c8fea0165b41f2aca104705036ba51
Author: xubo245 <xu...@huawei.com>
AuthorDate: Fri May 31 17:14:40 2019 +0800

    [CARBONDATA-3408] CarbonSession partition support binary data type
    
    CarbonSession partition support binary data type
    
    This closes #3251
---
 .../apache/carbondata/core/util/DataTypeUtil.java  |   7 +-
 docs/ddl-of-carbondata.md                          |   4 +-
 .../test/resources/binarystringdatawithHead.csv    |   4 +
 .../testsuite/binary/TestBinaryDataType.scala      | 202 ++++++++++++++++++++-
 .../loading/model/CarbonLoadModelBuilder.java      |   6 +
 .../processing/util/CarbonLoaderUtil.java          |   8 +-
 6 files changed, 223 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 7129f34..9aea579 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -478,7 +478,12 @@ public final class DataTypeUtil {
     } else if (actualDataType == DataTypes.TIMESTAMP) {
       return ByteUtil.toXorBytes((Long) dimensionValue);
     } else if (actualDataType == DataTypes.BINARY) {
-      return (byte[]) dimensionValue;
+      if (dimensionValue instanceof String) {
+        return ((String) dimensionValue).getBytes(
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      } else {
+        return (byte[]) dimensionValue;
+      }
     } else {
       // Default action for String/Varchar
       return ByteUtil.toBytes(dimensionValue.toString());
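
The hunk above is the core of the fix: a static partition spec such as PARTITION(photo='binary') reaches DataTypeUtil with the partition value still held as a String, and the old unconditional cast to byte[] failed with a ClassCastException. A minimal sketch of the same dispatch, assuming the default charset is UTF-8 (the value CarbonCommonConstants.DEFAULT_CHARSET carries elsewhere in the codebase):

    import java.nio.charset.StandardCharsets

    // Sketch only, not the CarbonData API: convert a binary dimension value
    // to bytes, tolerating both String (partition spec) and byte[] inputs.
    def binaryDimensionToBytes(dimensionValue: Any): Array[Byte] =
      dimensionValue match {
        // Partition values are parsed from SQL text, so they arrive as strings.
        case s: String => s.getBytes(StandardCharsets.UTF_8)
        // Ordinary binary column values are already byte arrays.
        case b: Array[Byte] => b
      }

With this dispatch, binaryDimensionToBytes("binary") and binaryDimensionToBytes("1".getBytes) both succeed, where the cast-only path threw on the String case.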
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 845d46d..387c46b 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -919,7 +919,7 @@ Users can specify which columns to include and exclude for local dictionary gene
   PARTITIONED BY (productCategory STRING, productBatch STRING)
   STORED AS carbondata
   ```
-   **NOTE:** Hive partition is not supported on complex data type columns and binary data type.
+   **NOTE:** Hive partition is not supported on complex data type columns.
 
 
 #### Show Partitions
@@ -962,7 +962,7 @@ Users can specify which columns to include and exclude for local dictionary gene
 
 ### CARBONDATA PARTITION(HASH,RANGE,LIST) -- Alpha feature; this partition feature does not support updating or deleting data.
 
-  The partition supports three type:(Hash,Range,List), similar to other system's partition features, CarbonData's partition feature can be used to improve query performance by filtering on the partition column. Partition feature doesn't support binary data type.
+  The partition supports three types (Hash, Range, List). Similar to other systems' partition features, CarbonData's partition feature can be used to improve query performance by filtering on the partition column.
 
 ### Create Hash Partition Table
 
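
With the documentation caveats removed, a binary column is now legal in the PARTITIONED BY clause. A minimal sketch in the style of the test suite added below (table and column names are illustrative):

    sql(
      """
        | CREATE TABLE IF NOT EXISTS demo_table (
        |   name STRING,
        |   id STRING)
        | PARTITIONED BY (photo BINARY)
        | STORED AS carbondata
      """.stripMargin)

    // Static partition value: the string 'binary' is encoded to bytes.
    sql("INSERT INTO demo_table PARTITION(photo='binary') SELECT 'a', 'b'")

    // Cast back to string to inspect the stored partition value.
    sql("SELECT cast(photo AS string) FROM demo_table").show()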
diff --git a/integration/spark-common-test/src/test/resources/binarystringdatawithHead.csv b/integration/spark-common-test/src/test/resources/binarystringdatawithHead.csv
new file mode 100644
index 0000000..4661807
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binarystringdatawithHead.csv
@@ -0,0 +1,4 @@
+id|label|name|autolabel|binaryfield
+2|false|2.png|true|binary
+3|false|3.png|false|1
+1|true|1.png|true|Hello world
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 89c89dc..15e3ee9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -22,8 +22,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
+
 import org.apache.commons.codec.binary.{Base64, Hex}
-import org.apache.spark.sql.Row
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -1349,8 +1351,206 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
             "Binary decoder only support Base64, Hex or no decode for string, don't support he"))
     }
 
+    test("insert into partition table") {
+        sql("DROP TABLE IF EXISTS hive_table")
+        sql("DROP TABLE IF EXISTS hive_table2")
+        sql("DROP TABLE IF EXISTS parquet_table")
+        sql("DROP TABLE IF EXISTS carbon_partition_table")
+
+        sql("set hive.exec.dynamic.partition.mode=strict")
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hive_table (
+               |    name STRING,
+               |    id string)
+               | PARTITIONED BY(photo binary)
+               | row format delimited fields terminated by '|'
+             """.stripMargin)
+
+        sql("INSERT INTO hive_table PARTITION(photo='binary') select 'a','b'");
+        sql("INSERT INTO hive_table PARTITION(photo=1) select 'a','b'");
+        checkAnswer(sql("select cast(photo as string) from hive_table"), Seq(Row("binary"), Row("1")));
+
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS hive_table2 (
+               |    name STRING,
+               |    id string)
+               | PARTITIONED BY(photo binary)
+             """.stripMargin)
+
+        sql("INSERT INTO hive_table2 PARTITION(photo='binary') select 'a','b'");
+        sql("INSERT INTO hive_table2 PARTITION(photo=1) select 'a','b'");
+        checkAnswer(sql("select cast(photo as string) from hive_table2"), Seq(Row("binary"), Row("1")));
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS parquet_table (
+               |    name STRING,
+               |    id string)
+               | PARTITIONED BY(photo binary)
+               | STORED AS PARQUET
+             """.stripMargin)
+
+        sql("INSERT INTO parquet_table PARTITION(photo='binary') select 'a','b'");
+        sql("INSERT INTO parquet_table PARTITION(photo=1) select 'a','b'");
+
+        sql("select cast(photo as string) from parquet_table").show()
+        // TODO: is it a bug in parquet?
+        //        checkAnswer(sql("select cast(photo as string) from parquet_table"), Seq(Row(),Row()));
+
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS carbon_partition_table (
+               |    name STRING,
+               |    id string)
+               | PARTITIONED BY(photo binary)
+               | STORED BY 'carbondata'
+             """.stripMargin)
+
+
+        sql("INSERT INTO carbon_partition_table PARTITION(photo='binary') select 'a','b'");
+        sql("INSERT INTO carbon_partition_table PARTITION(photo=1) select 'a','b'");
+        sql("select * from carbon_partition_table").show()
+        sql("select cast(photo as string) from carbon_partition_table").show()
+        checkAnswer(sql("select cast(photo as string) from carbon_partition_table"), Seq(Row("binary"), Row("1")))
+        checkAnswer(sql("select * from carbon_partition_table"), sql("select * from hive_table"))
+
+        val e = intercept[SparkException] {
+            sql("insert into hive_table select 'a','b','binary'");
+        }
+
+        assert(e.getMessage.contains("Dynamic partition strict mode requires at least one static partition column"))
+
+        val eInt = intercept[Exception] {
+            sql("insert into hive_table select 'a','b',1");
+        }
+
+        val e2 = intercept[SparkException] {
+            sql("insert into hive_table2 select 'a','b','binary'");
+        }
+
+        assert(e2.getMessage.contains("Dynamic partition strict mode requires at least one static partition column"))
+
+        val eInt2 = intercept[Exception] {
+            sql("insert into hive_table2 select 'a','b',1");
+        }
+
+        val e3 = intercept[SparkException] {
+            sql("insert into parquet_table select 'a','b','binary'");
+        }
+
+        assert(e3.getMessage.contains("Dynamic partition strict mode requires at least one static partition column"))
+
+        val eInt3 = intercept[Exception] {
+            sql("insert into parquet_table select 'a','b',1");
+        }
+
+        sql("insert into carbon_partition_table select 'a','b','binary'");
+        sql("insert into carbon_partition_table select 'a','b',1");
+
+        checkAnswer(sql("select cast(photo as string) from carbon_partition_table"),
+            Seq(Row("binary"), Row("1"), Row("binary"), Row("1")))
+
+        sql("select * from carbon_partition_table").show()
+
+        // set hive.exec.dynamic.partition.mode=nonstrict
+        sql("set hive.exec.dynamic.partition.mode=nonstrict")
+        sql("insert into hive_table select 'a','b','binary'");
+        val eInt11 = intercept[AnalysisException] {
+            sql("insert into hive_table select 'a','b',1");
+        }
+        assert(eInt11.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast "))
+
+        checkAnswer(sql("select cast(photo as string) from hive_table"),
+            Seq(Row("binary"), Row("1"), Row("binary")))
+
+        sql("insert into hive_table2 select 'a','b','binary'");
+        val eInt22 = intercept[AnalysisException] {
+            sql("insert into hive_table2 select 'a','b',1");
+        }
+        assert(eInt22.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast "))
+
+        checkAnswer(sql("select cast(photo as string) from hive_table2"),
+            Seq(Row("binary"), Row("1"), Row("binary")))
+
+        sql("insert into parquet_table select 'a','b','binary'");
+        val eInt32 = intercept[AnalysisException] {
+            sql("insert into parquet_table select 'a','b',1");
+        }
+        assert(eInt32.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast "))
+
+        // TODO: is it a bug in parquet?
+        //        checkAnswer(sql("select cast(photo as string) from parquet_table"),
+        //            Seq(Row(),Row(),Row()))
+
+        sql("insert into carbon_partition_table select 'a','b','binary'");
+        sql("insert into carbon_partition_table select 'a','b',1");
+
+        checkAnswer(sql("select cast(photo as string) from carbon_partition_table"),
+            Seq(Row("binary"), Row("1"), Row("binary"), Row("1"), Row("binary"), Row("1")))
+    }
+
+    test("Create table and load data with binary column for partition") {
+        sql("DROP TABLE IF EXISTS binaryTable")
+        sql(
+            s"""
+               | CREATE TABLE IF NOT EXISTS binaryTable (
+               |    id int,
+               |    label boolean,
+               |    name string,
+               |    autoLabel boolean)
+               | PARTITIONED BY(binaryfield binary)
+               | STORED BY 'carbondata'
+               | TBLPROPERTIES('SORT_COLUMNS'='','PARTITION_TYPE'='HASH','NUM_PARTITIONS'='9')
+             """.stripMargin)
+        sql(
+            s"""
+               | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdatawithHead.csv'
+               | INTO TABLE binaryTable
+               | partition(binaryfield)
+               | OPTIONS('header'='true','DELIMITER'='|')
+             """.stripMargin)
+
+        val result = sql("desc formatted binaryTable").collect()
+        var flag = false
+        result.foreach { each =>
+            if ("binary".equals(each.get(1))) {
+                flag = true
+            }
+        }
+        assert(flag)
+
+        checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+        try {
+            val df = sql("SELECT * FROM binaryTable").collect()
+            assert(3 == df.length)
+
+            df.foreach { each =>
+                assert(5 == each.length)
+                if (2 == each.get(0)) {
+                    assert("binary".equals(new String(each.getAs[Array[Byte]](4))))
+                } else if (1 == each.get(0)) {
+                    assert("Hello world".equals(new String(each.getAs[Array[Byte]](4))))
+                } else if (3 == each.get(0)) {
+                    assert("1".equals(new String(each.getAs[Array[Byte]](4))))
+                } else {
+                    assert(false)
+                }
+            }
+
+        } catch {
+            case e: Exception =>
+                e.printStackTrace()
+                assert(false)
+        }
+    }
+
     override def afterAll: Unit = {
         sql("DROP TABLE IF EXISTS binaryTable")
         sql("DROP TABLE IF EXISTS hiveTable")
+        sql("DROP TABLE IF EXISTS hive_table")
     }
 }
\ No newline at end of file
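
A condensed sketch of the dynamic-partition behavior the test above exercises (demo_table is illustrative; per the assertions, carbondata tables accept the dynamic form even in strict mode, while Hive and Parquet tables throw):

    // Strict mode: for Hive/Parquet tables, a fully dynamic partition insert
    // fails with "Dynamic partition strict mode requires at least one static
    // partition column"; a static spec is always accepted.
    sql("set hive.exec.dynamic.partition.mode=strict")
    sql("INSERT INTO demo_table PARTITION(photo='binary') SELECT 'a', 'b'")

    // Nonstrict mode: the partition value may come from the SELECT list,
    // but only as a string; an integer literal cannot be cast to BINARY.
    sql("set hive.exec.dynamic.partition.mode=nonstrict")
    sql("INSERT INTO demo_table SELECT 'a', 'b', 'binary'")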
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 907f3ec..052db05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -40,7 +40,9 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 import org.apache.carbondata.processing.util.TableOptionConstant;
 
 import org.apache.commons.lang.StringUtils;
@@ -434,6 +436,10 @@ public class CarbonLoadModelBuilder {
 
   private void validateAndSetBinaryDecoder(CarbonLoadModel carbonLoadModel) {
     String binaryDecoder = carbonLoadModel.getBinaryDecoder();
+    if (!CarbonLoaderUtil.isValidBinaryDecoder(binaryDecoder)) {
+      throw new CarbonDataLoadingException("Binary decoder only support Base64, " +
+          "Hex or no decode for string, don't support " + binaryDecoder);
+    }
     if (StringUtils.isBlank(binaryDecoder)) {
       binaryDecoder = CarbonProperties.getInstance().getProperty(
           CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
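
This moves the decoder check to load-model construction, so an unknown decoder fails fast with a CarbonDataLoadingException instead of surfacing mid-load. A sketch of the load path it guards, assuming the 'binary_decoder' load option used by the existing tests; the table name and path are illustrative:

    val csvPath = "/tmp/binary_demo.csv" // illustrative
    sql(
      s"""
         | LOAD DATA LOCAL INPATH '$csvPath'
         | INTO TABLE demo_table
         | OPTIONS('binary_decoder'='base64')
       """.stripMargin)
    // Valid values: 'base64', 'hex', or blank (store the string bytes as-is).
    // Anything else, e.g. OPTIONS('binary_decoder'='he'), now throws
    // CarbonDataLoadingException before the load starts.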
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1588005..fe91099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -426,10 +426,10 @@ public final class CarbonLoaderUtil {
   }
 
   public static boolean isValidBinaryDecoder(String binaryDecoderChar) {
-    return binaryDecoderChar.equalsIgnoreCase(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64) ||
-        binaryDecoderChar.equalsIgnoreCase(
-            CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX) ||
+    return CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64.equalsIgnoreCase(
+        binaryDecoderChar) ||
+        CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX.equalsIgnoreCase(
+            binaryDecoderChar) ||
         StringUtils.isBlank(binaryDecoderChar);
   }
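
Flipping the receiver of equalsIgnoreCase is a standard null-safety idiom: the constants can never be null, so a missing option no longer risks a NullPointerException, and StringUtils.isBlank(null) already returns true, keeping "no decoder" valid. A minimal sketch of the difference, assuming commons-lang on the classpath:

    import org.apache.commons.lang.StringUtils

    val decoder: String = null // load option not supplied

    // Old shape: decoder.equalsIgnoreCase("base64") would throw
    // a NullPointerException because the receiver is null.

    // New shape: null-safe, simply returns false.
    "base64".equalsIgnoreCase(decoder)

    // Blank (including null) still counts as a valid "no decode" setting.
    StringUtils.isBlank(decoder) // true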