You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:27 UTC

[25/45] carbondata git commit: [CARBONDATA-2979] select count fails when carbondata file is written through SDK and read through sparkfileformat for complex datatype map(struct->array->map)

[CARBONDATA-2979] select count fails when carbondata file is written through SDK and read through sparkfileformat for complex datatype map(struct->array->map)

Problem
A select query on a map type fails when the data is loaded using the Avro SDK and then queried through an external table that uses the carbon file format.

Analysis
When data with a schema of type struct<array> is loaded through the Avro SDK, the fieldName was hard-coded to "val". As a result, at query time the schema written in the file footer did not match the schema inferred for the external table, which led to the failure.

Solution
Instead of hard-coding the field name as "val", use the field name given in the schema.

This closes #2774


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/682160fa
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/682160fa
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/682160fa

Branch: refs/heads/branch-1.5
Commit: 682160fa1bbde5f13c8a28e0114d3f18e5ffaf79
Parents: e9a198a
Author: manishgupta88 <to...@gmail.com>
Authored: Thu Sep 27 18:02:34 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Oct 3 19:57:50 2018 +0530

----------------------------------------------------------------------
 .../datasource/SparkCarbonDataSourceTest.scala  | 63 +++++++++++++++++++-
 .../sql/carbondata/datasource/TestUtil.scala    | 56 ++++++++++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 11 ++--
 3 files changed, 122 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/682160fa/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 3be8cb3..37677d0 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -1117,11 +1117,11 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
   }
 
   private def createParquetTable {
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+    val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path"))
     spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
               s"string," +
-              s"salary long, floatField float, bytefield byte) using parquet location " +
-              s"'$warehouse1/../warehouse2'")
+              s"salary long, floatField float, bytefield byte) using parquet location '$path'")
     (0 to 10).foreach {
       i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " +
                      s"'address$i', ${i*100}, $i.$i, '$i'")
@@ -1181,6 +1181,63 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  /** Resets `writerPath`, then writes `rows` rows of a record with a struct<array<map>> column. */
+  def buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath: String, rows: Int): Unit = {
+    val avroSchema = """
+        |{
+        |  "name": "address",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "name",
+        |      "type": "string"
+        |    },
+        |    {
+        |      "name": "age",
+        |      "type": "int"
+        |    },
+        |    {
+        |      "name": "structRecord",
+        |      "type": {
+        |        "type": "record",
+        |        "name": "my_address",
+        |        "fields": [
+        |          {
+        |            "name": "street",
+        |            "type": "string"
+        |          },
+        |          {
+        |            "name": "houseDetails",
+        |            "type": {
+        |               "type": "array",
+        |               "items": {
+        |                   "name": "memberDetails",
+        |                   "type": "map",
+        |                   "values": "string"
+        |                }
+        |             }
+        |          }
+        |        ]
+        |      }
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    val sampleRow = """ {"name":"bob", "age":10, "structRecord": {"street":"street1", "houseDetails": [{"101": "Rahul", "102": "Pawan"}]}} """
+    FileFactory.deleteAllFilesOfDir(new File(writerPath))
+    TestUtil.WriteFilesWithAvroWriter(writerPath, rows, avroSchema, sampleRow)
+  }
+
+  test("test external table with struct type with value as nested struct<array<map>> type") {
+    val expectedRows = 3
+    val sdkDir = FileFactory.getUpdatedFilePath(s"$warehouse1/sdk1")
+    buildStructSchemaWithNestedArrayOfMapTypeAsValue(sdkDir, expectedRows)
+    spark.sql("drop table if exists carbon_external") // free the table name before creating
+    spark.sql(s"create table carbon_external using carbon location '$sdkDir'")
+    assert(spark.sql("select * from carbon_external").count() == expectedRows) // all rows read
+    spark.sql("drop table if exists carbon_external")
+  }
+
   test("test byte and float for multiple pages") {
     val path = new File(warehouse1+"/sdk1").getAbsolutePath
     FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/682160fa/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
index b9185aa..f2285d6 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala
@@ -16,17 +16,23 @@
  */
 package org.apache.spark.sql.carbondata.datasource
 
-import java.io.File
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream}
 
 import scala.collection.JavaConverters._
 
+import org.apache.avro
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.{DecoderFactory, Encoder}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.util.sideBySide
+import org.junit.Assert
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file.CarbonWriter
 
 object TestUtil {
 
@@ -134,4 +140,52 @@ object TestUtil {
     }
   }
 
+  /**
+   * Writes `rows` copies of the JSON-decoded record to `writerPath`; writer errors fail the test.
+   * @param writerPath output path handed to the SDK writer builder
+   * @param rows       number of times the record is written
+   * @param mySchema   Avro schema in JSON form
+   * @param json       JSON text of one record conforming to `mySchema`
+   */
+  def WriteFilesWithAvroWriter(writerPath: String, rows: Int, mySchema: String,
+      json: String): Unit = {
+    val nn = new avro.Schema.Parser().parse(mySchema)
+    val record = jsonToAvro(json, mySchema)
+    try {
+      val writer = CarbonWriter.builder.outputPath(writerPath)
+        .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build()
+      try {
+        (0 until rows).foreach(_ => writer.write(record))
+      } finally {
+        writer.close() // runs even when a write throws, so the writer is never leaked
+      }
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+    }
+  }
+
+  /**
+   * Decodes one JSON document into an Avro [[GenericRecord]] using the given schema.
+   *
+   * @param json       JSON text of a single record conforming to `avroSchema`
+   * @param avroSchema Avro schema in JSON form
+   * @return the record read by a [[GenericDatumReader]] through Avro's JSON decoder
+   */
+  private def jsonToAvro(json: String, avroSchema: String): GenericRecord = {
+    val schema = new org.apache.avro.Schema.Parser().parse(avroSchema)
+    val reader = new GenericDatumReader[GenericRecord](schema)
+    // JSON text is decoded from explicit UTF-8 bytes rather than the platform default charset.
+    val utf8 = java.nio.charset.StandardCharsets.UTF_8
+    val input = new DataInputStream(new ByteArrayInputStream(json.getBytes(utf8)))
+    try {
+      // Only the decoder is needed to obtain the record; no Avro data file has to be written.
+      val decoder = DecoderFactory.get().jsonDecoder(schema, input)
+      reader.read(null, decoder)
+    } finally {
+      input.close() // `input` is always non-null here, so cleanup cannot mask a decode error
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/682160fa/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index ab1e154..d19a96d 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -625,7 +625,8 @@ public class AvroCarbonWriter extends CarbonWriter {
       case ARRAY:
         // recursively get the sub fields
         // array will have only one sub field.
-        DataType subType = getMappingDataTypeForCollectionRecord(childSchema.getElementType());
+        DataType subType =
+            getMappingDataTypeForCollectionRecord(fieldName, childSchema.getElementType());
         if (subType != null) {
           return (new StructField(fieldName, DataTypes.createArrayType(subType)));
         } else {
@@ -661,7 +662,8 @@ public class AvroCarbonWriter extends CarbonWriter {
     }
   }
 
-  private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema) {
+  private static DataType getMappingDataTypeForCollectionRecord(String fieldName,
+      Schema childSchema) {
     LogicalType logicalType = childSchema.getLogicalType();
     switch (childSchema.getType()) {
       case BOOLEAN:
@@ -700,7 +702,7 @@ public class AvroCarbonWriter extends CarbonWriter {
         return DataTypes.FLOAT;
       case MAP:
         // recursively get the sub fields
-        StructField mapField = prepareSubFields("val", childSchema);
+        StructField mapField = prepareSubFields(fieldName, childSchema);
         if (mapField != null) {
           return mapField.getDataType();
         }
@@ -717,7 +719,8 @@ public class AvroCarbonWriter extends CarbonWriter {
         return DataTypes.createStructType(structSubFields);
       case ARRAY:
         // array will have only one sub field.
-        DataType subType = getMappingDataTypeForCollectionRecord(childSchema.getElementType());
+        DataType subType =
+            getMappingDataTypeForCollectionRecord(fieldName, childSchema.getElementType());
         if (subType != null) {
           return DataTypes.createArrayType(subType);
         } else {