Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:47 UTC
[carbondata] 27/41: [CARBONDATA-3293] Prune datamaps improvement for count(*)
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit ef8001ed1031cd49283d0da4eb8cbb706a7145e7
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Fri Mar 15 12:37:27 2019 +0530
[CARBONDATA-3293] Prune datamaps improvement for count(*)
Problem:
(1) Currently the prune for count(*) is the same as for a select * query: Blocklet and ExtendedBlocklet objects are built from the DataMapRow, which is unnecessary and time consuming.
(2) The update/delete status is checked every time, even when the table has never been updated or deleted.
Solution:
(1) The blocklet row count is already stored in the DataMapRow, so reading that count is enough. This improves count(*) query performance (see the sketch after this list).
(2) Skip the update/delete status check unless the table has actually been updated or deleted.
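Illustrative sketch (not part of this patch): a minimal Java view of the fast path this change enables. It sums the row count that is now cached in each task summary row (TASK_ROW_COUNT, index 0 after this change) and falls back to per-block counting only when the table has update/delete deltas. The SummaryRow interface and the countStar/perBlockFallback names are hypothetical simplifications; the real logic lives in BlockDataMap.getRowCount and CarbonTableInputFormat.getBlockRowCount in the diff below.

import java.util.List;
import java.util.function.ToLongFunction;

final class CountStarSketch {

  // After this patch the task summary DataMapRow stores the total row count at index 0.
  static final int TASK_ROW_COUNT = 0;

  // Hypothetical stand-in for a task summary DataMapRow.
  interface SummaryRow {
    long getLong(int ordinal);
  }

  // Fast path for count(*): read the cached per-task row counts instead of
  // materializing Blocklet/ExtendedBlocklet objects from every DataMapRow.
  // Tables with update/delete deltas still take the per-block path so that
  // invalid blocks and delete deltas can be excluded.
  static long countStar(List<SummaryRow> taskSummaries,
                        boolean hasUpdateOrDeleteDeltas,
                        ToLongFunction<List<SummaryRow>> perBlockFallback) {
    if (hasUpdateOrDeleteDeltas) {
      return perBlockFallback.applyAsLong(taskSummaries);
    }
    long total = 0L;
    for (SummaryRow summary : taskSummaries) {
      total += summary.getLong(TASK_ROW_COUNT);
    }
    return total;
  }
}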
This closes #3148
---
.../constants/CarbonCommonConstantsInternal.java | 2 +
.../carbondata/core/datamap/TableDataMap.java | 44 +++++++++++++++
.../carbondata/core/datamap/dev/DataMap.java | 15 +++++
.../datamap/dev/cgdatamap/CoarseGrainDataMap.java | 14 +++++
.../datamap/dev/fgdatamap/FineGrainDataMap.java | 13 +++++
.../indexstore/blockletindex/BlockDataMap.java | 57 ++++++++++++++++++-
.../indexstore/blockletindex/BlockletDataMap.java | 3 +-
.../blockletindex/BlockletDataMapRowIndexes.java | 14 +++--
.../core/indexstore/schema/SchemaGenerator.java | 2 +
.../carbondata/core/mutate/CarbonUpdateUtil.java | 10 +++-
.../hadoop/api/CarbonTableInputFormat.java | 64 ++++++++++++----------
...ryWithColumnMetCacheAndCacheLevelProperty.scala | 5 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 2 +-
.../command/mutation/DeleteExecution.scala | 2 +-
14 files changed, 202 insertions(+), 45 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
index 398e03a..cfcbe44 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
@@ -24,4 +24,6 @@ public interface CarbonCommonConstantsInternal {
String QUERY_ON_PRE_AGG_STREAMING = "carbon.query.on.preagg.streaming.";
+ String ROW_COUNT = "rowCount";
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 0d46fd8..15b0e8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -34,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -499,4 +501,46 @@ public final class TableDataMap extends OperationEventListener {
}
return prunedSegments;
}
+
+ /**
+ * Prune the datamap of the given segments and return the Map of blocklet path and row count
+ *
+ * @param segments
+ * @param partitions
+ * @return
+ * @throws IOException
+ */
+ public Map<String, Long> getBlockRowCount(List<Segment> segments,
+ final List<PartitionSpec> partitions, TableDataMap defaultDataMap)
+ throws IOException {
+ Map<String, Long> blockletToRowCountMap = new HashMap<>();
+ for (Segment segment : segments) {
+ List<CoarseGrainDataMap> dataMaps = defaultDataMap.getDataMapFactory().getDataMaps(segment);
+ for (CoarseGrainDataMap dataMap : dataMaps) {
+ dataMap.getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+ }
+ }
+ return blockletToRowCountMap;
+ }
+
+ /**
+ * Prune the datamap of the given segments and return the total row count
+ *
+ * @param segments
+ * @param partitions
+ * @return
+ * @throws IOException
+ */
+ public long getRowCount(List<Segment> segments, final List<PartitionSpec> partitions,
+ TableDataMap defaultDataMap) throws IOException {
+ long totalRowCount = 0L;
+ for (Segment segment : segments) {
+ List<CoarseGrainDataMap> dataMaps = defaultDataMap.getDataMapFactory().getDataMaps(segment);
+ for (CoarseGrainDataMap dataMap : dataMaps) {
+ totalRowCount += dataMap.getRowCount(segment, partitions);
+ }
+ }
+ return totalRowCount;
+ }
+
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index c52cc41..adc74b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.datamap.dev;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -54,6 +56,19 @@ public interface DataMap<T extends Blocklet> {
List<T> prune(Expression filter, SegmentProperties segmentProperties,
List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException;
+ /**
+ * Prune the data maps to find the total row count
+ * of the given segment and partitions
+ */
+ long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException;
+
+ /**
+ * Prune the data maps for finding the row count for each block. It returns a Map of
+ * blocklet path and the row count
+ */
+ Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+ Map<String, Long> blockletToRowCountMap) throws IOException;
+
// TODO Move this method to Abstract class
/**
* Validate whether the current segment needs to be fetching the required data
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index b4af9d9..3aba163 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -18,9 +18,11 @@ package org.apache.carbondata.core.datamap.dev.cgdatamap;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.Blocklet;
@@ -41,6 +43,18 @@ public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
throw new UnsupportedOperationException("Filter expression not supported");
}
+ @Override
+ public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+ Map<String, Long> blockletToRowCountMap) throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+
@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index 03b2bfb..3a47df1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -18,9 +18,11 @@ package org.apache.carbondata.core.datamap.dev.fgdatamap;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -40,6 +42,17 @@ public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
throw new UnsupportedOperationException("Filter expression not supported");
}
+ @Override
+ public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+ Map<String, Long> blockletToRowCountMap) throws IOException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
@Override public int getNumberOfEntries() {
// keep default, one record in one datamap
return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index a7818c2..8ebd50d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -23,10 +23,13 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -217,6 +220,7 @@ public class BlockDataMap extends CoarseGrainDataMap
CarbonRowSchema[] schema = getFileFooterEntrySchema();
boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
+ long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -241,11 +245,14 @@ public class BlockDataMap extends CoarseGrainDataMap
summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties,
getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow,
blockMetaInfo, updatedMinValues, updatedMaxValues, minMaxFlag);
+ totalRowCount += fileFooter.getNumberOfRows();
}
}
List<Short> blockletCountList = new ArrayList<>();
blockletCountList.add((short) 0);
byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountList);
+ // set the total row count
+ summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties, minMaxFlag);
return summaryRow;
@@ -289,6 +296,7 @@ public class BlockDataMap extends CoarseGrainDataMap
// min max flag for task summary
boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(taskSummaryMinMaxFlag, true);
+ long totalRowCount = 0;
for (DataFileFooter fileFooter : indexInfo) {
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
BlockMetaInfo blockMetaInfo =
@@ -331,6 +339,7 @@ public class BlockDataMap extends CoarseGrainDataMap
summaryRow,
blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
+ totalRowCount += previousDataFileFooter.getNumberOfRows();
minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
Arrays.fill(minMaxFlag, true);
// flag to check whether last file footer entry is different from previous entry.
@@ -361,9 +370,12 @@ public class BlockDataMap extends CoarseGrainDataMap
blockletDataMapInfo.getBlockMetaInfoMap()
.get(previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath()),
blockMinValues, blockMaxValues, minMaxFlag);
+ totalRowCount += previousDataFileFooter.getNumberOfRows();
blockletCountInEachBlock.add(totalBlockletsInOneBlock);
}
byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountInEachBlock);
+ // set the total row count
+ summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
// blocklet count index is the last index
summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
@@ -409,7 +421,7 @@ public class BlockDataMap extends CoarseGrainDataMap
}
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
- int taskMinMaxOrdinal = 0;
+ int taskMinMaxOrdinal = 1;
// get min max values for columns to be cached
byte[][] minValuesForColumnsToBeCached = BlockletDataMapUtil
.getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
@@ -648,6 +660,49 @@ public class BlockDataMap extends CoarseGrainDataMap
return sum;
}
+ @Override
+ public long getRowCount(Segment segment, List<PartitionSpec> partitions) {
+ long totalRowCount =
+ taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0).getLong(TASK_ROW_COUNT);
+ if (totalRowCount == 0) {
+ Map<String, Long> blockletToRowCountMap = new HashMap<>();
+ getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+ for (long blockletRowCount : blockletToRowCountMap.values()) {
+ totalRowCount += blockletRowCount;
+ }
+ } else {
+ if (taskSummaryDMStore.getRowCount() == 0) {
+ return 0L;
+ }
+ }
+ return totalRowCount;
+ }
+
+ public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+ Map<String, Long> blockletToRowCountMap) {
+ if (memoryDMStore.getRowCount() == 0) {
+ return new HashMap<>();
+ }
+ // if the datamap is partitioned but no partition information is stored, it means
+ // the partitions were dropped, so return an empty map
+ if (partitions != null) {
+ if (!validatePartitionInfo(partitions)) {
+ return new HashMap<>();
+ }
+ }
+ CarbonRowSchema[] schema = getFileFooterEntrySchema();
+ int numEntries = memoryDMStore.getRowCount();
+ for (int i = 0; i < numEntries; i++) {
+ DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
+ String fileName = new String(dataMapRow.getByteArray(FILE_PATH_INDEX),
+ CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + CarbonTablePath.getCarbonDataExtension();
+ int rowCount = dataMapRow.getInt(ROW_COUNT_INDEX);
+ // prepend the segment number to the blocklet file path
+ blockletToRowCountMap.put((segment.getSegmentNo() + "," + fileName), (long) rowCount);
+ }
+ return blockletToRowCountMap;
+ }
+
private List<Blocklet> prune(FilterResolverIntf filterExp) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 191056d..7939a17 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -146,6 +146,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
relativeBlockletId += fileFooter.getBlockletList().size();
}
}
+ summaryRow.setLong(0L, TASK_ROW_COUNT);
setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
summaryRowMinMaxFlag);
return summaryRow;
@@ -163,7 +164,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
for (int index = 0; index < blockletList.size(); index++) {
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
- int taskMinMaxOrdinal = 0;
+ int taskMinMaxOrdinal = 1;
BlockletInfo blockletInfo = blockletList.get(index);
blockletInfo.setSorted(fileFooter.isSorted());
BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
index 085fb7d..dcaecd2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
@@ -50,15 +50,17 @@ public interface BlockletDataMapRowIndexes {
int BLOCKLET_ID_INDEX = 12;
// Summary dataMap row indexes
- int TASK_MIN_VALUES_INDEX = 0;
+ int TASK_ROW_COUNT = 0;
- int TASK_MAX_VALUES_INDEX = 1;
+ int TASK_MIN_VALUES_INDEX = 1;
- int SUMMARY_INDEX_FILE_NAME = 2;
+ int TASK_MAX_VALUES_INDEX = 2;
- int SUMMARY_SEGMENTID = 3;
+ int SUMMARY_INDEX_FILE_NAME = 3;
- int TASK_MIN_MAX_FLAG = 4;
+ int SUMMARY_SEGMENTID = 4;
- int SUMMARY_INDEX_PATH = 5;
+ int TASK_MIN_MAX_FLAG = 5;
+
+ int SUMMARY_INDEX_PATH = 6;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index 7a2e13a..52b9fb3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -113,6 +113,8 @@ public class SchemaGenerator {
List<CarbonColumn> minMaxCacheColumns,
boolean storeBlockletCount, boolean filePathToBeStored) throws MemoryException {
List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
+ // for number of rows.
+ taskMinMaxSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
// get MinMax Schema
getMinMaxSchema(segmentProperties, taskMinMaxSchemas, minMaxCacheColumns);
// for storing file name
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index bd8c465..a632f03 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -747,9 +748,12 @@ public class CarbonUpdateUtil {
/**
* Return row count of input block
*/
- public static long getRowCount(
- BlockMappingVO blockMappingVO,
- CarbonTable carbonTable) {
+ public static long getRowCount(BlockMappingVO blockMappingVO, CarbonTable carbonTable) {
+ if (blockMappingVO.getBlockRowCountMapping().size() == 1
+ && blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT)
+ != null) {
+ return blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT);
+ }
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable);
long rowCount = 0;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 281143b..4ba8b8c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -28,11 +28,11 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -58,7 +58,6 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.stream.StreamFile;
import org.apache.carbondata.core.stream.StreamPruner;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.hadoop.conf.Configuration;
@@ -576,7 +575,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
* Get the row count of the Block and mapping of segment and Block count.
*/
public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
- List<PartitionSpec> partitions) throws IOException {
+ List<PartitionSpec> partitions, boolean isUpdateFlow) throws IOException {
// Normal query flow goes to CarbonInputFormat#getPrunedBlocklets and initialize the
// pruning info for table we queried. But here count star query without filter uses a different
// query plan, and no pruning info is initialized. When it calls default data map to
@@ -586,7 +585,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
ExplainCollector.remove();
AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
- TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
+ TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
@@ -602,6 +601,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
// TODO: currently only batch segment is supported, add support for streaming table
List<Segment> filteredSegment =
getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
+ boolean isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
/* In the select * flow, getSplits() method was clearing the segmentMap if,
segment needs refreshing. same thing need for select count(*) flow also.
For NonTransactional table, one of the reason for a segment refresh is below scenario.
@@ -624,36 +624,40 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
.clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
toBeCleanedSegments);
}
- List<ExtendedBlocklet> blocklets =
- blockletMap.prune(filteredSegment, (FilterResolverIntf) null, partitions);
- for (ExtendedBlocklet blocklet : blocklets) {
- String blockName = blocklet.getPath();
- blockName = CarbonTablePath.getCarbonDataFileName(blockName);
- blockName = blockName + CarbonTablePath.getCarbonDataExtension();
-
- long rowCount = blocklet.getDetailInfo().getRowCount();
-
- String segmentId = Segment.toSegment(blocklet.getSegmentId()).getSegmentNo();
- String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
-
- // if block is invalid then don't add the count
- SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
-
- if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
- Long blockCount = blockRowCountMapping.get(key);
- if (blockCount == null) {
- blockCount = 0L;
- Long count = segmentAndBlockCountMapping.get(segmentId);
- if (count == null) {
- count = 0L;
+ if (isIUDTable || isUpdateFlow) {
+ Map<String, Long> blockletToRowCountMap =
+ defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap);
+ // key is (segmentId + "," + blockletPath) and value is the row count of that blocklet
+ for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
+ String[] segmentIdAndPath = eachBlocklet.getKey().split(",", 2);
+ String segmentId = segmentIdAndPath[0];
+ String blockName = segmentIdAndPath[1];
+
+ long rowCount = eachBlocklet.getValue();
+
+ String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
+
+ // if block is invalid then don't add the count
+ SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+ if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
+ Long blockCount = blockRowCountMapping.get(key);
+ if (blockCount == null) {
+ blockCount = 0L;
+ Long count = segmentAndBlockCountMapping.get(segmentId);
+ if (count == null) {
+ count = 0L;
+ }
+ segmentAndBlockCountMapping.put(segmentId, count + 1);
}
- segmentAndBlockCountMapping.put(segmentId, count + 1);
+ blockCount += rowCount;
+ blockRowCountMapping.put(key, blockCount);
}
- blockCount += rowCount;
- blockRowCountMapping.put(key, blockCount);
}
+ } else {
+ long totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
+ blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
}
-
return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 7c9a9fc..001964a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.dev.DataMap
import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
-import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap}
+import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap, BlockletDataMapRowIndexes}
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -93,7 +93,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false)
- val minSchemas = summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
+ val minSchemas = summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX)
+ .asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
.getChildSchemas
minSchemas.length == expectedLength
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 297cb54..cfceea4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -64,7 +64,7 @@ case class CarbonCountStar(
sparkSession,
TableIdentifier(
carbonTable.getTableName,
- Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
+ Some(carbonTable.getDatabaseName))).map(_.asJava).orNull, false),
carbonTable)
val valueRaw =
attributesRaw.head.dataType match {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a88a02b..7337496 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -104,7 +104,7 @@ object DeleteExecution {
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
- TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
+ TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull, true)
val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
CarbonUpdateUtil
.createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)