Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/08 08:52:11 UTC

carbondata git commit: [CARBONDATA-2894] Add support for complex map type through spark carbon file format API

Repository: carbondata
Updated Branches:
  refs/heads/master f5c7a19b8 -> 68b359e15


[CARBONDATA-2894] Add support for complex map type through spark carbon file format API

This PR adds support for loading and querying the complex map type through the Spark carbon file format API.
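
For example, a map column can now round-trip through the datasource API. A minimal sketch in Scala, mirroring the tests added in this commit (assumes a SparkSession named spark with the carbon datasource on the classpath):

    import spark.implicits._

    // Stage a DataFrame with a map<string,string> column as parquet, then
    // copy it into a carbon table with the same schema and read it back.
    val df = spark.sparkContext.parallelize(1 to 10)
      .map(x => ("a" + x % 10, Map("b" -> "c"), x))
      .toDF("c1", "c2", "number")
    df.write.format("parquet").saveAsTable("parquet_table")

    spark.sql("create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon")
    spark.sql("insert into carbon_table select * from parquet_table")
    spark.sql("select * from carbon_table").show()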

This closes #2663


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/68b359e1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/68b359e1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/68b359e1

Branch: refs/heads/master
Commit: 68b359e156780cb92defdce3033ed5c2e1d7e744
Parents: f5c7a19
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Aug 27 19:17:21 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Sep 8 14:22:02 2018 +0530

----------------------------------------------------------------------
 .../datasources/CarbonSparkDataSourceUtil.scala |  12 +-
 .../datasources/SparkCarbonFileFormat.scala     |  23 +++-
 .../datasource/SparkCarbonDataSourceTest.scala  | 126 ++++++++++++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |   4 +-
 .../sdk/file/CarbonWriterBuilder.java           |   5 +-
 .../org/apache/carbondata/sdk/file/Field.java   |  59 +++++++++
 6 files changed, 213 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 77c1dce..b097320 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField, StructType => CarbonStructType}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -78,6 +78,10 @@ object CarbonSparkDataSourceUtil {
               convertSparkToCarbonDataType(field.dataType)))
         }
         CarbonDataTypes.createStructType(carbonFields)
+      case MapType(keyType, valueType, _) =>
+        val keyDataType: CarbonDataType = convertSparkToCarbonDataType(keyType)
+        val valueDataType: CarbonDataType = convertSparkToCarbonDataType(valueType)
+        CarbonDataTypes.createMapType(keyDataType, valueDataType)
       case NullType => CarbonDataTypes.NULL
       case decimal: DecimalType =>
         CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
@@ -196,11 +200,7 @@ object CarbonSparkDataSourceUtil {
       dataSchema: StructType): CarbonLoadModel = {
     val schema = new Schema(dataSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
-      dataType match {
-        case s: CarbonStructType =>
-          new Field(field.name, s, s.getFields)
-        case _ => new Field(field.name, dataType)
-      }
+      new Field(field.name, dataType)
     })
     val builder = new CarbonWriterBuilder
     builder.isTransactionalTable(false)
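
With the new MapType case, Spark map schemas now convert recursively like arrays and structs instead of falling through unhandled. A minimal sketch of the mapping (convertSparkToCarbonDataType as defined in this file; the key and value types are illustrative):

    import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

    // MapType(StringType, IntegerType) converts to a Carbon map type built
    // from the recursively converted key and value types, equivalent to
    // CarbonDataTypes.createMapType(CarbonDataTypes.STRING, CarbonDataTypes.INT).
    val carbonType = CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(
      MapType(StringType, IntegerType))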

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 406e2c9..a5e1b39 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUn
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
@@ -200,8 +200,10 @@ class SparkCarbonFileFormat extends FileFormat
               data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
             case s: StructType =>
               data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
-            case s: ArrayType =>
-              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+            case a: ArrayType =>
+              data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
+            case m: MapType =>
+              data(i) = extractMapData(row.getMap(i), m)
             case d: DateType =>
               data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
             case d: TimestampType =>
@@ -217,6 +219,15 @@ class SparkCarbonFileFormat extends FileFormat
       data
     }
 
+    private def extractMapData(data: AnyRef, mapType: MapType): ArrayObject = {
+      val mapData = data.asInstanceOf[MapData]
+      val keys = extractData(mapData.keyArray(), mapType.keyType)
+      val values = extractData(mapData.valueArray(), mapType.valueType)
+      new ArrayObject(keys.zip(values).map { case (key, value) =>
+        new StructObject(Array(key, value))
+      })
+    }
+
     private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
       dataType match {
         case d: DateType =>
@@ -241,8 +252,10 @@ class SparkCarbonFileFormat extends FileFormat
               data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
             case s: StructType =>
               data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
-            case s: ArrayType =>
-              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+            case a: ArrayType =>
+              data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
+            case m: MapType =>
+              data(i) = extractMapData(row.getMap(i), m)
             case d: DateType =>
               data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
             case d: TimestampType =>
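
extractMapData follows Carbon's internal layout for maps, where map<k,v> is stored as array<struct<k,v>> (see the comment added in Field.java below): keys and values are pulled out as parallel arrays and zipped into struct entries. A minimal sketch of the resulting shape on plain collections:

    // Map(1 -> "a", 2 -> "b") ends up as the equivalent of
    // ArrayObject(StructObject([1, "a"]), StructObject([2, "b"])).
    val keys: Array[AnyRef] = Array(Integer.valueOf(1), Integer.valueOf(2))
    val values: Array[AnyRef] = Array("a", "b")
    val entries: Array[Array[AnyRef]] =
      keys.zip(values).map { case (k, v) => Array(k, v) }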

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index dcc76d8..66c0224 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -148,7 +148,6 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
     df.write
       .format("parquet").saveAsTable("parquet_table")
-    spark.sql("describe parquet_table").show(false)
     spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:string>, number int) using carbon")
     spark.sql("insert into carbon_table select * from parquet_table")
     assert(spark.sql("select * from carbon_table").count() == 10)
@@ -212,6 +211,131 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists parquet_table")
   }
 
+  test("test write with array type with value as nested map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array(Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 array<map<string,string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with struct type with value as nested map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, ("a", Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:map<string,string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("b" -> "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with Int data type as key") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map(99 -> "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<int, string>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, map<string, string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested struct type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> ("b", "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, struct<a1:string, a2:string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested array type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> Array("b", "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, array<string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
 
   test("test write using ddl and options") {
     spark.sql("drop table if exists carbon_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index d1e936e..14dbe16 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -468,7 +468,7 @@ public class AvroCarbonWriter extends CarbonWriter {
       case MAP:
         // recursively get the sub fields
         ArrayList<StructField> mapSubFields = new ArrayList<>();
-        StructField mapField = prepareSubFields("val", childSchema);
+        StructField mapField = prepareSubFields(fieldName, childSchema);
         if (null != mapField) {
           // key value field will be wrapped inside a map struct field
           StructField keyValueField = mapField.getChildren().get(0);
@@ -575,7 +575,7 @@ public class AvroCarbonWriter extends CarbonWriter {
           keyValueFields.add(keyField);
           keyValueFields.add(valueField);
           StructField mapKeyValueField =
-              new StructField(fieldName, DataTypes.createStructType(keyValueFields));
+              new StructField(fieldName + ".val", DataTypes.createStructType(keyValueFields));
           // value dataType will be at position 1 in the fields
           MapType mapType =
               DataTypes.createMapType(DataTypes.STRING, mapKeyValueField.getDataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 76dd7aa..56757e4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.MapType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -673,8 +674,8 @@ public class CarbonWriterBuilder {
                 .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
           } else if (field.getDataType().getName().equalsIgnoreCase("MAP")) {
             // Loop through the inner columns for MapType
-            DataType mapType =
-                DataTypes.createMapType(DataTypes.STRING, field.getChildren().get(0).getDataType());
+            DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(),
+                field.getChildren().get(0).getDataType());
             tableSchemaBuilder
                 .addColumn(new StructField(field.getFieldName(), mapType), valIndex, false);
           }
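
The builder previously hardcoded STRING as the map key type; it now reads the key type from the declared MapType, which is what the new int-key test above exercises. A minimal sketch (SDK DataTypes and MapType as used in this diff):

    import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, MapType}

    // Before: key type forced to DataTypes.STRING regardless of the schema.
    // After: the declared key type (here INT) survives into the column schema.
    val declared: MapType = DataTypes.createMapType(DataTypes.INT, DataTypes.STRING)
    val keyType: DataType = declared.getKeyType // INT, no longer assumed STRING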

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 1c5ab52..6903200 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,14 +17,18 @@
 
 package org.apache.carbondata.sdk.file;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.MapType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
@@ -130,6 +134,7 @@ public class Field {
   public Field(String name, DataType type) {
     this.name = name;
     this.type = type;
+    initComplexTypeChildren();
   }
 
   /**
@@ -218,4 +223,58 @@ public class Field {
   public void updateNameToLowerCase() {
     this.name = name.toLowerCase();
   }
+
+  private void initComplexTypeChildren() {
+    if (getDataType().isComplexType()) {
+      StructField subFields = prepareSubFields(getFieldName(), getDataType());
+      if (DataTypes.isArrayType(getDataType()) || DataTypes.isMapType(getDataType())) {
+        children = subFields.getChildren();
+      } else if (DataTypes.isStructType(getDataType())) {
+        children = ((StructType) subFields.getDataType()).getFields();
+      }
+    }
+  }
+
+  /**
+   * prepare sub fields for complex types
+   *
+   * @param fieldName column name
+   * @param dataType data type of column or its children
+   * @return
+   */
+  private StructField prepareSubFields(String fieldName, DataType dataType) {
+    if (DataTypes.isArrayType(dataType)) {
+      List<StructField> arrayFields = new ArrayList<>();
+      StructField arrayField = prepareSubFields(fieldName, ((ArrayType) dataType).getElementType());
+      arrayFields.add(arrayField);
+      return new StructField(fieldName, DataTypes.createArrayType(arrayField.getDataType()),
+          arrayFields);
+    } else if (DataTypes.isStructType(dataType)) {
+      List<StructField> structFields = new ArrayList<>();
+      List<StructField> fields = ((StructType) dataType).getFields();
+      for (StructField field : fields) {
+        structFields.add(prepareSubFields(field.getFieldName(), field.getDataType()));
+      }
+      return new StructField(fieldName, DataTypes.createStructType(structFields), structFields);
+    } else if (DataTypes.isMapType(dataType)) {
+      // Internally Map<key, value> is stored as Array<struct<key, value>>. So the below method
+      // will convert a map type into similar field structure. The columnSchema will be formed
+      // as Map<Struct<key,value>>
+      List<StructField> mapFields = new ArrayList<>();
+      MapType mapType = (MapType) dataType;
+      // key is a primitive type, so its type can be fetched directly
+      StructField keyField = new StructField(fieldName + ".key", mapType.getKeyType());
+      StructField valueField = prepareSubFields(fieldName + ".value", mapType.getValueType());
+      mapFields.add(keyField);
+      mapFields.add(valueField);
+      StructField field =
+          new StructField(fieldName + ".val", DataTypes.createStructType(mapFields));
+      MapType mapDataType = DataTypes.createMapType(keyField.getDataType(), field.getDataType());
+      List<StructField> mapStructField = new ArrayList<>();
+      mapStructField.add(field);
+      return new StructField(fieldName, mapDataType, mapStructField);
+    } else {
+      return new StructField(fieldName, dataType);
+    }
+  }
 }
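
With initComplexTypeChildren wired into the (name, type) constructor, callers such as createCarbonLoadModel above no longer need to pass children for complex fields explicitly. A minimal sketch (SDK classes as in this diff; the field name is illustrative):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.Field

    // A map field built from name + type alone; the constructor derives the
    // internal children: a props.val struct holding props.key and props.value.
    val mapField = new Field("props", DataTypes.createMapType(DataTypes.INT, DataTypes.STRING))
    // mapField.getChildren now holds the props.val struct field, reflecting
    // the map<struct<key,value>> column layout described above.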