Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/14 14:22:46 UTC

carbondata git commit: [CARBONDATA-2924] Fix parsing issue for map as a nested array child and change the error message in sort column validation for SDK

Repository: carbondata
Updated Branches:
  refs/heads/master f354e0b2b -> ac79a343f


[CARBONDATA-2924] Fix parsing issue for map as a nested array child and change the error message in sort column validation for SDK

This PR contains the following changes:

Fix to support nested complex types up to the integer length limit. Previously, when the serialized data length of a nested complex type in one row crossed the short limit, an exception was thrown; lengths up to the integer limit are now supported.
Fixed the error message thrown when a map type is given as a sort column in the SDK.
Fixed the map type parsing issue for nested structures such as struct<array> and array<array>.
Modified the code to add entries to the LRU cache only when the flag to add to the cache is set to true.

Short illustrative sketches for these changes follow the corresponding diffs below.

This closes #2702


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ac79a343
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ac79a343
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ac79a343

Branch: refs/heads/master
Commit: ac79a343f57736302fd01ab8312709f8e570abc2
Parents: f354e0b
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Sep 10 12:42:33 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Sep 14 19:52:33 2018 +0530

----------------------------------------------------------------------
 .../indexstore/BlockletDataMapIndexStore.java   |  13 ++-
 .../TableBlockIndexUniqueIdentifierWrapper.java |  13 ++-
 .../examples/MinMaxIndexDataMapFactory.java     |   3 +-
 ...tNonTransactionalCarbonTableForMapType.scala | 113 +++++++++++++++++++
 .../datasource/SparkCarbonDataSourceTest.scala  |  36 ++++++
 .../loading/sort/SortStepRowHandler.java        |   4 +-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |   5 +-
 .../sdk/file/CarbonWriterBuilder.java           |   6 +-
 8 files changed, 173 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 323899e..ed709c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -111,7 +111,8 @@ public class BlockletDataMapIndexStore
                   carbonDataFileBlockMetaInfoMapping);
           BlockDataMap blockletDataMap =
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
-                  identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe(),
+                  identifierWrapper.getCarbonTable(),
+                  identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(),
                   identifierWrapper.getConfiguration());
           dataMaps.add(blockletDataMap);
           blockletDataMapIndexWrapper =
@@ -131,7 +132,7 @@ public class BlockletDataMapIndexStore
               BlockDataMap blockletDataMap =
                   loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
                       identifierWrapper.getCarbonTable(),
-                      identifierWrapper.isAddTableBlockToUnsafe(),
+                      identifierWrapper.isAddTableBlockToUnsafeAndLRUCache(),
                       identifierWrapper.getConfiguration());
               dataMaps.add(blockletDataMap);
             }
@@ -140,8 +141,10 @@ public class BlockletDataMapIndexStore
               new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
                   identifierWrapper.getConfiguration());
         }
-        lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
-            blockletDataMapIndexWrapper.getMemorySize());
+        if (identifierWrapper.isAddTableBlockToUnsafeAndLRUCache()) {
+          lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
+              blockletDataMapIndexWrapper.getMemorySize());
+        }
       } catch (Throwable e) {
         // clear all the memory used by datamaps loaded
         for (DataMap dataMap : dataMaps) {
@@ -227,7 +230,7 @@ public class BlockletDataMapIndexStore
         // maintained at segment level so it need to be called only once for clearing
         SegmentPropertiesAndSchemaHolder.getInstance()
             .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesIndex(),
-                tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafe());
+                tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafeAndLRUCache());
       }
     }
     lruCache.remove(tableSegmentUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()

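The guarded put above is the heart of the caching change: the datamap is always loaded and handed back to the caller, but it occupies LRU cache capacity only when the identifier wrapper opted in, so one-off lookups cannot evict entries that long-running queries reuse. A minimal sketch of the pattern (hypothetical names, not the CarbonData classes):

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the guarded-put pattern: always return the loaded value,
// but register it in the LRU cache only when the caller asked for that.
class GuardedLruCache<K, V> {
  private static final int CAPACITY = 64;
  private final Map<K, V> lru =
      new LinkedHashMap<K, V>(16, 0.75f, true) { // access-ordered
        @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
          return size() > CAPACITY; // evict the least-recently-used entry
        }
      };

  V loadAndMaybeCache(K key, V loaded, boolean addToCache) {
    if (addToCache) {
      lru.put(key, loaded); // cached for reuse across queries
    }
    return loaded;          // caller gets the value either way
  }
}
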
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
index 77756de..b125197 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
@@ -41,9 +41,10 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
 
   private transient Configuration configuration;
   /**
-   * flag to specify whether to load table block metadata in unsafe or safe. Default value is true
+   * flag to specify whether to load table block metadata in unsafe or safe and whether to add the
+   * table block metadata in LRU cache. Default value is true
    */
-  private boolean addTableBlockToUnsafe = true;
+  private boolean addTableBlockToUnsafeAndLRUCache = true;
 
   public TableBlockIndexUniqueIdentifierWrapper(
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) {
@@ -64,10 +65,10 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
   // Kindly do not remove
   public TableBlockIndexUniqueIdentifierWrapper(
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
-      boolean addTableBlockToUnsafe) {
+      boolean addTableBlockToUnsafeAndLRUCache) {
     this(tableBlockIndexUniqueIdentifier, carbonTable);
-    this.addTableBlockToUnsafe = addTableBlockToUnsafe;
     this.configuration = FileFactory.getConfiguration();
+    this.addTableBlockToUnsafeAndLRUCache = addTableBlockToUnsafeAndLRUCache;
   }
 
 
@@ -79,8 +80,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
     return carbonTable;
   }
 
-  public boolean isAddTableBlockToUnsafe() {
-    return addTableBlockToUnsafe;
+  public boolean isAddTableBlockToUnsafeAndLRUCache() {
+    return addTableBlockToUnsafeAndLRUCache;
   }
 
   public Configuration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 1361d7a..7f54a0e 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Min Max DataMap Factory
@@ -113,7 +114,7 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
           MinMaxDataWriter.genDataMapStorePath(
               CarbonTablePath.getSegmentPath(
                   identifier.getTablePath(), segment.getSegmentNo()),
-              dataMapName)));
+              dataMapName), new Configuration(false)));
     } catch (MemoryException ex) {
       throw new IOException(ex);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
index cd73b27..a6bc224 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableForMapType.scala
@@ -273,6 +273,53 @@ class TestNonTransactionalCarbonTableForMapType extends QueryTest with BeforeAnd
     nonTransactionalCarbonTable.WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
+  def buildStructSchemaWithNestedArrayOfMapTypeAsValue(rows: Int): Unit = {
+    deleteDirectory(writerPath)
+    val mySchema =
+      """
+        |{
+        |  "name": "address",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "name",
+        |      "type": "string"
+        |    },
+        |    {
+        |      "name": "age",
+        |      "type": "int"
+        |    },
+        |    {
+        |      "name": "structRecord",
+        |      "type": {
+        |        "type": "record",
+        |        "name": "my_address",
+        |        "fields": [
+        |          {
+        |            "name": "street",
+        |            "type": "string"
+        |          },
+        |          {
+        |            "name": "houseDetails",
+        |            "type": {
+        |               "type": "array",
+        |               "items": {
+        |                   "name": "memberDetails",
+        |                   "type": "map",
+        |                   "values": "string"
+        |                }
+        |             }
+        |          }
+        |        ]
+        |      }
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    val json = """ {"name":"bob", "age":10, "structRecord": {"street":"street1", "houseDetails": [{"101": "Rahul", "102": "Pawan"}]}} """.stripMargin
+    nonTransactionalCarbonTable.WriteFilesWithAvroWriter(rows, mySchema, json)
+  }
+
   def buildArraySchemaWithMapTypeAsValue(rows: Int): Unit = {
     deleteDirectory(writerPath)
     val mySchema =
@@ -307,6 +354,44 @@ class TestNonTransactionalCarbonTableForMapType extends QueryTest with BeforeAnd
     nonTransactionalCarbonTable.WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
+  def buildArraySchemaWithNestedArrayOfMapTypeAsValue(rows: Int): Unit = {
+    deleteDirectory(writerPath)
+    val mySchema =
+      """
+        |{
+        |  "name": "address",
+        |  "type": "record",
+        |  "fields": [
+        |    {
+        |      "name": "name",
+        |      "type": "string"
+        |    },
+        |    {
+        |      "name": "age",
+        |      "type": "int"
+        |    },
+        |    {
+        |      "name": "arrayRecord",
+        |      "type": {
+        |        "type": "array",
+        |        "items": {
+        |           "name": "FloorNum",
+        |           "type": "array",
+        |           "items": {
+        |             "name": "houseDetails",
+        |             "type": "map",
+        |             "values": "string"
+        |           }
+        |        }
+        |      }
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+    val json = """ {"name":"bob", "age":10, "arrayRecord": [[{"101": "Rahul", "102": "Pawan"}]]} """.stripMargin
+    nonTransactionalCarbonTable.WriteFilesWithAvroWriter(rows, mySchema, json)
+  }
+
   private def dropSchema: Unit = {
     deleteDirectory(writerPath)
     sql("DROP TABLE IF EXISTS sdkMapOutputTable")
@@ -397,6 +482,21 @@ class TestNonTransactionalCarbonTableForMapType extends QueryTest with BeforeAnd
       Row("bob", 10, Row("street1", Map("101" -> "Rahul", "102" -> "Pawan")))))
   }
 
+  test("Read sdk writer Avro output Map Type with map type as child to struct<array> type") {
+    buildStructSchemaWithNestedArrayOfMapTypeAsValue(3)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkMapOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkMapOutputTable STORED BY 'carbondata' LOCATION
+          |'$writerPath' """.stripMargin)
+    sql("desc formatted sdkMapOutputTable").show(1000, false)
+    sql("select * from sdkMapOutputTable").show(false)
+    checkAnswer(sql("select * from sdkMapOutputTable"), Seq(
+      Row("bob", 10, Row("street1", Seq(Map("101" -> "Rahul", "102" -> "Pawan")))),
+      Row("bob", 10, Row("street1", Seq(Map("101" -> "Rahul", "102" -> "Pawan")))),
+      Row("bob", 10, Row("street1", Seq(Map("101" -> "Rahul", "102" -> "Pawan"))))))
+  }
+
   test("Read sdk writer Avro output Map Type with map type as child to array type") {
     buildArraySchemaWithMapTypeAsValue(3)
     assert(new File(writerPath).exists())
@@ -410,6 +510,19 @@ class TestNonTransactionalCarbonTableForMapType extends QueryTest with BeforeAnd
       Row("bob", 10, Seq(Map("101" -> "Rahul", "102" -> "Pawan")))))
   }
 
+  test("Read sdk writer Avro output Map Type with map type as child to array<array> type") {
+    buildArraySchemaWithNestedArrayOfMapTypeAsValue(3)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkMapOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkMapOutputTable STORED BY 'carbondata' LOCATION
+          |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkMapOutputTable"), Seq(
+      Row("bob", 10, Seq(Seq(Map("101" -> "Rahul", "102" -> "Pawan")))),
+      Row("bob", 10, Seq(Seq(Map("101" -> "Rahul", "102" -> "Pawan")))),
+      Row("bob", 10, Seq(Seq(Map("101" -> "Rahul", "102" -> "Pawan"))))))
+  }
+
   override def afterAll(): Unit = {
     dropSchema
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 825cdec..c97732a 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -229,6 +229,24 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists parquet_table")
   }
 
+  test("test write with array type with value as nested array<array<map>> type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array(Array(Map("b" -> "c"))), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 array<array<map<string,string>>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
   test("test write with struct type with value as nested map type") {
     spark.sql("drop table if exists carbon_table")
     spark.sql("drop table if exists parquet_table")
@@ -247,6 +265,24 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists parquet_table")
   }
 
+  test("test write with struct type with value as nested struct<array<map>> type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, ("a", Array(Map("b" -> "c"))), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:array<map<string,string>>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
   test("test write with map type") {
     spark.sql("drop table if exists carbon_table")
     spark.sql("drop table if exists parquet_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index f6fc3ca..697f590 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -267,7 +267,7 @@ public class SortStepRowHandler implements Serializable {
 
     // read complex dims
     for (int i = 0; i < complexDimCnt; i++) {
-      short len = rowBuffer.getShort();
+      int len = rowBuffer.getInt();
       byte[] bytes = new byte[len];
       rowBuffer.get(bytes);
       noDictDims[noDictIndex++] = bytes;
@@ -587,7 +587,7 @@ public class SortStepRowHandler implements Serializable {
     // convert complex dims
     for (int idx = 0; idx < this.complexDimCnt; idx++) {
       byte[] bytes = (byte[]) row[this.complexDimIdx[idx]];
-      rowBuffer.putShort((short) bytes.length);
+      rowBuffer.putInt(bytes.length);
       rowBuffer.put(bytes);
     }
 

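The two hunks above widen the length prefix of a serialized complex-type value from 2 bytes to 4. A tiny standalone demonstration of why the short prefix failed for large nested values (plain Java, not CarbonData code):

import java.nio.ByteBuffer;

// Casting a length above Short.MAX_VALUE (32767) to short wraps around,
// so the read side would see a negative length and fail while allocating
// the byte array. An int prefix preserves lengths up to Integer.MAX_VALUE.
public class ShortLengthOverflow {
  public static void main(String[] args) {
    int len = 40000;                    // serialized size of one complex value
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.putShort((short) len);          // old write path: truncates the length
    buf.flip();
    System.out.println(buf.getShort()); // prints -25536, not 40000
  }
}
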
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index a183197..02980a7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -702,10 +702,7 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         StructField mapField = prepareSubFields("val", childSchema);
         if (mapField != null) {
-          DataType ketType = ((StructType) mapField.getDataType()).getFields().get(0).getDataType();
-          DataType valueType =
-              ((StructType) mapField.getDataType()).getFields().get(1).getDataType();
-          return DataTypes.createMapType(ketType, valueType);
+          return mapField.getDataType();
         }
         return null;
       case RECORD:

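The parsing fix above drops the assumption that the StructField returned by prepareSubFields must be unpacked into key/value fields; for a map nested under struct<array> or array<array> the helper already carries the finished map type, so re-deriving a MapType from it failed. A hedged sketch of the recursive mapping rule this restores, using the Avro Schema API (hypothetical helper, not the SDK code path):

import org.apache.avro.Schema;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;

// Hypothetical recursive helper: Avro map keys are always strings, so once
// the value schema is converted recursively the map type can be returned
// directly at any nesting depth (array<array<map>>, struct<array<map>>, ...),
// with no wrapper struct to unpack.
final class AvroTypeMapping {
  static DataType toCarbonType(Schema avroSchema) {
    switch (avroSchema.getType()) {
      case STRING:
        return DataTypes.STRING;
      case INT:
        return DataTypes.INT;
      case ARRAY:
        return DataTypes.createArrayType(toCarbonType(avroSchema.getElementType()));
      case MAP:
        return DataTypes.createMapType(
            DataTypes.STRING, toCarbonType(avroSchema.getValueType()));
      default:
        throw new UnsupportedOperationException(avroSchema.getType().name());
    }
  }
}
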
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac79a343/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 388870f..02434fc 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -663,8 +663,10 @@ public class CarbonWriterBuilder {
           if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
               || DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()
               || field.getDataType() == DataTypes.VARCHAR) {
-            throw new RuntimeException(
-                " sort columns not supported for array, struct, double, float, decimal, varchar");
+            String errorMsg =
+                "sort columns not supported for array, struct, map, double, float, decimal,"
+                    + "varchar";
+            throw new RuntimeException(errorMsg);
           }
         }
         if (field.getChildren() != null && field.getChildren().size() > 0) {
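
With this change the validation message names map explicitly among the unsupported sort-column types. A hedged usage sketch of how the check surfaces through the SDK (builder method names approximate for this era of the SDK; the schema and path are illustrative):

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class SortColumnValidationExample {
  public static void main(String[] args) throws Exception {
    Field[] fields = new Field[] {
        new Field("name", DataTypes.STRING),
        new Field("houseDetails",
            DataTypes.createMapType(DataTypes.STRING, DataTypes.STRING))
    };
    try {
      CarbonWriter.builder()
          .outputPath("/tmp/carbon_sdk_out")
          .sortBy(new String[] { "houseDetails" }) // map type: not sortable
          .buildWriterForCSVInput(new Schema(fields));
    } catch (RuntimeException e) {
      // expected message: "sort columns not supported for array, struct,
      // map, double, float, decimal,varchar"
      System.out.println(e.getMessage());
    }
  }
}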