You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 13:17:39 UTC
carbondata git commit: [CARBONDATA-3036] Cache Columns And Refresh
Table Issue Fix
Repository: carbondata
Updated Branches:
refs/heads/master de6e98b08 -> 2c50ca5cf
[CARBONDATA-3036] Cache Columns And Refresh Table Issue Fix
Refresh Table Issue : The Refresh Table command was acting in a case-sensitive manner.
Cache Columns Issue : Results inconsistent when cache is set but min/max exceeds.
Columns are dictionary excluded.
Fix 1 : The path for the carbon file was being taken as whatever table name
was given in the query (lowercase/uppercase). Changed it to lowercase.
Fix 2 : The MinMaxFlag array was not set according to the columns to be cached, giving inconsistent results.
Changed it so the flag array matches the min/max values array for only the columns given in Cache Columns.
This closes #2848
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2c50ca5c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2c50ca5c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2c50ca5c
Branch: refs/heads/master
Commit: 2c50ca5cf42a456dd1f1d5a5df13b43a729e3148
Parents: de6e98b
Author: Manish Nalla <ma...@gmail.com>
Authored: Thu Oct 25 16:39:10 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Thu Oct 25 18:46:00 2018 +0530
----------------------------------------------------------------------
.../indexstore/blockletindex/BlockDataMap.java | 24 +++++----
.../blockletindex/BlockletDataMap.java | 9 ++--
.../core/indexstore/schema/SchemaGenerator.java | 23 +++++----
.../core/util/BlockletDataMapUtil.java | 23 +++++++++
...ithColumnMetCacheAndCacheLevelProperty.scala | 51 ++++++++++++++++++++
.../management/RefreshCarbonTableCommand.scala | 4 +-
6 files changed, 108 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 5a25bc5..3ab5923 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -215,7 +215,7 @@ public class BlockDataMap extends CoarseGrainDataMap
List<DataFileFooter> indexInfo) throws IOException, MemoryException {
DataMapRowImpl summaryRow = null;
CarbonRowSchema[] schema = getFileFooterEntrySchema();
- boolean[] minMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+ boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
@@ -255,8 +255,11 @@ public class BlockDataMap extends CoarseGrainDataMap
CarbonRowSchema[] taskSummarySchema, SegmentProperties segmentProperties,
boolean[] minMaxFlag) {
// add min max flag for all the dimension columns
- addMinMaxFlagValues(summaryRow, taskSummarySchema[TASK_MIN_MAX_FLAG], minMaxFlag,
- TASK_MIN_MAX_FLAG, segmentProperties.getDimensions().size());
+ boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
+ .getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, getMinMaxCacheColumns(),
+ minMaxFlag);
+ addMinMaxFlagValues(summaryRow, taskSummarySchema[TASK_MIN_MAX_FLAG],
+ minMaxFlagValuesForColumnsToBeCached, TASK_MIN_MAX_FLAG);
}
/**
@@ -281,10 +284,10 @@ public class BlockDataMap extends CoarseGrainDataMap
boolean isLastFileFooterEntryNeedToBeAdded = false;
CarbonRowSchema[] schema = getFileFooterEntrySchema();
// flag for each block entry
- boolean[] minMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+ boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
// min max flag for task summary
- boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+ boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(taskSummaryMinMaxFlag, true);
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
@@ -328,7 +331,7 @@ public class BlockDataMap extends CoarseGrainDataMap
summaryRow,
blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
- minMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+ minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
// flag to check whether last file footer entry is different from previous entry.
// If yes then it need to be added at last
@@ -412,6 +415,8 @@ public class BlockDataMap extends CoarseGrainDataMap
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
byte[][] maxValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, maxValues);
+ boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
+ .getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minMaxFlag);
row.setRow(addMinMax(schema[ordinal], minValuesForColumnsToBeCached), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, taskSummarySchema, taskMinMaxOrdinal,
@@ -440,8 +445,7 @@ public class BlockDataMap extends CoarseGrainDataMap
// store block size
row.setLong(blockMetaInfo.getSize(), ordinal++);
// add min max flag for all the dimension columns
- addMinMaxFlagValues(row, schema[ordinal], minMaxFlag, ordinal,
- segmentProperties.getDimensions().size());
+ addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
memoryDMStore.addIndexRow(schema, row);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -450,13 +454,13 @@ public class BlockDataMap extends CoarseGrainDataMap
}
protected void addMinMaxFlagValues(DataMapRow row, CarbonRowSchema carbonRowSchema,
- boolean[] minMaxFlag, int ordinal, int dimensionCount) {
+ boolean[] minMaxFlag, int ordinal) {
CarbonRowSchema[] minMaxFlagSchema =
((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
DataMapRow minMaxFlagRow = new DataMapRowImpl(minMaxFlagSchema);
int flagOrdinal = 0;
// min value adding
- for (int i = 0; i < dimensionCount; i++) {
+ for (int i = 0; i < minMaxFlag.length; i++) {
minMaxFlagRow.setBoolean(minMaxFlag[i], flagOrdinal++);
}
row.setRow(minMaxFlagRow, ordinal);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index a3ca1cd..242fc9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -116,7 +116,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
String tempFilePath = null;
DataMapRowImpl summaryRow = null;
CarbonRowSchema[] schema = getFileFooterEntrySchema();
- boolean[] summaryRowMinMaxFlag = new boolean[segmentProperties.getDimensions().size()];
+ boolean[] summaryRowMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(summaryRowMinMaxFlag, true);
// Relative blocklet ID is the id assigned to a blocklet within a part file
int relativeBlockletId = 0;
@@ -173,6 +173,9 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
byte[][] maxValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns,
minMaxIndex.getMaxValues());
+ boolean[] minMaxFlagValuesForColumnsToBeCached = BlockletDataMapUtil
+ .getMinMaxFlagValuesForColumnsToBeCached(segmentProperties, minMaxCacheColumns,
+ fileFooter.getBlockletIndex().getMinMaxIndex().getIsMinMaxSet());
row.setRow(addMinMax(schema[ordinal], minValuesForColumnsToBeCached), ordinal);
// compute and set task level min values
addTaskMinMaxValues(summaryRow, taskSummarySchema, taskMinMaxOrdinal,
@@ -201,9 +204,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
// Store block size
row.setLong(blockMetaInfo.getSize(), ordinal++);
// add min max flag for all the dimension columns
- addMinMaxFlagValues(row, schema[ordinal],
- fileFooter.getBlockletIndex().getMinMaxIndex().getIsMinMaxSet(), ordinal,
- segmentProperties.getDimensions().size());
+ addMinMaxFlagValues(row, schema[ordinal], minMaxFlagValuesForColumnsToBeCached, ordinal);
ordinal++;
// add blocklet info
ByteArrayOutputStream stream = new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index f3016a4..7a2e13a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -58,7 +58,7 @@ public class SchemaGenerator {
indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
// for storing min max flag for each column which reflects whether min max for a column is
// written in the metadata or not.
- addMinMaxFlagSchema(indexSchemas, segmentProperties);
+ addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
CarbonRowSchema[] schema = indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]);
return schema;
}
@@ -90,7 +90,7 @@ public class SchemaGenerator {
indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
// for storing min max flag for each column which reflects whether min max for a column is
// written in the metadata or not.
- addMinMaxFlagSchema(indexSchemas, segmentProperties);
+ addMinMaxFlagSchema(segmentProperties, indexSchemas, minMaxCacheColumns);
//for blocklet info
indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
// for number of pages.
@@ -123,7 +123,7 @@ public class SchemaGenerator {
.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
// for storing min max flag for each column which reflects whether min max for a column is
// written in the metadata or not.
- addMinMaxFlagSchema(taskMinMaxSchemas, segmentProperties);
+ addMinMaxFlagSchema(segmentProperties, taskMinMaxSchemas, minMaxCacheColumns);
// store path only in case of partition table or non transactional table
if (filePathToBeStored) {
// for storing file path
@@ -179,15 +179,18 @@ public class SchemaGenerator {
/**
* Method to add min max flag schema for all the dimensions
- *
- * @param indexSchemas
* @param segmentProperties
+ * @param indexSchemas
+ * @param minMaxCacheColumns
*/
- private static void addMinMaxFlagSchema(List<CarbonRowSchema> indexSchemas,
- SegmentProperties segmentProperties) {
- int totalDimensions = segmentProperties.getDimensions().size();
- CarbonRowSchema[] minMaxFlagSchemas = new CarbonRowSchema[totalDimensions];
- for (int i = 0; i < totalDimensions; i++) {
+ private static void addMinMaxFlagSchema(SegmentProperties segmentProperties,
+ List<CarbonRowSchema> indexSchemas, List<CarbonColumn> minMaxCacheColumns) {
+ int minMaxFlagLength = segmentProperties.getColumnsValueSize().length;
+ if (null != minMaxCacheColumns) {
+ minMaxFlagLength = minMaxCacheColumns.size();
+ }
+ CarbonRowSchema[] minMaxFlagSchemas = new CarbonRowSchema[minMaxFlagLength];
+ for (int i = 0; i < minMaxFlagLength; i++) {
minMaxFlagSchemas[i] = new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BOOLEAN);
}
CarbonRowSchema structMinMaxFlagSchema =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index ac53b56..5f78135 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -407,6 +407,29 @@ public class BlockletDataMapUtil {
}
/**
+ * Method to get the flag values for columns to be cached
+ *
+ * @param segmentProperties
+ * @param minMaxCacheColumns
+ * @param minMaxFlag
+ * @return
+ */
+ public static boolean[] getMinMaxFlagValuesForColumnsToBeCached(
+ SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns,
+ boolean[] minMaxFlag) {
+ boolean[] minMaxFlagValuesForColumnsToBeCached = minMaxFlag;
+ if (null != minMaxCacheColumns) {
+ minMaxFlagValuesForColumnsToBeCached = new boolean[minMaxCacheColumns.size()];
+ int counter = 0;
+ for (CarbonColumn column : minMaxCacheColumns) {
+ minMaxFlagValuesForColumnsToBeCached[counter++] =
+ minMaxFlag[getColumnOrdinal(segmentProperties, column)];
+ }
+ }
+ return minMaxFlagValuesForColumnsToBeCached;
+ }
+
+ /**
* compute the column ordinal as per data is stored
*
* @param segmentProperties
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 63fb2e6..10a3be8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.dev.DataMap
import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
@@ -41,6 +42,7 @@ import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpressio
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
+import org.apache.carbondata.core.util.CarbonProperties
/**
* test class for validating COLUMN_META_CACHE and CACHE_LEVEL
@@ -52,6 +54,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
}
override def afterAll(): Unit = {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT,CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
dropSchema
}
@@ -316,4 +319,52 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
}
}
+ test("Test For Cache set but Min/Max exceeds") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "30")
+ sql("DROP TABLE IF EXISTS carbonCache")
+ sql(
+ s"""
+ | CREATE TABLE carbonCache (
+ | name STRING,
+ | age STRING,
+ | desc STRING
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('COLUMN_META_CACHE'='name,desc')
+ """.stripMargin)
+ sql(
+ "INSERT INTO carbonCache values('Manish Nalla','24'," +
+ "'gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg')")
+ checkAnswer(sql(
+ "SELECT count(*) FROM carbonCache where " +
+ "desc='gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg'"),
+ Row(1))
+ sql("DROP table IF EXISTS carbonCahe")
+ }
+ test("Test For Cache set but Min/Max exceeds with Cache Level as Blocklet") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "30")
+ sql("DROP TABLE IF EXISTS carbonCache")
+ sql(
+ s"""
+ | CREATE TABLE carbonCache (
+ | name STRING,
+ | age STRING,
+ | desc STRING
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('COLUMN_META_CACHE'='name,desc','CACHE_LEVEL'='BLOCKLET')
+ """.stripMargin)
+ sql(
+ "INSERT INTO carbonCache values('Manish Nalla','24'," +
+ "'gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg')")
+ checkAnswer(sql(
+ "SELECT count(*) FROM carbonCache where " +
+ "desc='gvsahgvsahjvcsahjgvavacavkjvaskjvsahgsvagkjvkjgvsackjg'"),
+ Row(1))
+ sql("DROP table IF EXISTS carbonCahe")
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2c50ca5c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index c129194..50b88c8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -63,8 +63,8 @@ case class RefreshCarbonTableCommand(
// then do the below steps
// 2.2.1 validate that all the aggregate tables are copied at the store location.
// 2.2.2 Register the aggregate tables
- val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
- val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+ val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName.toLowerCase)(sparkSession)
+ val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName.toLowerCase)
// 2.1 check if the table already register with hive then ignore and continue with the next
// schema
if (!sparkSession.sessionState.catalog.listTables(databaseName)