Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/03/21 17:54:22 UTC

[carbondata] branch master updated: [CARBONDATA-3293] Prune datamaps improvement for count(*)

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd81af  [CARBONDATA-3293] Prune datamaps improvement for count(*)
6fd81af is described below

commit 6fd81af183344f67ab09e846a7aee22e2d8a3ae4
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Fri Mar 15 12:37:27 2019 +0530

    [CARBONDATA-3293] Prune datamaps improvement for count(*)
    
    Problem:
    (1) Currently, count(*) prunes the same way as a select * query: Blocklet and ExtendedBlocklet objects are built from the DataMapRow, which is unnecessary and time consuming.
    (2) The update/delete status is checked on every query, even when the table has never been updated or deleted.
    
    Solution:
    (1) The DataMapRow already holds the blocklet row count, so it is enough to read just that count. This improves count(*) query performance.
    (2) Skip the update/delete status check unless the table has actually been updated or deleted.
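    
    As a rough sketch of the new fast path (identifiers are taken from the
    diff below; the wiring around them is illustrative, not the exact call
    site):
    
        // read the pre-aggregated row count stored at slot TASK_ROW_COUNT (0)
        // of the task summary row, instead of materializing ExtendedBlocklets
        long totalRowCount =
            taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0)
                .getLong(BlockletDataMapRowIndexes.TASK_ROW_COUNT);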
    
    This closes #3148
---
 .../constants/CarbonCommonConstantsInternal.java   |  2 +
 .../carbondata/core/datamap/TableDataMap.java      | 44 +++++++++++++++
 .../carbondata/core/datamap/dev/DataMap.java       | 15 +++++
 .../datamap/dev/cgdatamap/CoarseGrainDataMap.java  | 14 +++++
 .../datamap/dev/fgdatamap/FineGrainDataMap.java    | 13 +++++
 .../indexstore/blockletindex/BlockDataMap.java     | 57 ++++++++++++++++++-
 .../indexstore/blockletindex/BlockletDataMap.java  |  3 +-
 .../blockletindex/BlockletDataMapRowIndexes.java   | 14 +++--
 .../core/indexstore/schema/SchemaGenerator.java    |  2 +
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 10 +++-
 .../hadoop/api/CarbonTableInputFormat.java         | 64 ++++++++++++----------
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala |  5 +-
 .../org/apache/spark/sql/CarbonCountStar.scala     |  2 +-
 .../command/mutation/DeleteExecution.scala         |  2 +-
 14 files changed, 202 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
index 398e03a..cfcbe44 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstantsInternal.java
@@ -24,4 +24,6 @@ public interface CarbonCommonConstantsInternal {
 
   String QUERY_ON_PRE_AGG_STREAMING = "carbon.query.on.preagg.streaming.";
 
+  String ROW_COUNT = "rowCount";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 0d46fd8..15b0e8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -34,6 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.BlockletSerializer;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -499,4 +501,46 @@ public final class TableDataMap extends OperationEventListener {
     }
     return prunedSegments;
   }
+
+  /**
+   * Prune the datamaps of the given segments and return a Map of blocklet path to row count
+   *
+   * @param segments
+   * @param partitions
+   * @return
+   * @throws IOException
+   */
+  public Map<String, Long> getBlockRowCount(List<Segment> segments,
+      final List<PartitionSpec> partitions, TableDataMap defaultDataMap)
+      throws IOException {
+    Map<String, Long> blockletToRowCountMap = new HashMap<>();
+    for (Segment segment : segments) {
+      List<CoarseGrainDataMap> dataMaps = defaultDataMap.getDataMapFactory().getDataMaps(segment);
+      for (CoarseGrainDataMap dataMap : dataMaps) {
+        dataMap.getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+      }
+    }
+    return blockletToRowCountMap;
+  }
+
+  /**
+   * Prune the datamaps of the given segments and return the total row count of all blocklets
+   *
+   * @param segments
+   * @param partitions
+   * @return
+   * @throws IOException
+   */
+  public long getRowCount(List<Segment> segments, final List<PartitionSpec> partitions,
+      TableDataMap defaultDataMap) throws IOException {
+    long totalRowCount = 0L;
+    for (Segment segment : segments) {
+      List<CoarseGrainDataMap> dataMaps = defaultDataMap.getDataMapFactory().getDataMaps(segment);
+      for (CoarseGrainDataMap dataMap : dataMaps) {
+        totalRowCount += dataMap.getRowCount(segment, partitions);
+      }
+    }
+    return totalRowCount;
+  }
+
 }
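
A hedged usage sketch of the two new entry points (the caller-side variable
names here are illustrative; the real call sites appear in the
CarbonTableInputFormat hunk further down):

    // fast path: no update/delete deltas, a single table-wide total is enough
    long total = defaultDataMap.getRowCount(filteredSegments, partitions, defaultDataMap);

    // IUD path: per-block counts, so invalid blocks can be excluded later
    Map<String, Long> perBlock =
        defaultDataMap.getBlockRowCount(filteredSegments, partitions, defaultDataMap);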
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index c52cc41..adc74b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -54,6 +56,19 @@ public interface DataMap<T extends Blocklet> {
   List<T> prune(Expression filter, SegmentProperties segmentProperties,
       List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException;
 
+  /**
+   * Prune the data maps for finding the total row count. It returns the
+   * aggregated row count of all blocklets in the given segment
+   */
+  long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException;
+
+  /**
+   * Prune the data maps for finding the row count for each block. It returns a Map of
+   * blocklet path and the row count
+   */
+  Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+      Map<String, Long> blockletToRowCountMap) throws IOException;
+
   // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required data
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index b4af9d9..3aba163 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -18,9 +18,11 @@ package org.apache.carbondata.core.datamap.dev.cgdatamap;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.Blocklet;
@@ -41,6 +43,18 @@ public abstract class CoarseGrainDataMap implements DataMap<Blocklet> {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
 
+  @Override
+  public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+      Map<String, Long> blockletToRowCountMap) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+
   @Override public int getNumberOfEntries() {
     // keep default, one record in one datamap
     return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index 03b2bfb..3a47df1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -18,9 +18,11 @@ package org.apache.carbondata.core.datamap.dev.fgdatamap;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -40,6 +42,17 @@ public abstract class FineGrainDataMap implements DataMap<FineGrainBlocklet> {
     throw new UnsupportedOperationException("Filter expression not supported");
   }
 
+  @Override
+  public long getRowCount(Segment segment, List<PartitionSpec> partitions) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
+  @Override
+  public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+      Map<String, Long> blockletToRowCountMap) throws IOException {
+    throw new UnsupportedOperationException("Operation not supported");
+  }
+
   @Override public int getNumberOfEntries() {
     // keep default, one record in one datamap
     return 1;
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index a7818c2..8ebd50d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -23,10 +23,13 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -217,6 +220,7 @@ public class BlockDataMap extends CoarseGrainDataMap
     CarbonRowSchema[] schema = getFileFooterEntrySchema();
     boolean[] minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(minMaxFlag, true);
+    long totalRowCount = 0;
     for (DataFileFooter fileFooter : indexInfo) {
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -241,11 +245,14 @@ public class BlockDataMap extends CoarseGrainDataMap
         summaryRow = loadToUnsafeBlock(schema, taskSummarySchema, fileFooter, segmentProperties,
             getMinMaxCacheColumns(), blockInfo.getFilePath(), summaryRow,
             blockMetaInfo, updatedMinValues, updatedMaxValues, minMaxFlag);
+        totalRowCount += fileFooter.getNumberOfRows();
       }
     }
     List<Short> blockletCountList = new ArrayList<>();
     blockletCountList.add((short) 0);
     byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountList);
+    // set the total row count
+    summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
     summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
     setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties, minMaxFlag);
     return summaryRow;
@@ -289,6 +296,7 @@ public class BlockDataMap extends CoarseGrainDataMap
     // min max flag for task summary
     boolean[] taskSummaryMinMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
     Arrays.fill(taskSummaryMinMaxFlag, true);
+    long totalRowCount = 0;
     for (DataFileFooter fileFooter : indexInfo) {
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -331,6 +339,7 @@ public class BlockDataMap extends CoarseGrainDataMap
               summaryRow,
               blockletDataMapInfo.getBlockMetaInfoMap().get(previousBlockInfo.getFilePath()),
               blockMinValues, blockMaxValues, minMaxFlag);
+          totalRowCount += previousDataFileFooter.getNumberOfRows();
           minMaxFlag = new boolean[segmentProperties.getColumnsValueSize().length];
           Arrays.fill(minMaxFlag, true);
           // flag to check whether last file footer entry is different from previous entry.
@@ -361,9 +370,12 @@ public class BlockDataMap extends CoarseGrainDataMap
               blockletDataMapInfo.getBlockMetaInfoMap()
                   .get(previousDataFileFooter.getBlockInfo().getTableBlockInfo().getFilePath()),
               blockMinValues, blockMaxValues, minMaxFlag);
+      totalRowCount += previousDataFileFooter.getNumberOfRows();
       blockletCountInEachBlock.add(totalBlockletsInOneBlock);
     }
     byte[] blockletCount = convertRowCountFromShortToByteArray(blockletCountInEachBlock);
+    // set the total row count
+    summaryRow.setLong(totalRowCount, TASK_ROW_COUNT);
     // blocklet count index is the last index
     summaryRow.setByteArray(blockletCount, taskSummarySchema.length - 1);
     setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
@@ -409,7 +421,7 @@ public class BlockDataMap extends CoarseGrainDataMap
     }
     DataMapRow row = new DataMapRowImpl(schema);
     int ordinal = 0;
-    int taskMinMaxOrdinal = 0;
+    int taskMinMaxOrdinal = 1;
     // get min max values for columns to be cached
     byte[][] minValuesForColumnsToBeCached = BlockletDataMapUtil
         .getMinMaxForColumnsToBeCached(segmentProperties, minMaxCacheColumns, minValues);
@@ -648,6 +660,49 @@ public class BlockDataMap extends CoarseGrainDataMap
     return sum;
   }
 
+  @Override
+  public long getRowCount(Segment segment, List<PartitionSpec> partitions) {
+    long totalRowCount =
+        taskSummaryDMStore.getDataMapRow(getTaskSummarySchema(), 0).getLong(TASK_ROW_COUNT);
+    if (totalRowCount == 0) {
+      Map<String, Long> blockletToRowCountMap = new HashMap<>();
+      getRowCountForEachBlock(segment, partitions, blockletToRowCountMap);
+      for (long blockletRowCount : blockletToRowCountMap.values()) {
+        totalRowCount += blockletRowCount;
+      }
+    } else {
+      if (taskSummaryDMStore.getRowCount() == 0) {
+        return 0L;
+      }
+    }
+    return totalRowCount;
+  }
+
+  public Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
+      Map<String, Long> blockletToRowCountMap) {
+    if (memoryDMStore.getRowCount() == 0) {
+      return new HashMap<>();
+    }
+    // if the datamap is partitioned but no partition information is stored, it means
+    // the partitions were dropped, so return an empty map.
+    if (partitions != null) {
+      if (!validatePartitionInfo(partitions)) {
+        return new HashMap<>();
+      }
+    }
+    CarbonRowSchema[] schema = getFileFooterEntrySchema();
+    int numEntries = memoryDMStore.getRowCount();
+    for (int i = 0; i < numEntries; i++) {
+      DataMapRow dataMapRow = memoryDMStore.getDataMapRow(schema, i);
+      String fileName = new String(dataMapRow.getByteArray(FILE_PATH_INDEX),
+          CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + CarbonTablePath.getCarbonDataExtension();
+      int rowCount = dataMapRow.getInt(ROW_COUNT_INDEX);
+      // prepend the segment number to the blocklet file path
+      blockletToRowCountMap.put((segment.getSegmentNo() + "," + fileName), (long) rowCount);
+    }
+    return blockletToRowCountMap;
+  }
+
   private List<Blocklet> prune(FilterResolverIntf filterExp) {
     if (memoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
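
The "<segmentNo>,<file name>" keys produced by getRowCountForEachBlock above
are decoded again in the CarbonTableInputFormat hunk below. A standalone
round-trip sketch, using a hypothetical data file name:

    // encode: prepend the segment number to the block file name
    String key = "2" + "," + "part-0-0_batchno0-0-0.carbondata";
    // decode: a limit of 2 keeps any later commas inside the file name intact
    String[] segmentIdAndPath = key.split(",", 2);
    String segmentId = segmentIdAndPath[0];  // "2"
    String blockName = segmentIdAndPath[1];  // "part-0-0_batchno0-0-0.carbondata"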
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 191056d..7939a17 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -146,6 +146,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
         relativeBlockletId += fileFooter.getBlockletList().size();
       }
     }
+    summaryRow.setLong(0L, TASK_ROW_COUNT);
     setMinMaxFlagForTaskSummary(summaryRow, taskSummarySchema, segmentProperties,
         summaryRowMinMaxFlag);
     return summaryRow;
@@ -163,7 +164,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
       int ordinal = 0;
-      int taskMinMaxOrdinal = 0;
+      int taskMinMaxOrdinal = 1;
       BlockletInfo blockletInfo = blockletList.get(index);
       blockletInfo.setSorted(fileFooter.isSorted());
       BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
index 085fb7d..dcaecd2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapRowIndexes.java
@@ -50,15 +50,17 @@ public interface BlockletDataMapRowIndexes {
   int BLOCKLET_ID_INDEX = 12;
 
   // Summary dataMap row indexes
-  int TASK_MIN_VALUES_INDEX = 0;
+  int TASK_ROW_COUNT = 0;
 
-  int TASK_MAX_VALUES_INDEX = 1;
+  int TASK_MIN_VALUES_INDEX = 1;
 
-  int SUMMARY_INDEX_FILE_NAME = 2;
+  int TASK_MAX_VALUES_INDEX = 2;
 
-  int SUMMARY_SEGMENTID = 3;
+  int SUMMARY_INDEX_FILE_NAME = 3;
 
-  int TASK_MIN_MAX_FLAG = 4;
+  int SUMMARY_SEGMENTID = 4;
 
-  int SUMMARY_INDEX_PATH = 5;
+  int TASK_MIN_MAX_FLAG = 5;
+
+  int SUMMARY_INDEX_PATH = 6;
 }
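
Because TASK_ROW_COUNT now occupies slot 0, every later summary index shifts
up by one; this is also why taskMinMaxOrdinal starts at 1 instead of 0 in the
BlockDataMap and BlockletDataMap hunks above. A minimal read sketch:

    // slot 0 of the task summary row now holds the pre-aggregated row count
    long rowCount = summaryRow.getLong(BlockletDataMapRowIndexes.TASK_ROW_COUNT);
    // TASK_MIN_VALUES_INDEX (now 1), TASK_MAX_VALUES_INDEX (now 2) and the
    // remaining summary fields follow one slot later than before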
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
index 7a2e13a..52b9fb3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/SchemaGenerator.java
@@ -113,6 +113,8 @@ public class SchemaGenerator {
       List<CarbonColumn> minMaxCacheColumns,
       boolean storeBlockletCount, boolean filePathToBeStored) throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
+    // for storing the total row count of the task
+    taskMinMaxSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
     // get MinMax Schema
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas, minMaxCacheColumns);
     // for storing file name
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index bd8c465..a632f03 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -747,9 +748,12 @@ public class CarbonUpdateUtil {
   /**
    * Return row count of input block
    */
-  public static long getRowCount(
-      BlockMappingVO blockMappingVO,
-      CarbonTable carbonTable) {
+  public static long getRowCount(BlockMappingVO blockMappingVO, CarbonTable carbonTable) {
+    if (blockMappingVO.getBlockRowCountMapping().size() == 1
+        && blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT)
+        != null) {
+      return blockMappingVO.getBlockRowCountMapping().get(CarbonCommonConstantsInternal.ROW_COUNT);
+    }
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable);
     long rowCount = 0;
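
The sentinel contract in a nutshell (illustrative sketch; the producing side
is the else-branch of getBlockRowCount in the next hunk):

    // producer: for a table with no updates/deletes, store one table-wide
    // total under the ROW_COUNT sentinel key
    blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
    // consumer: getRowCount above then returns that total directly, without
    // constructing a SegmentUpdateStatusManager for the pristine table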
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 281143b..4ba8b8c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -28,11 +28,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -58,7 +58,6 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.stream.StreamFile;
 import org.apache.carbondata.core.stream.StreamPruner;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -576,7 +575,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    * Get the row count of the Block and mapping of segment and Block count.
    */
   public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
-      List<PartitionSpec> partitions) throws IOException {
+      List<PartitionSpec> partitions, boolean isUpdateFlow) throws IOException {
     // Normal query flow goes to CarbonInputFormat#getPrunedBlocklets and initialize the
     // pruning info for table we queried. But here count star query without filter uses a different
     // query plan, and no pruning info is initialized. When it calls default data map to
@@ -586,7 +585,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     ExplainCollector.remove();
 
     AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
-    TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
+    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
 
     ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
     LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
@@ -602,6 +601,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment =
         getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
+    boolean isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
     /* In the select * flow, getSplits() method was clearing the segmentMap if,
     segment needs refreshing. same thing need for select count(*) flow also.
     For NonTransactional table, one of the reason for a segment refresh is below scenario.
@@ -624,36 +624,40 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
           .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
               toBeCleanedSegments);
     }
-    List<ExtendedBlocklet> blocklets =
-        blockletMap.prune(filteredSegment, (FilterResolverIntf) null, partitions);
-    for (ExtendedBlocklet blocklet : blocklets) {
-      String blockName = blocklet.getPath();
-      blockName = CarbonTablePath.getCarbonDataFileName(blockName);
-      blockName = blockName + CarbonTablePath.getCarbonDataExtension();
-
-      long rowCount = blocklet.getDetailInfo().getRowCount();
-
-      String segmentId = Segment.toSegment(blocklet.getSegmentId()).getSegmentNo();
-      String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
-
-      // if block is invalid then don't add the count
-      SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
-
-      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
-        Long blockCount = blockRowCountMapping.get(key);
-        if (blockCount == null) {
-          blockCount = 0L;
-          Long count = segmentAndBlockCountMapping.get(segmentId);
-          if (count == null) {
-            count = 0L;
+    if (isIUDTable || isUpdateFlow) {
+      Map<String, Long> blockletToRowCountMap =
+          defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap);
+      // key is (segmentId + "," + blockletPath) and value is the row count of that blocklet
+      for (Map.Entry<String, Long> eachBlocklet : blockletToRowCountMap.entrySet()) {
+        String[] segmentIdAndPath = eachBlocklet.getKey().split(",", 2);
+        String segmentId = segmentIdAndPath[0];
+        String blockName = segmentIdAndPath[1];
+
+        long rowCount = eachBlocklet.getValue();
+
+        String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
+
+        // if block is invalid then don't add the count
+        SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+        if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
+          Long blockCount = blockRowCountMapping.get(key);
+          if (blockCount == null) {
+            blockCount = 0L;
+            Long count = segmentAndBlockCountMapping.get(segmentId);
+            if (count == null) {
+              count = 0L;
+            }
+            segmentAndBlockCountMapping.put(segmentId, count + 1);
           }
-          segmentAndBlockCountMapping.put(segmentId, count + 1);
+          blockCount += rowCount;
+          blockRowCountMapping.put(key, blockCount);
         }
-        blockCount += rowCount;
-        blockRowCountMapping.put(key, blockCount);
       }
+    } else {
+      long totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
+      blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT, totalRowCount);
     }
-
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
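
Condensed decision logic of the hunk above (a Java-flavored summary, not a
verbatim excerpt):

    boolean isIUDTable = updateStatusManager.getUpdateStatusDetails().length != 0;
    if (isIUDTable || isUpdateFlow) {
      // updates/deletes may exist: count per block so that invalid
      // (updated/deleted) blocks can be filtered out via the update status
      Map<String, Long> perBlock =
          defaultDataMap.getBlockRowCount(filteredSegment, partitions, defaultDataMap);
    } else {
      // pristine table: one pre-aggregated total under the ROW_COUNT sentinel
      blockRowCountMapping.put(CarbonCommonConstantsInternal.ROW_COUNT,
          defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap));
    }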
 
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 7c9a9fc..001964a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.dev.DataMap
 import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
-import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap}
+import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap, BlockletDataMapRowIndexes}
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -93,7 +93,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
       .getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false)
-    val minSchemas = summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
+    val minSchemas = summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX)
+      .asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
       .getChildSchemas
     minSchemas.length == expectedLength
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 297cb54..cfceea4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -64,7 +64,7 @@ case class CarbonCountStar(
           sparkSession,
           TableIdentifier(
             carbonTable.getTableName,
-            Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
+            Some(carbonTable.getDatabaseName))).map(_.asJava).orNull, false),
       carbonTable)
     val valueRaw =
       attributesRaw.head.dataType match {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a88a02b..7337496 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -104,7 +104,7 @@ object DeleteExecution {
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
-          TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
+          TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull, true)
     val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)