You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/12 07:07:43 UTC
[carbondata] branch master updated: [CARBONDATA-3408] CarbonSession
partition support binary data type
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 0a113ce [CARBONDATA-3408] CarbonSession partition support binary data type
0a113ce is described below
commit 0a113ce065c8fea0165b41f2aca104705036ba51
Author: xubo245 <xu...@huawei.com>
AuthorDate: Fri May 31 17:14:40 2019 +0800
[CARBONDATA-3408] CarbonSession partition support binary data type
CarbonSession partition support binary data type
This closes #3251
---
.../apache/carbondata/core/util/DataTypeUtil.java | 7 +-
docs/ddl-of-carbondata.md | 4 +-
.../test/resources/binarystringdatawithHead.csv | 4 +
.../testsuite/binary/TestBinaryDataType.scala | 202 ++++++++++++++++++++-
.../loading/model/CarbonLoadModelBuilder.java | 6 +
.../processing/util/CarbonLoaderUtil.java | 8 +-
6 files changed, 223 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 7129f34..9aea579 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -478,7 +478,12 @@ public final class DataTypeUtil {
} else if (actualDataType == DataTypes.TIMESTAMP) {
return ByteUtil.toXorBytes((Long) dimensionValue);
} else if (actualDataType == DataTypes.BINARY) {
- return (byte[]) dimensionValue;
+ if (dimensionValue instanceof String) {
+ return ((String) dimensionValue).getBytes(
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ } else {
+ return (byte[]) dimensionValue;
+ }
} else {
// Default action for String/Varchar
return ByteUtil.toBytes(dimensionValue.toString());
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 845d46d..387c46b 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -919,7 +919,7 @@ Users can specify which columns to include and exclude for local dictionary gene
PARTITIONED BY (productCategory STRING, productBatch STRING)
STORED AS carbondata
```
- **NOTE:** Hive partition is not supported on complex data type columns and binary data type.
+ **NOTE:** Hive partition is not supported on complex data type columns.
#### Show Partitions
@@ -962,7 +962,7 @@ Users can specify which columns to include and exclude for local dictionary gene
### CARBONDATA PARTITION(HASH,RANGE,LIST) -- Alpha feature, this partition feature does not support update and delete data.
- The partition supports three type:(Hash,Range,List), similar to other system's partition features, CarbonData's partition feature can be used to improve query performance by filtering on the partition column. Partition feature doesn't support binary data type.
+ The partition supports three type:(Hash,Range,List), similar to other system's partition features, CarbonData's partition feature can be used to improve query performance by filtering on the partition column.
### Create Hash Partition Table
diff --git a/integration/spark-common-test/src/test/resources/binarystringdatawithHead.csv b/integration/spark-common-test/src/test/resources/binarystringdatawithHead.csv
new file mode 100644
index 0000000..4661807
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/binarystringdatawithHead.csv
@@ -0,0 +1,4 @@
+id|label|name|autolabel|binaryfield
+2|false|2.png|true|binary
+3|false|3.png|false|1
+1|true|1.png|true|Hello world
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 89c89dc..15e3ee9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -22,8 +22,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
+
import org.apache.commons.codec.binary.{Base64, Hex}
-import org.apache.spark.sql.Row
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -1349,8 +1351,206 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
"Binary decoder only support Base64, Hex or no decode for string, don't support he"))
}
+ test("insert into partition table") {
+ sql("DROP TABLE IF EXISTS hive_table")
+ sql("DROP TABLE IF EXISTS hive_table2")
+ sql("DROP TABLE IF EXISTS parquet_table")
+ sql("DROP TABLE IF EXISTS carbon_partition_table")
+
+ sql("set hive.exec.dynamic.partition.mode=strict")
+
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS hive_table (
+ | name STRING,
+ | id string)
+ | PARTITIONED BY(photo binary)
+ | row format delimited fields terminated by '|'
+ """.stripMargin)
+
+ sql("INSERT INTO hive_table PARTITION(photo='binary') select 'a','b'");
+ sql("INSERT INTO hive_table PARTITION(photo=1) select 'a','b'");
+ checkAnswer(sql("select cast(photo as string) from hive_table"), Seq(Row("binary"), Row("1")));
+
+
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS hive_table2 (
+ | name STRING,
+ | id string)
+ | PARTITIONED BY(photo binary)
+ """.stripMargin)
+
+ sql("INSERT INTO hive_table2 PARTITION(photo='binary') select 'a','b'");
+ sql("INSERT INTO hive_table2 PARTITION(photo=1) select 'a','b'");
+ checkAnswer(sql("select cast(photo as string) from hive_table2"), Seq(Row("binary"), Row("1")));
+
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS parquet_table (
+ | name STRING,
+ | id string)
+ | PARTITIONED BY(photo binary)
+ | STORED AS PARQUET
+ """.stripMargin)
+
+ sql("INSERT INTO parquet_table PARTITION(photo='binary') select 'a','b'");
+ sql("INSERT INTO parquet_table PARTITION(photo=1) select 'a','b'");
+
+ sql("select cast(photo as string) from parquet_table").show()
+ //TODO： is it a bug in parquet?
+ // checkAnswer(sql("select cast(photo as string) from parquet_table"), Seq(Row(),Row()));
+
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS carbon_partition_table (
+ | name STRING,
+ | id string)
+ | PARTITIONED BY(photo binary)
+ | STORED BY 'carbondata'
+ """.stripMargin)
+
+
+ sql("INSERT INTO carbon_partition_table PARTITION(photo='binary') select 'a','b'");
+ sql("INSERT INTO carbon_partition_table PARTITION(photo=1) select 'a','b'");
+ sql("select * from carbon_partition_table").show()
+ sql("select cast(photo as string) from carbon_partition_table").show()
+ checkAnswer(sql("select cast(photo as string) from carbon_partition_table"), Seq(Row("binary"), Row("1")))
+ checkAnswer(sql("select * from carbon_partition_table"), sql("select * from hive_table"))
+
+ val e = intercept[SparkException] {
+ sql("insert into hive_table select 'a','b','binary'");
+ }
+
+ assert(e.getMessage.contains("Dynamic partition strict mode requires at least one static partition column"))
+
+ val eInt = intercept[Exception] {
+ sql("insert into hive_table select 'a','b',1");
+ }
+
+ val e2 = intercept[SparkException] {
+ sql("insert into hive_table2 select 'a','b','binary'");
+ }
+
+ assert(e2.getMessage.contains("Dynamic partition strict mode requires at least one static partition column"))
+
+ val eInt2 = intercept[Exception] {
+ sql("insert into hive_table2 select 'a','b',1");
+ }
+
+ val e3 = intercept[SparkException] {
+ sql("insert into parquet_table select 'a','b','binary'");
+ }
+
+ assert(e3.getMessage.contains("Dynamic partition strict mode requires at least one static partition column"))
+
+ val eInt3 = intercept[Exception] {
+ sql("insert into parquet_table select 'a','b',1");
+ }
+
+ sql("insert into carbon_partition_table select 'a','b','binary'");
+ sql("insert into carbon_partition_table select 'a','b',1");
+
+ checkAnswer(sql("select cast(photo as string) from carbon_partition_table"),
+ Seq(Row("binary"), Row("1"), Row("binary"), Row("1")))
+
+ sql("select * from carbon_partition_table").show()
+
+ // set hive.exec.dynamic.partition.mode=nonstrict
+ sql("set hive.exec.dynamic.partition.mode=nonstrict")
+ sql("insert into hive_table select 'a','b','binary'");
+ val eInt11 = intercept[AnalysisException] {
+ sql("insert into hive_table select 'a','b',1");
+ }
+ assert(eInt11.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast "))
+
+ checkAnswer(sql("select cast(photo as string) from hive_table"),
+ Seq(Row("binary"), Row("1"), Row("binary")))
+
+ sql("insert into hive_table2 select 'a','b','binary'");
+ val eInt22 = intercept[AnalysisException] {
+ sql("insert into hive_table2 select 'a','b',1");
+ }
+ assert(eInt22.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast "))
+
+ checkAnswer(sql("select cast(photo as string) from hive_table2"),
+ Seq(Row("binary"), Row("1"), Row("binary")))
+
+ sql("insert into parquet_table select 'a','b','binary'");
+ val eInt32 = intercept[AnalysisException] {
+ sql("insert into parquet_table select 'a','b',1");
+ }
+ assert(eInt32.getMessage.contains("cannot resolve 'CAST(`1` AS BINARY)' due to data type mismatch: cannot cast "))
+
+ //TODO: is it bug in parquet?
+ // checkAnswer(sql("select cast(photo as string) from parquet_table"),
+ // Seq(Row(),Row(),Row()))
+
+ sql("insert into carbon_partition_table select 'a','b','binary'");
+ sql("insert into carbon_partition_table select 'a','b',1");
+
+ checkAnswer(sql("select cast(photo as string) from carbon_partition_table"),
+ Seq(Row("binary"), Row("1"), Row("binary"), Row("1"), Row("binary"), Row("1")))
+ }
+
+ test("Create table and load data with binary column for partition") {
+ sql("DROP TABLE IF EXISTS binaryTable")
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS binaryTable (
+ | id int,
+ | label boolean,
+ | name string,
+ | autoLabel boolean)
+ | PARTITIONED BY(binaryfield binary)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='','PARTITION_TYPE'='HASH','NUM_PARTITIONS'='9')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$resourcesPath/binarystringdatawithHead.csv'
+ | INTO TABLE binaryTable
+ | partition(binaryfield)
+ | OPTIONS('header'='true','DELIMITER'='|')
+ """.stripMargin)
+
+ val result = sql("desc formatted binaryTable").collect()
+ var flag = false
+ result.foreach { each =>
+ if ("binary".equals(each.get(1))) {
+ flag = true
+ }
+ }
+ assert(flag)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+ try {
+ val df = sql("SELECT * FROM binaryTable").collect()
+ assert(3 == df.length)
+
+ df.foreach { each =>
+ assert(5 == each.length)
+ if (2 == each.get(0)) {
+ assert("binary".equals(new String(each.getAs[Array[Byte]](4))))
+ } else if (1 == each.get(0)) {
+ assert("Hello world".equals(new String(each.getAs[Array[Byte]](4))))
+ } else if (3 == each.get(0)) {
+ assert("1".equals(new String(each.getAs[Array[Byte]](4))))
+ } else {
+ assert(false)
+ }
+ }
+
+ } catch {
+ case e: Exception =>
+ e.printStackTrace()
+ assert(false)
+ }
+ }
+
override def afterAll: Unit = {
sql("DROP TABLE IF EXISTS binaryTable")
sql("DROP TABLE IF EXISTS hiveTable")
+ sql("DROP TABLE IF EXISTS hive_table")
}
}
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 907f3ec..052db05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -40,7 +40,9 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
import org.apache.carbondata.processing.util.TableOptionConstant;
import org.apache.commons.lang.StringUtils;
@@ -434,6 +436,10 @@ public class CarbonLoadModelBuilder {
private void validateAndSetBinaryDecoder(CarbonLoadModel carbonLoadModel) {
String binaryDecoder = carbonLoadModel.getBinaryDecoder();
+ if (!CarbonLoaderUtil.isValidBinaryDecoder(binaryDecoder)) {
+ throw new CarbonDataLoadingException("Binary decoder only support Base64, " +
+ "Hex or no decode for string, don't support " + binaryDecoder);
+ }
if (StringUtils.isBlank(binaryDecoder)) {
binaryDecoder = CarbonProperties.getInstance().getProperty(
CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1588005..fe91099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -426,10 +426,10 @@ public final class CarbonLoaderUtil {
}
public static boolean isValidBinaryDecoder(String binaryDecoderChar) {
- return binaryDecoderChar.equalsIgnoreCase(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64) ||
- binaryDecoderChar.equalsIgnoreCase(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX) ||
+ return CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_BASE64.equalsIgnoreCase(
+ binaryDecoderChar) ||
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_HEX.equalsIgnoreCase(
+ binaryDecoderChar) ||
StringUtils.isBlank(binaryDecoderChar);
}