Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 13:17:39 UTC

carbondata git commit: [CARBONDATA-3036] Cache Columns And Refresh Table Issue Fix

Repository: carbondata
Updated Branches:
  refs/heads/master de6e98b08 -> 2c50ca5cf


[CARBONDATA-3036] Cache Columns And Refresh Table Issue Fix

Refresh Table Issue: the REFRESH TABLE command was acting in a case-sensitive manner.
Cache Columns Issue: query results were inconsistent when COLUMN_META_CACHE was set
but a column's min/max size exceeded the allowed byte count and the columns were
dictionary-excluded.

Fix 1: the carbon file path was built from the table name exactly as given in the
query (lowercase or uppercase). The name is now converted to lowercase first.
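
For illustration, a sketch of the user-visible scenario behind Fix 1 (hypothetical
table name; the actual one-line change is in RefreshCarbonTableCommand, shown in
the diff below):

    import org.apache.spark.sql.SparkSession

    // Assumes a SparkSession with the CarbonData extensions already configured.
    val spark = SparkSession.builder().appName("refresh-demo").getOrCreate()
    // Carbon files live on disk under the lowercase table name, e.g.
    // .../default/carbontable. Before the fix, a mixed-case name in the query
    // was used verbatim to build that path, so the refresh found no files.
    spark.sql("REFRESH TABLE CarbonTable")  // failed before the fix
    spark.sql("REFRESH TABLE carbontable")  // always worked
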
Fix 2: the MinMaxFlag array was not built according to the columns to be cached,
giving inconsistent results. It is now built, like the min/max value arrays, from
only the columns specified in Cache Columns (COLUMN_META_CACHE).
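
In other words, Fix 2 projects the per-column min/max flag array down to just the
cached columns, in the same order as the cached min/max value arrays. A minimal
Scala sketch of that projection (simplified types; the real implementation is the
new BlockletDataMapUtil.getMinMaxFlagValuesForColumnsToBeCached in the diff below):

    // allColumnFlags holds one flag per column, true when min/max metadata was
    // actually written for that column (false e.g. when it exceeded the allowed
    // byte count). cachedColumnOrdinals lists the ordinals of the
    // COLUMN_META_CACHE columns, or is None when no column cache is configured.
    def projectMinMaxFlags(allColumnFlags: Array[Boolean],
        cachedColumnOrdinals: Option[Seq[Int]]): Array[Boolean] =
      cachedColumnOrdinals match {
        case None => allColumnFlags // no COLUMN_META_CACHE: keep every flag
        case Some(ordinals) => ordinals.map(allColumnFlags(_)).toArray
      }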

This closes #2848


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2c50ca5c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2c50ca5c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2c50ca5c

Branch: refs/heads/master
Commit: 2c50ca5cf42a456dd1f1d5a5df13b43a729e3148
Parents: de6e98b
Author: Manish Nalla <ma...@gmail.com>
Authored: Thu Oct 25 16:39:10 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu Oct 25 18:46:00 2018 +0530

----------------------------------------------------------------------
 .../indexstore/blockletindex/BlockDataMap.java  | 24 +++++----
 .../blockletindex/BlockletDataMap.java          |  9 ++--
 .../core/indexstore/schema/SchemaGenerator.java | 23 +++++----
 .../core/util/BlockletDataMapUtil.java          | 23 +++++++++
 ...ithColumnMetCacheAndCacheLevelProperty.scala | 51 ++++++++++++++++++++
 .../management/RefreshCarbonTableCommand.scala  |  4 +-
 6 files changed, 108 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 5a25bc5..3ab5923 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -215,7 +215,7 @@ public class BlockDataMap extends CoarseGrainDataMap
       List<DataFileFooter> indexInfo) throws IOException, MemoryException {
     DataMapRowImpl summaryRow = null;
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
-    boolean[] minMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+    boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(minMaxFlag, true);
     for (DataFileFooter fileFooter : indexInfo) {
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
@@ -255,8 +255,11 @@ public class BlockDataMap extends CoarseGrainDataMap
       CarbonRowSchema[] taskSummarySchema, SegmentProperties segmentProperties,
       boolean[] minMaxFlag) {
     // add min max flag for all the dimension columns
-    addMinMaxFlagValues(summaryRow, taskSummarySchema[TASK_MIN_MAX_FLAG], minMaxFlag,
-        TASK_MIN_MAX_FLAG, segmentProperties.getDimensions().size());
+    boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
+        .getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, getMinMaxCacheColumns(),
+            minMaxFlag);
+    addMinMaxFlagValues(summaryRow, taskSummarySchema[TASK_MIN_MAX_FLAG],
+        minMaxFlagValuesForColumnsToBeCached, TASK_MIN_MAX_FLAG);
   }
 
   /**
@@ -281,10 +284,10 @@ public class BlockDataMap extends CoarseGrainDataMap
     boolean isLastFileFooterEntryNeedToBeAdded = false;
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
     // flag for each block entry
-    boolean[] minMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+    boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(minMaxFlag, true);
     // min max flag for task summary
-    boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+    boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(taskSummaryMinMaxFlag, true);
     for (DataFileFooter fileFooter : indexInfo) {
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
@@ -328,7 +331,7 @@ public class BlockDataMap extends CoarseGrainDataMap
               summaryRow,
               blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
               blockMinValues, blockMaxValues, minMaxFlag);
-          minMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+          minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
           Arrays.fill(minMaxFlag, true);
           // flag to check whether last file footer entry is different from previous entry.
           // If yes then it need to be added at last
@@ -412,6 +415,8 @@ public class BlockDataMap extends CoarseGrainDataMap
         .getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
     byte[][] maxValuesForColumnsToBeCached = BlockletDataMapUtil
         .getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, maxValues);
+    boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
+        .getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minMaxFlag);
     row.setRow(addMinMax(schema[ordinal], minValuesForColumnsToBeCached), ordinal);
     // compute and set task level min values
     addTaskMinMaxValues(summaryRow, taskSummarySchema, taskMinMaxOrdinal,
@@ -440,8 +445,7 @@ public class BlockDataMap extends CoarseGrainDataMap
       // store block size
       row.setLong(blockMetaInfo.getSize(), ordinal++);
       // add min max flag for all the dimension columns
-      addMinMaxFlagValues(row, schema[ordinal], minMaxFlag, ordinal,
-          segmentProperties.getDimensions().size());
+      addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
       memoryDMStore.addIndexRow(schema, row);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -450,13 +454,13 @@ public class BlockDataMap extends CoarseGrainDataMap
   }
 
   protected void addMinMaxFlagValues(DataMapRow row, CarbonRowSchema carbonRowSchema,
-      boolean[] minMaxFlag, int ordinal, int dimensionCount) {
+      boolean[] minMaxFlag, int ordinal) {
     CarbonRowSchema[] minMaxFlagSchema =
         ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
     DataMapRow minMaxFlagRow = new DataMapRowImpl(minMaxFlagSchema);
     int flagOrdinal = 0;
     // min value adding
-    for (int i = 0; i < dimensionCount; i++) {
+    for (int i = 0; i < minMaxFlag.length; i++) {
       minMaxFlagRow.setBoolean(minMaxFlag[i], flagOrdinal++);
     }
     row.setRow(minMaxFlagRow, ordinal);
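
Note the recurring change in this file from getDimensions().size() to
getColumnsValueSize().length when sizing the flag arrays: the flags must carry one
entry per column that has min/max metadata, not one per dimension. A tiny
illustration with hypothetical counts (assuming getColumnsValueSize() spans
measures as well as dimensions, as the change implies):

    val dimensionCount = 3
    val allColumnCount = 5 // 3 dimensions + 2 measures
    val oldFlags = new Array[Boolean](dimensionCount) // misses the measures' flags
    val newFlags = Array.fill(allColumnCount)(true)   // one default-true flag per column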

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index a3ca1cd..242fc9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -116,7 +116,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
     String tempFilePath = null;
     DataMapRowImpl summaryRow = null;
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
-    boolean[] summaryRowMinMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+    boolean[] summaryRowMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(summaryRowMinMaxFlag, true);
     // Relative blocklet ID is the id assigned to a blocklet within a part file
     int relativeBlockletId = 0;
@@ -173,6 +173,9 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
       byte[][] maxValuesForColumnsToBeCached = BlockletDataMapUtil
           .getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns,
               minMaxIndex.getMaxValues());
+      boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
+          .getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, minMaxCacheColumns,
+              fileFooter.getBlockletIndex().getMinMaxIndex().getIsMinMaxSet());
       row.setRow(addMinMax(schema[ordinal], minValuesForColumnsToBeCached), ordinal);
       // compute and set task level min values
       addTaskMinMaxValues(summaryRow, taskSummarySchema, taskMinMaxOrdinal,
@@ -201,9 +204,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
         // Store block size
         row.setLong(blockMetaInfo.getSize(), ordinal++);
         // add min max flag for all the dimension columns
-        addMinMaxFlagValues(row, schema[ordinal],
-            fileFooter.getBlockletIndex().getMinMaxIndex().getIsMinMaxSet(), ordinal,
-            segmentProperties.getDimensions().size());
+        addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
         ordinal++;
         // add blocklet info
         ByteArrayOutputStream stream = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index f3016a4..7a2e13a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -58,7 +58,7 @@ public class SchemaGenerator {
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
     // for storing min max flag for each column which reflects whether min max for a column is
     // written in the metadata or not.
-    addMinMaxFlagSchema(indexSchemas, segmentProperties);
+    addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
     CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
     return schema;
   }
@@ -90,7 +90,7 @@ public class SchemaGenerator {
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
     // for storing min max flag for each column which reflects whether min max for a column is
     // written in the metadata or not.
-    addMinMaxFlagSchema(indexSchemas, segmentProperties);
+    addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
     //for blocklet info
     indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
     // for number of pages.
@@ -123,7 +123,7 @@ public class SchemaGenerator {
         .add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
     // for storing min max flag for each column which reflects whether min max for a column is
     // written in the metadata or not.
-    addMinMaxFlagSchema(taskMinMaxSchemas, segmentProperties);
+    addMinMaxFlagSchema(segmentProperties, taskMinMaxSchemas, minMaxCacheColumns);
     // store path only in case of partition table or non transactional table
     if (filePathToBeStored) {
       // for storing file path
@@ -179,15 +179,18 @@ public class SchemaGenerator {
 
   /**
    * Method to add min max flag schema for all the dimensions
-   *
-   * @param indexSchemas
    * @param segmentProperties
+   * @param indexSchemas
+   * @param minMaxCacheColumns
    */
-  private static void addMinMaxFlagSchema(List<CarbonRowSchema> indexSchemas,
-      SegmentProperties segmentProperties) {
-    int totalDimensions = segmentProperties.getDimensions().size();
-    CarbonRowSchema[] minMaxFlagSchemas = new CarbonRowSchema[totalDimensions];
-    for (int i = 0; i < totalDimensions; i++) {
+  private static void addMinMaxFlagSchema(SegmentProperties segmentProperties,
+      List<CarbonRowSchema> indexSchemas, List<CarbonColumn> minMaxCacheColumns) {
+    int minMaxFlagLength = segmentProperties.getColumnsValueSize().length;
+    if (null != minMaxCacheColumns) {
+      minMaxFlagLength = minMaxCacheColumns.size();
+    }
+    CarbonRowSchema[] minMaxFlagSchemas = new CarbonRowSchema[minMaxFlagLength];
+    for (int i = 0; i < minMaxFlagLength; i++) {
       minMaxFlagSchemas[i] = new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BOOLEAN);
     }
     CarbonRowSchema structMinMaxFlagSchema =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index ac53b56..5f78135 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -407,6 +407,29 @@ public class BlockletDataMapUtil {
   }
 
   /**
+   * Method to get the flag values for columns to be cached
+   *
+   * @param segmentProperties
+   * @param minMaxCacheColumns
+   * @param minMaxFlag
+   * @return
+   */
+  public static boolean[] getMinMaxFlagValuesForColumnsToBeCached(
+      SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns,
+      boolean[] minMaxFlag) {
+    boolean[] minMaxFlagValuesForColumnsToBeCached = minMaxFlag;
+    if (null != minMaxCacheColumns) {
+      minMaxFlagValuesForColumnsToBeCached = new boolean[minMaxCacheColumns.size()];
+      int counter = 0;
+      for (CarbonColumn column : minMaxCacheColumns) {
+        minMaxFlagValuesForColumnsToBeCached[counter++] =
+            minMaxFlag[getColumnOrdinal(segmentProperties, column)];
+      }
+    }
+    return minMaxFlagValuesForColumnsToBeCached;
+  }
+
+  /**
    * compute the column ordinal as per data is stored
    *
    * @param segmentProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 63fb2e6..10a3be8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.DataMap
 import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
@@ -41,6 +42,7 @@ import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpressio
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * test class for validating COLUMN_META_CACHE and CACHE_LEVEL
@@ -52,6 +54,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
   }
 
   override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
     dropSchema
   }
 
@@ -316,4 +319,52 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     }
   }
 
+  test("Test For Cache set but Min/Max exceeds") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "30")
+    sql("DROP TABLE IF EXISTS carbonCache")
+    sql(
+      s"""
+         | CREATE TABLE carbonCache (
+         | name STRING,
+         | age STRING,
+         | desc STRING
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('COLUMN_META_CACHE'='name,desc')
+       """.stripMargin)
+    sql(
+      "INSERT INTO carbonCache values('Manish Nalla','24'," +
+      "'gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg')")
+    checkAnswer(sql(
+      "SELECT count(*) FROM carbonCache where " +
+      "desc='gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg'"),
+      Row(1))
+    sql("DROP TABLE IF EXISTS carbonCache")
+  }
+  test("Test For Cache set but Min/Max exceeds with Cache Level as Blocklet") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "30")
+    sql("DROP TABLE IF EXISTS carbonCache")
+    sql(
+      s"""
+         | CREATE TABLE carbonCache (
+         | name STRING,
+         | age STRING,
+         | desc STRING
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('COLUMN_META_CACHE'='name,desc','CACHE_LEVEL'='BLOCKLET')
+       """.stripMargin)
+    sql(
+      "INSERT INTO carbonCache values('Manish Nalla','24'," +
+      "'gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg')")
+    checkAnswer(sql(
+      "SELECT count(*) FROM carbonCache where " +
+      "desc='gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg'"),
+      Row(1))
+    sql("DROP TABLE IF EXISTS carbonCache")
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index c129194..50b88c8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -63,8 +63,8 @@ case class RefreshCarbonTableCommand(
     // then do the below steps
     // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
-    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
     // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
     if (!sparkSession.sessionState.catalog.listTables(databaseName)