You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/08 08:52:11 UTC
carbondata git commit: [CARBONDATA-2894] Add support for complex map
type through spark carbon file format API
Repository: carbondata
Updated Branches:
refs/heads/master f5c7a19b8 -> 68b359e15
[CARBONDATA-2894] Add support for complex map type through spark carbon file format API
This PR supports loading and querying complex map type through the spark carbon file format API.
This closes #2663
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/68b359e1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/68b359e1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/68b359e1
Branch: refs/heads/master
Commit: 68b359e156780cb92defdce3033ed5c2e1d7e744
Parents: f5c7a19
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Aug 27 19:17:21 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Sep 8 14:22:02 2018 +0530
----------------------------------------------------------------------
.../datasources/CarbonSparkDataSourceUtil.scala | 12 +-
.../datasources/SparkCarbonFileFormat.scala | 23 +++-
.../datasource/SparkCarbonDataSourceTest.scala | 126 ++++++++++++++++++-
.../carbondata/sdk/file/AvroCarbonWriter.java | 4 +-
.../sdk/file/CarbonWriterBuilder.java | 5 +-
.../org/apache/carbondata/sdk/file/Field.java | 59 +++++++++
6 files changed, 213 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 77c1dce..b097320 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField, StructType => CarbonStructType}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -78,6 +78,10 @@ object CarbonSparkDataSourceUtil {
convertSparkToCarbonDataType(field.dataType)))
}
CarbonDataTypes.createStructType(carbonFields)
+ case MapType(keyType, valueType, _) =>
+ val keyDataType: CarbonDataType = convertSparkToCarbonDataType(keyType)
+ val valueDataType: CarbonDataType = convertSparkToCarbonDataType(valueType)
+ CarbonDataTypes.createMapType(keyDataType, valueDataType)
case NullType => CarbonDataTypes.NULL
case decimal: DecimalType =>
CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
@@ -196,11 +200,7 @@ object CarbonSparkDataSourceUtil {
dataSchema: StructType): CarbonLoadModel = {
val schema = new Schema(dataSchema.fields.map { field =>
val dataType = convertSparkToCarbonDataType(field.dataType)
- dataType match {
- case s: CarbonStructType =>
- new Field(field.name, s, s.getFields)
- case _ => new Field(field.name, dataType)
- }
+ new Field(field.name, dataType)
})
val builder = new CarbonWriterBuilder
builder.isTransactionalTable(false)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 406e2c9..a5e1b39 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUn
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
@@ -200,8 +200,10 @@ class SparkCarbonFileFormat extends FileFormat
data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
case s: StructType =>
data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
- case s: ArrayType =>
- data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case a: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
+ case m: MapType =>
+ data(i) = extractMapData(row.getMap(i), m)
case d: DateType =>
data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
case d: TimestampType =>
@@ -217,6 +219,15 @@ class SparkCarbonFileFormat extends FileFormat
data
}
+ private def extractMapData(data: AnyRef, mapType: MapType): ArrayObject = {
+ val mapData = data.asInstanceOf[MapData]
+ val keys = extractData(mapData.keyArray(), mapType.keyType)
+ val values = extractData(mapData.valueArray(), mapType.valueType)
+ new ArrayObject(keys.zip(values).map { case (key, value) =>
+ new StructObject(Array(key, value))
+ })
+ }
+
private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
dataType match {
case d: DateType =>
@@ -241,8 +252,10 @@ class SparkCarbonFileFormat extends FileFormat
data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
case s: StructType =>
data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
- case s: ArrayType =>
- data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+ case a: ArrayType =>
+ data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
+ case m: MapType =>
+ data(i) = extractMapData(row.getMap(i), m)
case d: DateType =>
data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
case d: TimestampType =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index dcc76d8..66c0224 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -148,7 +148,6 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
df.write
.format("parquet").saveAsTable("parquet_table")
- spark.sql("describe parquet_table").show(false)
spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:string>, number int) using carbon")
spark.sql("insert into carbon_table select * from parquet_table")
assert(spark.sql("select * from carbon_table").count() == 10)
@@ -212,6 +211,131 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists parquet_table")
}
+ test("test write with array type with value as nested map type") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, Array(Map("b" -> "c")), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 array<map<string,string>>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
+
+ test("test write with struct type with value as nested map type") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, ("a", Map("b" -> "c")), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:map<string,string>>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
+
+ test("test write with map type") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, Map("b" -> "c"), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
+
+ test("test write with map type with Int data type as key") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, Map(99 -> "c"), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 map<int, string>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
+
+ test("test write with map type with value as nested map type") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, Map("a" -> Map("b" -> "c")), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 map<string, map<string, string>>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
+
+ test("test write with map type with value as nested struct type") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, Map("a" -> ("b", "c")), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 map<string, struct<a1:string, a2:string>>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
+
+ test("test write with map type with value as nested array type") {
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ import spark.implicits._
+ val df = spark.sparkContext.parallelize(1 to 10)
+ .map(x => ("a" + x % 10, Map("a" -> Array("b", "c")), x))
+ .toDF("c1", "c2", "number")
+
+ df.write
+ .format("parquet").saveAsTable("parquet_table")
+ spark.sql("create table carbon_table(c1 string, c2 map<string, array<string>>, number int) using carbon")
+ spark.sql("insert into carbon_table select * from parquet_table")
+ assert(spark.sql("select * from carbon_table").count() == 10)
+ TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+ spark.sql("drop table if exists carbon_table")
+ spark.sql("drop table if exists parquet_table")
+ }
test("test write using ddl and options") {
spark.sql("drop table if exists carbon_table")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index d1e936e..14dbe16 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -468,7 +468,7 @@ public class AvroCarbonWriter extends CarbonWriter {
case MAP:
// recursively get the sub fields
ArrayList<StructField> mapSubFields = new ArrayList<>();
- StructField mapField = prepareSubFields("val", childSchema);
+ StructField mapField = prepareSubFields(fieldName, childSchema);
if (null != mapField) {
// key value field will be wrapped inside a map struct field
StructField keyValueField = mapField.getChildren().get(0);
@@ -575,7 +575,7 @@ public class AvroCarbonWriter extends CarbonWriter {
keyValueFields.add(keyField);
keyValueFields.add(valueField);
StructField mapKeyValueField =
- new StructField(fieldName, DataTypes.createStructType(keyValueFields));
+ new StructField(fieldName + ".val", DataTypes.createStructType(keyValueFields));
// value dataType will be at position 1 in the fields
MapType mapType =
DataTypes.createMapType(DataTypes.STRING, mapKeyValueField.getDataType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 76dd7aa..56757e4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.MapType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -673,8 +674,8 @@ public class CarbonWriterBuilder {
.addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
} else if (field.getDataType().getName().equalsIgnoreCase("MAP")) {
// Loop through the inner columns for MapType
- DataType mapType =
- DataTypes.createMapType(DataTypes.STRING, field.getChildren().get(0).getDataType());
+ DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(),
+ field.getChildren().get(0).getDataType());
tableSchemaBuilder
.addColumn(new StructField(field.getFieldName(), mapType), valIndex, false);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 1c5ab52..6903200 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,14 +17,18 @@
package org.apache.carbondata.sdk.file;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.MapType;
import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
/**
@@ -130,6 +134,7 @@ public class Field {
public Field(String name, DataType type) {
this.name = name;
this.type = type;
+ initComplexTypeChildren();
}
/**
@@ -218,4 +223,58 @@ public class Field {
public void updateNameToLowerCase() {
this.name = name.toLowerCase();
}
+
+ private void initComplexTypeChildren() {
+ if (getDataType().isComplexType()) {
+ StructField subFields = prepareSubFields(getFieldName(), getDataType());
+ if (DataTypes.isArrayType(getDataType()) || DataTypes.isMapType(getDataType())) {
+ children = subFields.getChildren();
+ } else if (DataTypes.isStructType(getDataType())) {
+ children = ((StructType) subFields.getDataType()).getFields();
+ }
+ }
+ }
+
+ /**
+ * prepare sub fields for complex types
+ *
+ * @param fieldName column name
+ * @param dataType data type of column or it's children
+ * @return
+ */
+ private StructField prepareSubFields(String fieldName, DataType dataType) {
+ if (DataTypes.isArrayType(dataType)) {
+ List<StructField> arrayFields = new ArrayList<>();
+ StructField arrayField = prepareSubFields(fieldName, ((ArrayType) dataType).getElementType());
+ arrayFields.add(arrayField);
+ return new StructField(fieldName, DataTypes.createArrayType(arrayField.getDataType()),
+ arrayFields);
+ } else if (DataTypes.isStructType(dataType)) {
+ List<StructField> structFields = new ArrayList<>();
+ List<StructField> fields = ((StructType) dataType).getFields();
+ for (StructField field : fields) {
+ structFields.add(prepareSubFields(field.getFieldName(), field.getDataType()));
+ }
+ return new StructField(fieldName, DataTypes.createStructType(structFields), structFields);
+ } else if (DataTypes.isMapType(dataType)) {
+ // Internally Map<key, value> is stored as Array<struct<key, value>>. So the below method
+ // will convert a map type into similar field structure. The columnSchema will be formed
+ // as Map<Struct<key,value>>
+ List<StructField> mapFields = new ArrayList<>();
+ MapType mapType = (MapType) dataType;
+ // key is primitive type so type can be fetched directly
+ StructField keyField = new StructField(fieldName + ".key", mapType.getKeyType());
+ StructField valueField = prepareSubFields(fieldName + ".value", mapType.getValueType());
+ mapFields.add(keyField);
+ mapFields.add(valueField);
+ StructField field =
+ new StructField(fieldName + ".val", DataTypes.createStructType(mapFields));
+ MapType mapDataType = DataTypes.createMapType(keyField.getDataType(), field.getDataType());
+ List<StructField> mapStructField = new ArrayList<>();
+ mapStructField.add(field);
+ return new StructField(fieldName, mapDataType, mapStructField);
+ } else {
+ return new StructField(fieldName, dataType);
+ }
+ }
}