You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/07 11:06:58 UTC
carbondata git commit: [CARBONDATA-2430][SDK] Reshuffling of Columns
given by user in SDK.
Repository: carbondata
Updated Branches:
refs/heads/master 531ecdf3f -> b1c85fa55
[CARBONDATA-2430][SDK] Reshuffling of Columns given by user in SDK.
Reshuffling of Columns given by the user in SDK. Order should be Sort Columns -> Dimension -> Complex -> Measure
This closes #2261
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b1c85fa5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b1c85fa5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b1c85fa5
Branch: refs/heads/master
Commit: b1c85fa55eeca1a98b61117c6b46df9a28d60bca
Parents: 531ecdf
Author: sounakr <so...@gmail.com>
Authored: Wed May 2 21:29:57 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 7 16:36:46 2018 +0530
----------------------------------------------------------------------
.../core/metadata/datatype/ArrayType.java | 4 +
.../core/metadata/datatype/StructField.java | 15 +
.../core/metadata/schema/table/CarbonTable.java | 26 --
.../schema/table/TableSchemaBuilder.java | 72 ++-
.../examples/DataFrameComplexTypeExample.scala | 147 +++++-
.../TestNonTransactionalCarbonTable.scala | 459 +++++++++++++++++--
.../apache/spark/util/SparkTypeConverter.scala | 58 +--
.../carbondata/sdk/file/AvroCarbonWriter.java | 48 +-
.../sdk/file/CarbonWriterBuilder.java | 104 +++--
.../org/apache/carbondata/sdk/file/Field.java | 4 +-
.../sdk/file/AvroCarbonWriterTest.java | 214 +++++++++
11 files changed, 963 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
index c30e21c..c327d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
@@ -30,4 +30,8 @@ public class ArrayType extends DataType {
public boolean isComplexType() {
return true;
}
+
+ public DataType getElementType() {
+ return elementType;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
index efdc8e2..bfca057 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.metadata.datatype;
import java.io.Serializable;
+import java.util.List;
public class StructField implements Serializable {
@@ -27,9 +28,19 @@ public class StructField implements Serializable {
private DataType dataType;
+ private List<StructField> children;
+
public StructField(String fieldName, DataType dataType) {
this.fieldName = fieldName;
this.dataType = dataType;
+ this.children = null;
+ }
+
+
+ public StructField(String fieldName, DataType dataType, List<StructField> children) {
+ this.fieldName = fieldName;
+ this.dataType = dataType;
+ this.children = children;
}
public DataType getDataType() {
@@ -39,4 +50,8 @@ public class StructField implements Serializable {
public String getFieldName() {
return fieldName;
}
+
+ public List<StructField> getChildren() {
+ return children;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 4178d8a..cf5660f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -37,8 +37,6 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -49,7 +47,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -60,7 +57,6 @@ import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.format.FileHeader;
/**
* Mapping class for Carbon actual table
@@ -223,28 +219,6 @@ public class CarbonTable implements Serializable {
}
}
- public static CarbonTable buildFromDataFile(
- String tableName, String tablePath, String filePath) throws IOException {
- CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(filePath);
- FileHeader fileHeader = carbonHeaderReader.readHeader();
- TableSchemaBuilder builder = TableSchema.builder();
- ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
- for (org.apache.carbondata.format.ColumnSchema column : fileHeader.getColumn_schema()) {
- ColumnSchema columnSchema = schemaConverter.fromExternalToWrapperColumnSchema(column);
- builder.addColumn(
- new StructField(columnSchema.getColumnName(), columnSchema.getDataType()), false);
- }
-
- TableSchema tableSchema = builder.tableName(tableName).build();
- TableInfo tableInfo = new TableInfo();
- tableInfo.setFactTable(tableSchema);
- tableInfo.setTablePath(tablePath);
- tableInfo.setDatabaseName("default");
- tableInfo.setTableUniqueName(
- CarbonTable.buildUniqueName("default", tableSchema.getTableName()));
- return buildFromTableInfo(tableInfo);
- }
-
public static CarbonTable buildFromTablePath(String tableName, String tablePath,
boolean isTransactionalTable) throws IOException {
if (isTransactionalTable) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 731fea8..42bb958 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalType;
@@ -46,7 +47,11 @@ public class TableSchemaBuilder {
private List<ColumnSchema> sortColumns = new LinkedList<>();
- private List<ColumnSchema> otherColumns = new LinkedList<>();
+ private List<ColumnSchema> dimension = new LinkedList<>();
+
+ private List<ColumnSchema> complex = new LinkedList<>();
+
+ private List<ColumnSchema> measures = new LinkedList<>();
private int blockSize;
@@ -86,7 +91,9 @@ public class TableSchemaBuilder {
schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
schema.setSchemaEvalution(schemaEvol);
List<ColumnSchema> allColumns = new LinkedList<>(sortColumns);
- allColumns.addAll(otherColumns);
+ allColumns.addAll(dimension);
+ allColumns.addAll(complex);
+ allColumns.addAll(measures);
schema.setListOfColumns(allColumns);
Map<String, String> property = new HashMap<>();
@@ -108,21 +115,36 @@ public class TableSchemaBuilder {
}
public ColumnSchema addColumn(StructField field, boolean isSortColumn) {
+ return addColumn(field, null, isSortColumn, false);
+ }
+
+ private ColumnSchema addColumn(StructField field, String parentName, boolean isSortColumn,
+ boolean isComplexChild) {
Objects.requireNonNull(field);
checkRepeatColumnName(field);
ColumnSchema newColumn = new ColumnSchema();
- newColumn.setColumnName(field.getFieldName());
+ if (parentName != null) {
+ newColumn.setColumnName(parentName + "." + field.getFieldName());
+ } else {
+ newColumn.setColumnName(field.getFieldName());
+ }
newColumn.setDataType(field.getDataType());
if (isSortColumn ||
field.getDataType() == DataTypes.STRING ||
field.getDataType() == DataTypes.DATE ||
field.getDataType() == DataTypes.TIMESTAMP ||
- DataTypes.isStructType(field.getDataType())) {
+ field.getDataType().isComplexType() ||
+ (isComplexChild)) {
newColumn.setDimensionColumn(true);
} else {
newColumn.setDimensionColumn(false);
}
- newColumn.setSchemaOrdinal(ordinal++);
+ if (!isComplexChild) {
+ newColumn.setSchemaOrdinal(ordinal++);
+ } else {
+ // child column should not be counted for schema ordinal
+ newColumn.setSchemaOrdinal(-1);
+ }
newColumn.setColumnar(true);
// For NonTransactionalTable, multiple sdk writer output with same column name can be placed in
@@ -135,7 +157,11 @@ public class TableSchemaBuilder {
newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
if (field.getDataType().isComplexType()) {
- newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size());
+ if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+ newColumn.setNumberOfChild(1);
+ } else {
+ newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size());
+ }
}
if (DataTypes.isDecimal(field.getDataType())) {
DecimalType decimalType = (DecimalType) field.getDataType();
@@ -143,17 +169,29 @@ public class TableSchemaBuilder {
newColumn.setScale(decimalType.getScale());
}
if (!isSortColumn) {
- otherColumns.add(newColumn);
+ if (!newColumn.isDimensionColumn()) {
+ measures.add(newColumn);
+ } else if (DataTypes.isStructType(field.getDataType()) ||
+ DataTypes.isArrayType(field.getDataType()) || isComplexChild) {
+ complex.add(newColumn);
+ } else {
+ dimension.add(newColumn);
+ }
}
if (newColumn.isDimensionColumn()) {
newColumn.setUseInvertedIndex(true);
}
if (field.getDataType().isComplexType()) {
- if (((StructType) field.getDataType()).getFields().size() > 0) {
+ String parentFieldName = newColumn.getColumnName();
+ if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+ addColumn(new StructField("val",
+ ((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true);
+ } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
+ && ((StructType) field.getDataType()).getFields().size() > 0) {
// This field has children.
List<StructField> fields = ((StructType) field.getDataType()).getFields();
- for (int i = 0; i < fields.size(); i ++) {
- addColumn(fields.get(i), false);
+ for (int i = 0; i < fields.size(); i++) {
+ addColumn(fields.get(i), parentFieldName, false, true);
}
}
}
@@ -169,7 +207,19 @@ public class TableSchemaBuilder {
throw new IllegalArgumentException("column name already exists");
}
}
- for (ColumnSchema column : otherColumns) {
+ for (ColumnSchema column : dimension) {
+ if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
+ throw new IllegalArgumentException("column name already exists");
+ }
+ }
+
+ for (ColumnSchema column : complex) {
+ if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
+ throw new IllegalArgumentException("column name already exists");
+ }
+ }
+
+ for (ColumnSchema column : measures) {
if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
throw new IllegalArgumentException("column name already exists");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
index 34b32f4..0abf5c5 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
@@ -22,8 +22,19 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.carbondata.examples.util.ExampleUtils
case class StructElement(school: Array[String], age: Int)
+
+case class StructElement1(school: Array[String], school1: Array[String], age: Int)
+
case class ComplexTypeData(id: Int, name: String, city: String, salary: Float, file: StructElement)
+case class ComplexTypeData1(id: Int,
+ name: String,
+ city: String,
+ salary: Float,
+ file: StructElement1)
+
+case class ComplexTypeData2(id: Int, name: String, city: String, salary: Float, file: Array[String])
+
// scalastyle:off println
object DataFrameComplexTypeExample {
@@ -34,16 +45,21 @@ object DataFrameComplexTypeExample {
spark.close()
}
- def exampleBody(spark : SparkSession): Unit = {
- val complexTableName = s"complex_type_table"
+ def exampleBody(spark: SparkSession): Unit = {
+ val complexTypeDictionaryTableName = s"complex_type_dictionary_table"
+ val complexTypeNoDictionaryTableName = s"complex_type_noDictionary_table"
+ val complexTypeNoDictionaryTableNameArray = s"complex_type_noDictionary_array_table"
import spark.implicits._
// drop table if exists previously
- spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }")
+ spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeDictionaryTableName }")
+ spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }")
+ spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }")
+
spark.sql(
s"""
- | CREATE TABLE ${ complexTableName }(
+ | CREATE TABLE ${ complexTypeDictionaryTableName }(
| id INT,
| name STRING,
| city STRING,
@@ -56,6 +72,37 @@ object DataFrameComplexTypeExample {
| 'dictionary_include'='city')
| """.stripMargin)
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ complexTypeNoDictionaryTableNameArray }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file array<string>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'sort_columns'='name',
+ | 'dictionary_include'='city')
+ | """.stripMargin)
+
+
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ complexTypeNoDictionaryTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | file struct<school:array<string>, school1:array<string>, age:int>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'sort_columns'='name')
+ | """.stripMargin)
+
+
val sc = spark.sparkContext
// generate data
val df = sc.parallelize(Seq(
@@ -66,30 +113,108 @@ object DataFrameComplexTypeExample {
ComplexTypeData(3, "index_3", "city_3", 30000.0f,
StructElement(Array("struct_31", "struct_32"), 30))
)).toDF
+
+ // generate data
+ val df2 = sc.parallelize(Seq(
+ ComplexTypeData2(1, "index_1", "city_1", 10000.0f, Array("struct_11", "struct_12")),
+ ComplexTypeData2(2, "index_2", "city_2", 20000.0f, Array("struct_21", "struct_22")),
+ ComplexTypeData2(3, "index_3", "city_3", 30000.0f, Array("struct_31", "struct_32"))
+ )).toDF
+
+ // generate data
+ val df1 = sc.parallelize(Seq(
+ ComplexTypeData1(1, "index_1", "city_1", 10000.0f,
+ StructElement1(Array("struct_11", "struct_12"), Array("struct_11", "struct_12"), 10)),
+ ComplexTypeData1(2, "index_2", "city_2", 20000.0f,
+ StructElement1(Array("struct_21", "struct_22"), Array("struct_11", "struct_12"), 20)),
+ ComplexTypeData1(3, "index_3", "city_3", 30000.0f,
+ StructElement1(Array("struct_31", "struct_32"), Array("struct_11", "struct_12"), 30))
+ )).toDF
+
+
df.printSchema()
df.write
.format("carbondata")
- .option("tableName", complexTableName)
+ .option("tableName", complexTypeDictionaryTableName)
+ .mode(SaveMode.Append)
+ .save()
+
+ df1.printSchema()
+ df1.write
+ .format("carbondata")
+ .option("tableName", complexTypeNoDictionaryTableName)
.mode(SaveMode.Append)
.save()
- spark.sql(s"select count(*) from ${ complexTableName }").show(100, truncate = false)
- spark.sql(s"select * from ${ complexTableName } order by id desc").show(300, truncate = false)
+ df2.printSchema()
+ df2.write
+ .format("carbondata")
+ .option("tableName", complexTypeNoDictionaryTableNameArray)
+ .mode(SaveMode.Append)
+ .save()
+
+
+ spark.sql(s"select count(*) from ${ complexTypeDictionaryTableName }")
+ .show(100, truncate = false)
+
+ spark.sql(s"select * from ${ complexTypeDictionaryTableName } order by id desc")
+ .show(300, truncate = false)
+
+ spark.sql(s"select * " +
+ s"from ${ complexTypeDictionaryTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ spark.sql(s"select * " +
+ s"from ${ complexTypeDictionaryTableName } " +
+ s"where id > 10 limit 100").show(100, truncate = false)
+
+ // show segments
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeDictionaryTableName }").show(false)
+
+ // drop table
+ spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeDictionaryTableName }")
+
+
+ spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableName }")
+ .show(100, truncate = false)
+
+ spark.sql(s"select * from ${ complexTypeNoDictionaryTableName } order by id desc")
+ .show(300, truncate = false)
+
+ spark.sql(s"select * " +
+ s"from ${ complexTypeNoDictionaryTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ spark.sql(s"select * " +
+ s"from ${ complexTypeNoDictionaryTableName } " +
+ s"where id > 10 limit 100").show(100, truncate = false)
+
+ // show segments
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableName }").show(false)
+
+ // drop table
+ spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }")
+
+ spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableNameArray }")
+ .show(100, truncate = false)
+
+ spark.sql(s"select * from ${ complexTypeNoDictionaryTableNameArray } order by id desc")
+ .show(300, truncate = false)
spark.sql(s"select * " +
- s"from ${ complexTableName } " +
+ s"from ${ complexTypeNoDictionaryTableNameArray } " +
s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
spark.sql(s"select * " +
- s"from ${ complexTableName } " +
+ s"from ${ complexTypeNoDictionaryTableNameArray } " +
s"where id > 10 limit 100").show(100, truncate = false)
// show segments
- spark.sql(s"SHOW SEGMENTS FOR TABLE ${complexTableName}").show(false)
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableNameArray }").show(false)
// drop table
- spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }")
+ spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }")
}
}
// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index d8e5374..fabcd02 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,13 +17,13 @@
package org.apache.carbondata.spark.testsuite.createTable
-import java.io.{File, FileFilter, IOException}
+import java.io.{File, FileFilter}
import java.util
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
-import org.junit.{Assert, Test}
+import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -35,6 +35,7 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.avro
import org.apache.commons.lang.CharEncoding
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
@@ -254,7 +255,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
- sql("select * from t1").show(200,false)
sql("insert into sdkOutputTable select * from t1").show(200,false)
checkAnswer(sql(s"""select * from sdkOutputTable where age = 12"""),
@@ -545,7 +545,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
- sql("select * from sdkOutputTable").show(false)
}
assert(exception.getMessage()
.contains("Operation not allowed: Invalid table path provided:"))
@@ -687,49 +686,20 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
- def buildAvroTestData(rows: Int, options: util.Map[String, String]): Any = {
- FileUtils.deleteDirectory(new File(writerPath))
- val newAvroSchema = "{" + " \"type\" : \"record\", " + " \"name\" : \"userInfo\", " +
- " \"namespace\" : \"my.example\", " +
- " \"fields\" : [{\"name\" : \"username\", " +
- " \"type\" : \"string\", " + " \"default\" : \"NONE\"}, " +
- " {\"name\" : \"age\", " + " \"type\" : \"int\", " +
- " \"default\" : -1}, " + "{\"name\" : \"address\", " +
- " \"type\" : { " + " \"type\" : \"record\", " +
- " \"name\" : \"mailing_address\", " + " \"fields\" : [ {" +
- " \"name\" : \"street\", " +
- " \"type\" : \"string\", " +
- " \"default\" : \"NONE\"}, { " + " \"name\" : \"city\", " +
- " \"type\" : \"string\", " + " \"default\" : \"NONE\"}, " +
- " ]}, " + " \"default\" : {} " + " } " + "}"
- val mySchema = "{" + " \"name\": \"address\", " + " \"type\": \"record\", " +
- " \"fields\": [ " +
- " { \"name\": \"name\", \"type\": \"string\"}, " +
- " { \"name\": \"age\", \"type\": \"int\"}, " + " { " +
- " \"name\": \"address\", " + " \"type\": { " +
- " \"type\" : \"record\", " + " \"name\" : \"my_address\", " +
- " \"fields\" : [ " +
- " {\"name\": \"street\", \"type\": \"string\"}, " +
- " {\"name\": \"city\", \"type\": \"string\"} " + " ]} " + " } " +
- "] " + "}"
- val json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", " +
- "\"city\":\"bang\"}}"
+ private def WriteFilesWithAvroWriter(rows: Int,
+ mySchema: String,
+ json: String,
+ fields: Array[Field]) = {
// conversion to GenericData.Record
- val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
+ val nn = new avro.Schema.Parser().parse(mySchema)
val converter = new JsonAvroConverter
val record = converter
.convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
- val fields = new Array[Field](3)
- fields(0) = new Field("name", DataTypes.STRING)
- fields(1) = new Field("age", DataTypes.STRING)
- // fields[1] = new Field("age", DataTypes.INT);
- val fld = new util.ArrayList[StructField]
- fld.add(new StructField("street", DataTypes.STRING))
- fld.add(new StructField("city", DataTypes.STRING))
- fields(2) = new Field("address", "struct", fld)
+
try {
val writer = CarbonWriter.builder.withSchema(new Schema(fields))
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+ .outputPath(writerPath).isTransactionalTable(false)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
var i = 0
while (i < rows) {
writer.write(record)
@@ -745,28 +715,419 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}
}
- def buildAvroTestDataSingleFile(): Any = {
+ // struct type test
+ def buildAvroTestDataStruct(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ val mySchema =
+ """
+ |{"name": "address",
+ | "type": "record",
+ | "fields": [
+ | { "name": "name", "type": "string"},
+ | { "name": "age", "type": "int"},
+ | { "name": "address", "type": {
+ | "type" : "record", "name" : "my_address",
+ | "fields" : [
+ | {"name": "street", "type": "string"},
+ | {"name": "city", "type": "string"}]}}
+ |]}
+ """.stripMargin
+
+ val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
+
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fields(2) = new Field("address", "struct", fld)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataStructType(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataStruct(3, null)
+ }
+
+ // array type test
+ def buildAvroTestDataArrayType(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "address",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "name": "street",
+ | "type": "string"
+ | }
+ | }
+ | }
+ | ]
+ | }
+ """.stripMargin
+
+ val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
+
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ // fields[1] = new Field("age", DataTypes.INT);
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fields(2) = new Field("address", "array", fld)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataSingleFileArrayType(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataArrayType(3, null)
+ }
+
+ // struct with array type test
+ def buildAvroTestDataStructWithArrayType(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """
+ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | { "name": "name", "type": "string"},
+ | { "name": "age", "type": "int"},
+ | {
+ | "name": "address",
+ | "type": {
+ | "type" : "record",
+ | "name" : "my_address",
+ | "fields" : [
+ | {"name": "street", "type": "string"},
+ | {"name": "city", "type": "string"}
+ | ]}
+ | },
+ | {"name" :"doorNum",
+ | "type" : {
+ | "type" :"array",
+ | "items":{
+ | "name" :"EachdoorNums",
+ | "type" : "int",
+ | "default":-1
+ | }}
+ | }]}
+ """.stripMargin
+
+ val json =
+ """ {"name":"bob", "age":10,
+ |"address" : {"street":"abc", "city":"bang"},
+ |"doorNum" : [1,2,3,4]}""".stripMargin
+
+ val fields = new Array[Field](4)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fields(2) = new Field("address", "struct", fld)
+ val fld1 = new util.ArrayList[StructField]
+ fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+ fields(3) = new Field("doorNum", "array", fld1)
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataBothStructArrayType(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataStructWithArrayType(3, null)
+ }
+
+
+ // ArrayOfStruct test
+ def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "doorNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "type": "record",
+ | "name": "my_address",
+ | "fields": [
+ | {
+ | "name": "street",
+ | "type": "string"
+ | },
+ | {
+ | "name": "city",
+ | "type": "string"
+ | }
+ | ]
+ | }
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+ val json =
+ """ {"name":"bob","age":10,"doorNum" :
+ |[{"street":"abc","city":"city1"},
+ |{"street":"def","city":"city2"},
+ |{"street":"ghi","city":"city3"},
+ |{"street":"jkl","city":"city4"}]} """.stripMargin
+
+
+
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+ fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataArrayOfStructType(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestDataArrayOfStruct(3, null)
+ }
+
+
+ // StructOfArray test
+ def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+
+ val mySchema = """ {
+ | "name": "address",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "name",
+ | "type": "string"
+ | },
+ | {
+ | "name": "age",
+ | "type": "int"
+ | },
+ | {
+ | "name": "address",
+ | "type": {
+ | "type": "record",
+ | "name": "my_address",
+ | "fields": [
+ | {
+ | "name": "street",
+ | "type": "string"
+ | },
+ | {
+ | "name": "city",
+ | "type": "string"
+ | },
+ | {
+ | "name": "doorNum",
+ | "type": {
+ | "type": "array",
+ | "items": {
+ | "name": "EachdoorNums",
+ | "type": "int",
+ | "default": -1
+ | }
+ | }
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |} """.stripMargin
+
+ val json = """ {
+ | "name": "bob",
+ | "age": 10,
+ | "address": {
+ | "street": "abc",
+ | "city": "bang",
+ | "doorNum": [
+ | 1,
+ | 2,
+ | 3,
+ | 4
+ | ]
+ | }
+ |} """.stripMargin
+
+
+
+
+
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.INT)
+
+ val fld1 = new util.ArrayList[StructField]
+ fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+
+ val fld2 = new util.ArrayList[StructField]
+ fld2.add(new StructField("street", DataTypes.STRING))
+ fld2.add(new StructField("city", DataTypes.STRING))
+ fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
+
+ fields(2) = new Field("address","struct",fld2)
+ WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+ }
+
+ def buildAvroTestDataStructOfArrayType(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
- buildAvroTestData(3, null)
+ buildAvroTestDataStructOfArray(3, null)
}
- test("Read sdk writer Avro output ") {
- buildAvroTestDataSingleFile()
+
+ test("Read sdk writer Avro output Record Type") {
+ buildAvroTestDataStructType()
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
sql(
s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
|'$writerPath' """.stripMargin)
- sql("select * from sdkOutputTable").show(false)
checkAnswer(sql("select * from sdkOutputTable"), Seq(
- Row("bob", "10", Row("abc","bang")),
- Row("bob", "10", Row("abc","bang")),
- Row("bob", "10", Row("abc","bang"))))
+ Row("bob", 10, Row("abc","bang")),
+ Row("bob", 10, Row("abc","bang")),
+ Row("bob", 10, Row("abc","bang"))))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).listFiles().length > 0)
+ }
+
+ test("Read sdk writer Avro output Array Type") {
+ buildAvroTestDataSingleFileArrayType()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(200,false)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))),
+ Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))),
+ Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg")))))
sql("DROP TABLE sdkOutputTable")
// drop table should not delete the files
assert(new File(writerPath).exists())
}
+
+ test("Read sdk writer Avro output with both Array and Struct Type") {
+ buildAvroTestDataBothStructArrayType()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ /*
+ *-+----+---+----------+------------+
+ |name|age|address |doorNum |
+ +----+---+----------+------------+
+ |bob |10 |[abc,bang]|[1, 2, 3, 4]|
+ |bob |10 |[abc,bang]|[1, 2, 3, 4]|
+ |bob |10 |[abc,bang]|[1, 2, 3, 4]|
+ +----+---+----------+------------+
+ * */
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)),
+ Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)),
+ Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4))))
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ }
+
+
+ test("Read sdk writer Avro output with Array of struct") {
+ buildAvroTestDataArrayOfStructType()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ // TODO: Add a validation
+ /*
+ +----+---+----------------------------------------------------+
+ |name|age|doorNum |
+ +----+---+----------------------------------------------------+
+ |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]|
+ |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]|
+ |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]|
+ +----+---+----------------------------------------------------+ */
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ }
+
+
+ // Struct of array
+ test("Read sdk writer Avro output with struct of Array") {
+ buildAvroTestDataStructOfArrayType()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ // TODO: Add a validation
+ /*
+ +----+---+-------------------------------------------------------+
+ |name|age|address |
+ +----+---+-------------------------------------------------------+
+ |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)] |
+ |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)] |
+ |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)] |
+ +----+---+-------------------------------------------------------+*/
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
index fe11b98..65210b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -97,32 +97,16 @@ private[spark] object SparkTypeConverter {
def getStructChildren(table: CarbonTable, dimName: String): String = {
table.getChildren(dimName).asScala.map(childDim => {
childDim.getDataType.getName.toLowerCase match {
- case "array" => if (table.isTransactionalTable) {s"${
+ case "array" => s"${
childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(table, childDim.getColName) }>" } else {
- // For non Transactional table the Childrends of Struct Columns
- // are not appended with their parent.
- s"${
- childDim.getColName
- }:array<${ getArrayChildren(table, childDim.getColName) }>"
- }
- case "struct" => if (table.isTransactionalTable) { s"${
+ }:array<${ getArrayChildren(table, childDim.getColName) }>"
+ case "struct" => s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ table.getChildren(childDim.getColName)
.asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
- }>"} else {
- s"${
- childDim.getColName
- }:struct<${ table.getChildren(childDim.getColName)
- .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
- }>"
- }
- case dType => if (table.isTransactionalTable) {
- s"${ childDim.getColName
+ }>"
+ case dType => s"${ childDim.getColName
.substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
- } else {
- s"${ childDim.getColName}:${ addDecimalScaleAndPrecision(childDim, dType) }"
- }
}
}).mkString(",")
}
@@ -139,31 +123,13 @@ private[spark] object SparkTypeConverter {
private def recursiveMethod(
table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
childDim.getDataType.getName.toLowerCase match {
- case "array" => if (table.isTransactionalTable) {
- s"${
- childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(table, childDim.getColName) }>"
- } else {
- // For non Transactional table the Childrends of Struct Columns
- // are not appended with their parent.
- s"${
- childDim.getColName
- }:array<${ getArrayChildren(table, childDim.getColName) }>"
- }
- case "struct" => if (table.isTransactionalTable) {
- s"${
- childDim.getColName.substring(dimName.length + 1)
- }:struct<${ getStructChildren(table, childDim.getColName) }>"
- } else {
- s"${
- childDim.getColName
- }:struct<${ getStructChildren(table, childDim.getColName) }>"
- }
- case dType => if (table.isTransactionalTable) {
- s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
- } else {
- s"${ childDim.getColName }:${ dType }"
- }
+ case "array" => s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:array<${ getArrayChildren(table, childDim.getColName) }>"
+ case "struct" => s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:struct<${ getStructChildren(table, childDim.getColName) }>"
+ case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 8f1994a..bc2e9db 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.sdk.file;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -69,16 +70,16 @@ class AvroCarbonWriter extends CarbonWriter {
avroSchema = avroRecord.getSchema();
}
List<Schema.Field> fields = avroSchema.getFields();
- Object [] csvField = new String[fields.size()];
+ Object [] csvField = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
- csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i));
+ csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0);
}
return csvField;
}
- private String avroFieldToString(Schema.Field fieldType, Object fieldValue) {
+ private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) {
StringBuilder out = new StringBuilder();
- Schema.Type type = fieldType.schema().getType();
+ Schema.Type type = avroField.schema().getType();
switch (type) {
case BOOLEAN:
case INT:
@@ -89,22 +90,47 @@ class AvroCarbonWriter extends CarbonWriter {
out.append(fieldValue.toString());
break;
case RECORD:
- List<Schema.Field> fields = fieldType.schema().getFields();
+ List<Schema.Field> fields = avroField.schema().getFields();
String delimiter = null;
- for (int i = 0; i < fields.size(); i ++) {
- if (i == 0) {
+ delimiterLevel ++;
+ for (int i = 0; i < fields.size(); i++) {
+ if (delimiterLevel == 1) {
delimiter = "$";
- } else {
+ } else if (delimiterLevel > 1) {
delimiter = ":";
}
if (i != (fields.size() - 1)) {
- out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)))
- .append(delimiter);
+ out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
+ delimiterLevel)).append(delimiter);
+ } else {
+ out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
+ delimiterLevel));
+ }
+ }
+ break;
+ case ARRAY:
+ int size = ((ArrayList) fieldValue).size();
+ String delimiterArray = null;
+ delimiterLevel ++;
+ if (delimiterLevel == 1) {
+ delimiterArray = "$";
+ } else if (delimiterLevel > 1) {
+ delimiterArray = ":";
+ }
+
+ for (int i = 0; i < size; i++) {
+ if (i != size - 1) {
+ out.append(avroFieldToObject(
+ new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+ ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray);
} else {
- out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)));
+ out.append(avroFieldToObject(
+ new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+ ((ArrayList) fieldValue).get(i), delimiterLevel));
}
}
break;
+
default:
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 68bc3ab..397f151 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -333,6 +333,23 @@ public class CarbonWriterBuilder {
return new AvroCarbonWriter(loadModel);
}
+ private void setCsvHeader(CarbonLoadModel model) {
+ Field[] fields = schema.getFields();
+ StringBuilder builder = new StringBuilder();
+ String[] columns = new String[fields.length];
+ int i = 0;
+ for (Field field : fields) {
+ if (null != field) {
+ builder.append(field.getFieldName());
+ builder.append(",");
+ columns[i++] = field.getFieldName();
+ }
+ }
+ String header = builder.toString();
+ model.setCsvHeader(header.substring(0, header.length() - 1));
+ model.setCsvHeaderColumns(columns);
+ }
+
private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException {
// build CarbonTable using schema
CarbonTable table = buildCarbonTable();
@@ -368,7 +385,7 @@ public class CarbonWriterBuilder {
for (Field field : schema.getFields()) {
if (null != field) {
if (field.getDataType() == DataTypes.STRING ||
- field.getDataType() == DataTypes.DATE ||
+ field.getDataType() == DataTypes.DATE ||
field.getDataType() == DataTypes.TIMESTAMP) {
sortColumnsList.add(field.getFieldName());
}
@@ -380,30 +397,9 @@ public class CarbonWriterBuilder {
sortColumnsList = Arrays.asList(sortColumns);
}
ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
- for (Field field : schema.getFields()) {
- if (null != field) {
- if (field.getChildren() != null && field.getChildren().size() > 0) {
- // Loop through the inner columns and for a StructData
- List<StructField> structFieldsArray =
- new ArrayList<StructField>(field.getChildren().size());
- String parentName = field.getFieldName();
- for (StructField childFld : field.getChildren()) {
- structFieldsArray.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
- }
- DataType complexType = DataTypes.createStructType(structFieldsArray);
- tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
- } else {
- int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
- ColumnSchema columnSchema = tableSchemaBuilder
- .addColumn(new StructField(field.getFieldName(), field.getDataType()),
- isSortColumn > -1);
- if (isSortColumn > -1) {
- columnSchema.setSortColumn(true);
- sortColumnsSchemaList[isSortColumn] = columnSchema;
- }
- }
- }
- }
+ Field[] fields = schema.getFields();
+ buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList);
+
tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
String tableName;
String dbName;
@@ -416,16 +412,56 @@ public class CarbonWriterBuilder {
}
TableSchema schema = tableSchemaBuilder.build();
schema.setTableName(tableName);
- CarbonTable table = CarbonTable.builder()
- .tableName(schema.getTableName())
- .databaseName(dbName)
- .tablePath(path)
- .tableSchema(schema)
- .isTransactionalTable(isTransactionalTable)
- .build();
+ CarbonTable table =
+ CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path)
+ .tableSchema(schema).isTransactionalTable(isTransactionalTable).build();
return table;
}
+ private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
+ List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) {
+ for (Field field : fields) {
+ if (null != field) {
+ int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
+ if (isSortColumn > -1) {
+ // unsupported types for ("array", "struct", "double", "float", "decimal")
+ if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
+ || DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()) {
+ throw new RuntimeException(
+ " sort columns not supported for " + "array, struct, double, float, decimal ");
+ }
+ }
+
+ if (field.getChildren() != null && field.getChildren().size() > 0) {
+ if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+ // Loop through the inner columns and for a StructData
+ DataType complexType =
+ DataTypes.createArrayType(field.getChildren().get(0).getDataType());
+ tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
+ } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
+ // Loop through the inner columns and for a StructData
+ List<StructField> structFieldsArray =
+ new ArrayList<StructField>(field.getChildren().size());
+ for (StructField childFld : field.getChildren()) {
+ structFieldsArray
+ .add(new StructField(childFld.getFieldName(), childFld.getDataType()));
+ }
+ DataType complexType = DataTypes.createStructType(structFieldsArray);
+ tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
+ }
+ } else {
+ ColumnSchema columnSchema = tableSchemaBuilder
+ .addColumn(new StructField(field.getFieldName(), field.getDataType()),
+ isSortColumn > -1);
+ columnSchema.setSortColumn(true);
+ if (isSortColumn > -1) {
+ sortColumnsSchemaList[isSortColumn] = columnSchema;
+ }
+ }
+ }
+ }
+ }
+
/**
* Save the schema of the {@param table} to {@param persistFilePath}
* @param table table object containing schema
@@ -465,6 +501,8 @@ public class CarbonWriterBuilder {
options = new HashMap<>();
}
CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
- return builder.build(options, UUID, taskNo);
+ CarbonLoadModel build = builder.build(options, UUID, taskNo);
+ setCsvHeader(build);
+ return build;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 677047b..0db3bc5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -104,7 +104,7 @@ public class Field {
} else if (type.equalsIgnoreCase("double")) {
this.type = DataTypes.DOUBLE;
} else if (type.equalsIgnoreCase("array")) {
- this.type = DataTypes.createStructType(fields);
+ this.type = DataTypes.createArrayType(fields.get(0).getDataType());
} else if (type.equalsIgnoreCase("struct")) {
this.type = DataTypes.createStructType(fields);
}
@@ -113,6 +113,8 @@ public class Field {
}
}
+
+
public Field(String name, DataType type, List<StructField> fields) {
this.name = name;
this.type = type;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index ed3f2f1..105fb6d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -20,10 +20,13 @@ package org.apache.carbondata.sdk.file;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.datatype.StructType;
@@ -37,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
+import scala.Array;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
import org.apache.avro.Schema;
@@ -285,4 +289,214 @@ public class AvroCarbonWriterTest {
FileUtils.deleteDirectory(new File(path));
}
+
+ @Test
+ public void testWriteNestedRecordWithMeasure() throws IOException {
+ FileUtils.deleteDirectory(new File(path));
+
+ String mySchema =
+ "{" +
+ " \"name\": \"address\", " +
+ " \"type\": \"record\", " +
+ " \"fields\": [ " +
+ " { \"name\": \"name\", \"type\": \"string\"}, " +
+ " { \"name\": \"age\", \"type\": \"int\"}, " +
+ " { " +
+ " \"name\": \"address\", " +
+ " \"type\": { " +
+ " \"type\" : \"record\", " +
+ " \"name\" : \"my_address\", " +
+ " \"fields\" : [ " +
+ " {\"name\": \"street\", \"type\": \"string\"}, " +
+ " {\"name\": \"city\", \"type\": \"string\"} " +
+ " ]} " +
+ " } " +
+ "] " +
+ "}";
+
+ String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}";
+
+
+ // conversion to GenericData.Record
+ Schema nn = new Schema.Parser().parse(mySchema);
+ JsonAvroConverter converter = new JsonAvroConverter();
+ GenericData.Record record = converter.convertToGenericDataRecord(
+ json.getBytes(CharEncoding.UTF_8), nn);
+
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("name1", DataTypes.STRING);
+ // fields[1] = new Field("age", DataTypes.INT);
+ List fld = new ArrayList<StructField>();
+ fld.add(new StructField("street", DataTypes.STRING));
+ fld.add(new StructField("city", DataTypes.STRING));
+ fields[2] = new Field("address", "struct", fld);
+
+ try {
+ CarbonWriter writer = CarbonWriter.builder()
+ .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+ .outputPath(path)
+ .isTransactionalTable(true)
+ .buildWriterForAvroInput();
+
+ for (int i = 0; i < 100; i++) {
+ writer.write(record);
+ }
+ writer.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ Assert.assertTrue(segmentFolder.exists());
+
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertEquals(1, dataFiles.length);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+
+ private void WriteAvroComplexData(String mySchema, String json, String[] sortColumns)
+ throws UnsupportedEncodingException, IOException, InvalidLoadOptionException {
+ Field[] fields = new Field[4];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("name1", DataTypes.STRING);
+ // fields[1] = new Field("age", DataTypes.INT);
+ List fld = new ArrayList<StructField>();
+ fld.add(new StructField("street", DataTypes.STRING));
+ fld.add(new StructField("city", DataTypes.STRING));
+ fields[2] = new Field("address", "struct", fld);
+ List fld1 = new ArrayList<StructField>();
+ fld1.add(new StructField("eachDoorNum", DataTypes.INT));
+ fields[3] = new Field("doorNum","array",fld1);
+
+ // conversion to GenericData.Record
+ Schema nn = new Schema.Parser().parse(mySchema);
+ JsonAvroConverter converter = new JsonAvroConverter();
+ GenericData.Record record = converter.convertToGenericDataRecord(
+ json.getBytes(CharEncoding.UTF_8), nn);
+
+ try {
+ CarbonWriter writer = CarbonWriter.builder()
+ .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+ .outputPath(path)
+ .isTransactionalTable(true).sortBy(sortColumns)
+ .buildWriterForAvroInput();
+
+ for (int i = 0; i < 100; i++) {
+ writer.write(record);
+ }
+ writer.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+ @Test
+ public void testWriteComplexRecord() throws IOException, InvalidLoadOptionException {
+ FileUtils.deleteDirectory(new File(path));
+
+ String mySchema =
+ "{" +
+ " \"name\": \"address\", " +
+ " \"type\": \"record\", " +
+ " \"fields\": [ " +
+ " { \"name\": \"name\", \"type\": \"string\"}, " +
+ " { \"name\": \"age\", \"type\": \"int\"}, " +
+ " { " +
+ " \"name\": \"address\", " +
+ " \"type\": { " +
+ " \"type\" : \"record\", " +
+ " \"name\" : \"my_address\", " +
+ " \"fields\" : [ " +
+ " {\"name\": \"street\", \"type\": \"string\"}, " +
+ " {\"name\": \"city\", \"type\": \"string\"} " +
+ " ]} " +
+ " }, " +
+ " {\"name\" :\"doorNum\", " +
+ " \"type\" : { " +
+ " \"type\" :\"array\", " +
+ " \"items\":{ " +
+ " \"name\" :\"EachdoorNums\", " +
+ " \"type\" : \"int\", " +
+ " \"default\":-1} " +
+ " } " +
+ " }] " +
+ "}";
+
+ String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, "
+ + " \"doorNum\" : [1,2,3,4]}";
+
+ WriteAvroComplexData(mySchema, json, null);
+
+ File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ Assert.assertTrue(segmentFolder.exists());
+
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertEquals(1, dataFiles.length);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+
+ @Test
+ public void testWriteComplexRecordWithSortColumns() throws IOException {
+ FileUtils.deleteDirectory(new File(path));
+
+ String mySchema =
+ "{" +
+ " \"name\": \"address\", " +
+ " \"type\": \"record\", " +
+ " \"fields\": [ " +
+ " { \"name\": \"name\", \"type\": \"string\"}, " +
+ " { \"name\": \"age\", \"type\": \"int\"}, " +
+ " { " +
+ " \"name\": \"address\", " +
+ " \"type\": { " +
+ " \"type\" : \"record\", " +
+ " \"name\" : \"my_address\", " +
+ " \"fields\" : [ " +
+ " {\"name\": \"street\", \"type\": \"string\"}, " +
+ " {\"name\": \"city\", \"type\": \"string\"} " +
+ " ]} " +
+ " }, " +
+ " {\"name\" :\"doorNum\", " +
+ " \"type\" : { " +
+ " \"type\" :\"array\", " +
+ " \"items\":{ " +
+ " \"name\" :\"EachdoorNums\", " +
+ " \"type\" : \"int\", " +
+ " \"default\":-1} " +
+ " } " +
+ " }] " +
+ "}";
+
+ String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, "
+ + " \"doorNum\" : [1,2,3,4]}";
+
+ try {
+ WriteAvroComplexData(mySchema, json, new String[] { "doorNum" });
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+
+
}