Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/09/16 17:05:38 UTC

[carbondata] branch master updated: [CARBONDATA-3454] optimized index server output for count(*)

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 41ac71a  [CARBONDATA-3454] optimized index server output for count(*)
41ac71a is described below

commit 41ac71a7ef96a6725ee9b6a8f26bf4836bd535f9
Author: kunal642 <ku...@gmail.com>
AuthorDate: Thu Jun 27 14:32:11 2019 +0530

    [CARBONDATA-3454] optimized index server output for count(*)
    
    Optimised the output for count(*) queries so that only a long is sent back to the driver, reducing the network transfer cost for the index server
    
    This closes #3308
---
 .../apache/carbondata/core/datamap/DataMapJob.java |   2 +
 .../carbondata/core/datamap/DataMapUtil.java       |  13 ++-
 .../core/datamap/DistributableDataMapFormat.java   |  34 +++++--
 .../core/indexstore/ExtendedBlocklet.java          |  68 ++++++++-----
 .../core/indexstore/ExtendedBlockletWrapper.java   |  27 +++--
 .../ExtendedBlockletWrapperContainer.java          |  19 ++--
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  52 ++++++++--
 .../hadoop/api/CarbonTableInputFormat.java         |  22 ++--
 .../carbondata/indexserver/DataMapJobs.scala       |  15 ++-
 .../indexserver/DistributedCountRDD.scala          | 111 +++++++++++++++++++++
 .../indexserver/DistributedPruneRDD.scala          |  29 ++----
 .../indexserver/DistributedRDDUtils.scala          |  13 +++
 .../carbondata/indexserver/IndexServer.scala       |  19 ++++
 13 files changed, 319 insertions(+), 105 deletions(-)
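
In short, the patch lets a count(*) query ask the index server for a single long
per request (the new executeCountJob / DistributedCountRDD path) instead of
shipping every pruned blocklet back to the driver, and adds a count-star mode in
which each ExtendedBlocklet is serialized as just a row count and a segment
number. A rough Scala sketch of the new request path, using only signatures
introduced in this patch (table, validSegments, partitions and dataMapJob are
assumed to be in scope; the glue code is illustrative, not part of the commit):

    // Build the usual distributable format; the result is tiny, so skip the
    // temp-file hand-off and return it in memory.
    val format = new DistributableDataMapFormat(table, null, validSegments,
      new java.util.ArrayList[String](), partitions, false, null, false)
    format.setIsWriteToFile(false)
    // New DataMapJob method: returns the total row count as one Long instead
    // of a List[ExtendedBlocklet].
    val totalRowCount: java.lang.Long = dataMapJob.executeCountJob(format)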

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
index 9eafe7c..326282d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
@@ -35,4 +35,6 @@ public interface DataMapJob extends Serializable {
 
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat);
 
+  Long executeCountJob(DistributableDataMapFormat dataMapFormat);
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index dd9debc..bca7409 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -230,7 +230,7 @@ public class DataMapUtil {
       List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
       List<String> segmentsToBeRefreshed) throws IOException {
     return executeDataMapJob(carbonTable, resolver, dataMapJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed);
+        invalidSegments, level, false, segmentsToBeRefreshed, false);
   }
 
   /**
@@ -241,7 +241,8 @@ public class DataMapUtil {
   public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, DataMapLevel level,
-      Boolean isFallbackJob, List<String> segmentsToBeRefreshed) throws IOException {
+      Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob)
+      throws IOException {
     List<String> invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -250,9 +251,11 @@ public class DataMapUtil {
     DistributableDataMapFormat dataMapFormat =
         new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
             partitionsToPrune, false, level, isFallbackJob);
-    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat);
-    // Apply expression on the blocklets.
-    return prunedBlocklets;
+    if (isCountJob) {
+      dataMapFormat.setCountStarJob();
+      dataMapFormat.setIsWriteToFile(false);
+    }
+    return dataMapJob.execute(dataMapFormat);
   }
 
   public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 8426fcb..b430c5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -91,6 +90,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private boolean isWriteToFile = true;
 
+  private boolean isCountStarJob = false;
+
   DistributableDataMapFormat() {
 
   }
@@ -103,7 +104,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     this.dataMapToClear = dataMapToClear;
   }
 
-  DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
+  public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
       List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
       boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
       throws IOException {
@@ -136,7 +137,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     return new RecordReader<Void, ExtendedBlocklet>() {
       private Iterator<ExtendedBlocklet> blockletIterator;
       private ExtendedBlocklet currBlocklet;
-      private List<DataMap> dataMaps;
 
       @Override
       public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -149,7 +149,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
         if (dataMapLevel == null) {
           TableDataMap defaultDataMap = DataMapStoreManager.getInstance()
               .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-          dataMaps = defaultDataMap.getTableDataMaps(distributable.getDistributable());
           blocklets = defaultDataMap
               .prune(segmentsToLoad, new DataMapFilter(filterResolverIntf), partitions);
           blocklets = DataMapUtil
@@ -192,11 +191,6 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
       @Override
       public void close() throws IOException {
-        if (null != dataMaps) {
-          for (DataMap dataMap : dataMaps) {
-            dataMap.finish();
-          }
-        }
       }
     };
   }
@@ -247,6 +241,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     out.writeUTF(taskGroupDesc);
     out.writeUTF(queryId);
     out.writeBoolean(isWriteToFile);
+    out.writeBoolean(isCountStarJob);
   }
 
   @Override
@@ -292,6 +287,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     this.taskGroupDesc = in.readUTF();
     this.queryId = in.readUTF();
     this.isWriteToFile = in.readBoolean();
+    this.isCountStarJob = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -398,9 +394,29 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     return validSegments;
   }
 
+  public List<Segment> getValidSegments() {
+    return validSegments;
+  }
+
   public void createDataMapChooser() throws IOException {
     if (null != filterResolverIntf) {
       this.dataMapChooser = new DataMapChooser(table);
     }
   }
+
+  public void setCountStarJob() {
+    this.isCountStarJob = true;
+  }
+
+  public boolean isCountStarJob() {
+    return this.isCountStarJob;
+  }
+
+  public List<PartitionSpec> getPartitions() {
+    return partitions;
+  }
+
+  public ReadCommittedScope getReadCommittedScope() {
+    return readCommittedScope;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index a85423b..611e969 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -39,6 +39,10 @@ public class ExtendedBlocklet extends Blocklet {
 
   private CarbonInputSplit inputSplit;
 
+  private Long count;
+
+  private String segmentNo;
+
   public ExtendedBlocklet() {
 
   }
@@ -78,6 +82,9 @@ public class ExtendedBlocklet extends Blocklet {
   }
 
   public String getSegmentId() {
+    if (segmentNo != null) {
+      return segmentNo;
+    }
     return this.inputSplit.getSegmentId();
   }
 
@@ -92,8 +99,12 @@ public class ExtendedBlocklet extends Blocklet {
     return getFilePath();
   }
 
-  public String getDataMapWriterPath() {
-    return this.inputSplit.getDataMapWritePath();
+  public Long getRowCount() {
+    if (count != null) {
+      return count;
+    } else {
+      return (long) inputSplit.getRowCount();
+    }
   }
 
   public void setDataMapWriterPath(String dataMapWriterPath) {
@@ -161,30 +172,35 @@ public class ExtendedBlocklet extends Blocklet {
    * @param uniqueLocation
    * @throws IOException
    */
-  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation)
+  public void serializeData(DataOutput out, Map<String, Short> uniqueLocation, boolean isCountJob)
       throws IOException {
     super.write(out);
-    if (dataMapUniqueId == null) {
-      out.writeBoolean(false);
+    if (isCountJob) {
+      out.writeLong(inputSplit.getRowCount());
+      out.writeUTF(inputSplit.getSegmentId());
     } else {
-      out.writeBoolean(true);
-      out.writeUTF(dataMapUniqueId);
-    }
-    out.writeBoolean(inputSplit != null);
-    if (inputSplit != null) {
-      // creating byte array output stream to get the size of input split serializeData size
-      ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(ebos);
-      inputSplit.setFilePath(null);
-      inputSplit.setBucketId(null);
-      if (inputSplit.isBlockCache()) {
-        inputSplit.updateFooteroffset();
-        inputSplit.updateBlockLength();
-        inputSplit.setWriteDetailInfo(false);
+      if (dataMapUniqueId == null) {
+        out.writeBoolean(false);
+      } else {
+        out.writeBoolean(true);
+        out.writeUTF(dataMapUniqueId);
+      }
+      out.writeBoolean(inputSplit != null);
+      if (inputSplit != null) {
+        // creating byte array output stream to get the size of input split serializeData size
+        ExtendedByteArrayOutputStream ebos = new ExtendedByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(ebos);
+        inputSplit.setFilePath(null);
+        inputSplit.setBucketId(null);
+        if (inputSplit.isBlockCache()) {
+          inputSplit.updateFooteroffset();
+          inputSplit.updateBlockLength();
+          inputSplit.setWriteDetailInfo(false);
+        }
+        inputSplit.serializeFields(dos, uniqueLocation);
+        out.writeInt(ebos.size());
+        out.write(ebos.getBuffer(), 0, ebos.size());
       }
-      inputSplit.serializeFields(dos, uniqueLocation);
-      out.writeInt(ebos.size());
-      out.write(ebos.getBuffer(), 0 , ebos.size());
     }
   }
 
@@ -195,9 +211,15 @@ public class ExtendedBlocklet extends Blocklet {
    * @param tablePath
    * @throws IOException
    */
-  public void deserializeFields(DataInput in, String[] locations, String tablePath)
+  public void deserializeFields(DataInput in, String[] locations, String tablePath,
+      boolean isCountJob)
       throws IOException {
     super.readFields(in);
+    if (isCountJob) {
+      count = in.readLong();
+      segmentNo = in.readUTF();
+      return;
+    }
     if (in.readBoolean()) {
       dataMapUniqueId = in.readUTF();
     }
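
For the count-star case the per-blocklet payload shrinks to one long and one UTF
string. A standalone illustration of that compact encoding with plain java.io
streams (not CarbonData code; the Blocklet header written by super.write() is
omitted for brevity, and the values are made up):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeLong(42000L)   // inputSplit.getRowCount()
    out.writeUTF("2")       // inputSplit.getSegmentId()
    out.flush()

    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    val count = in.readLong()      // populates ExtendedBlocklet.count
    val segmentNo = in.readUTF()   // populates ExtendedBlocklet.segmentNo
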
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index ab051ea..f722f32 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -56,25 +56,20 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());
-
+  private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+  private static final int BLOCK_SIZE = 256 * 1024 * 1024;
   private boolean isWrittenToFile;
-
   private int dataSize;
-
   private byte[] bytes;
 
-  private static final int BUFFER_SIZE = 8 * 1024 * 1024;
-
-  private static final int BLOCK_SIZE = 256 * 1024 * 1024;
-
   public ExtendedBlockletWrapper() {
 
   }
 
   public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
-      String queryId, boolean isWriteToFile) {
+      String queryId, boolean isWriteToFile, boolean isCountJob) {
     Map<String, Short> uniqueLocations = new HashMap<>();
-    byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList);
+    byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
     int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
             CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
@@ -122,13 +117,13 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
   }
 
   private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
-      List<ExtendedBlocklet> extendedBlockletList) {
+      List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
     ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
     DataOutputStream stream = new DataOutputStream(bos);
     try {
       for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
         extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
-        extendedBlocklet.serializeData(stream, uniqueLocations);
+        extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
       }
       return new SnappyCompressor().compressByte(bos.toByteArray());
     } catch (IOException e) {
@@ -142,6 +137,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
    * Below method will be used to write the data to stream[file/memory]
    * Data Format
    * <number of splits><number of unique location[short]><locations><serialize data len><data>
+   *
    * @param stream
    * @param data
    * @param uniqueLocation
@@ -158,7 +154,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
       final Map.Entry<String, Short> next = iterator.next();
       uniqueLoc[next.getValue()] = next.getKey();
     }
-    stream.writeShort((short)uniqueLoc.length);
+    stream.writeShort((short) uniqueLoc.length);
     for (String loc : uniqueLoc) {
       stream.writeUTF(loc);
     }
@@ -170,12 +166,14 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
    * deseralize the blocklet data from file or stream
    * data format
    * <number of splits><number of unique location[short]><locations><serialize data len><data>
+   *
    * @param tablePath
    * @param queryId
    * @return
    * @throws IOException
    */
-  public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId) throws IOException {
+  public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
+      throws IOException {
     byte[] data;
     if (bytes != null) {
       if (isWrittenToFile) {
@@ -218,7 +216,7 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
       try {
         for (int i = 0; i < numberOfBlocklet; i++) {
           ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
-          extendedBlocklet.deserializeFields(eDIS, locations, tablePath);
+          extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
           extendedBlockletList.add(extendedBlocklet);
         }
       } finally {
@@ -248,4 +246,5 @@ public class ExtendedBlockletWrapper implements Writable, Serializable {
     }
     this.dataSize = in.readInt();
   }
+
 }
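
The wrapper now threads an isCountJob flag through both ends of the exchange. A
minimal caller-side sketch, assuming blocklets, tablePath and queryId are already
available (signatures as changed in this patch):

    // Executor side: pack the pruned blocklets; with isCountJob = true each
    // entry is only (rowCount, segmentNo), so keep the result in memory.
    val wrapper = new ExtendedBlockletWrapper(blocklets, tablePath, queryId,
      /* isWriteToFile = */ false, /* isCountJob = */ true)

    // Driver side: the same flag must be passed when deserializing.
    val restored = wrapper.readBlocklet(tablePath, queryId, /* isCountJob = */ true)
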
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
index 0c52297..40acf9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapperContainer.java
@@ -62,8 +62,8 @@ public class ExtendedBlockletWrapperContainer implements Writable {
     this.isFallbackJob = isFallbackJob;
   }
 
-  public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId)
-      throws IOException {
+  public List<ExtendedBlocklet> getExtendedBlockets(String tablePath, String queryId,
+      boolean isCountJob) throws IOException {
     if (!isFallbackJob) {
       int numOfThreads = CarbonProperties.getNumOfThreadsForPruning();
       ExecutorService executorService = Executors
@@ -85,8 +85,8 @@ public class ExtendedBlockletWrapperContainer implements Writable {
       List<Future<List<ExtendedBlocklet>>> futures = new ArrayList<>();
       for (int i = 0; i < split.length; i++) {
         end += split[i];
-        futures.add(executorService
-            .submit(new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId)));
+        futures.add(executorService.submit(
+            new ExtendedBlockletDeserializerThread(start, end, tablePath, queryId, isCountJob)));
         start += split[i];
       }
       executorService.shutdown();
@@ -109,7 +109,8 @@ public class ExtendedBlockletWrapperContainer implements Writable {
     } else {
       List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
       for (ExtendedBlockletWrapper extendedBlockletWrapper: extendedBlockletWrappers) {
-        extendedBlocklets.addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId));
+        extendedBlocklets
+            .addAll(extendedBlockletWrapper.readBlocklet(tablePath, queryId, isCountJob));
       }
       return extendedBlocklets;
     }
@@ -125,18 +126,22 @@ public class ExtendedBlockletWrapperContainer implements Writable {
 
     private String queryId;
 
+    private boolean isCountJob;
+
     public ExtendedBlockletDeserializerThread(int start, int end, String tablePath,
-        String queryId) {
+        String queryId, boolean isCountJob) {
       this.start = start;
       this.end = end;
       this.tablePath = tablePath;
       this.queryId = queryId;
+      this.isCountJob = isCountJob;
     }
 
     @Override public List<ExtendedBlocklet> call() throws Exception {
       List<ExtendedBlocklet> extendedBlocklets = new ArrayList<>();
       for (int i = start; i < end; i++) {
-        extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId));
+        extendedBlocklets.addAll(extendedBlockletWrappers[i].readBlocklet(tablePath, queryId,
+            isCountJob));
       }
       return extendedBlocklets;
     }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 45041e4..56ccabc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.DataMapUtil;
+import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
@@ -412,10 +413,42 @@ m filterExpression
    */
   @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
 
-  List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
+  /**
+   * This method will execute a distributed job(DistributedDataMapJob) to get the count for the
+   * table. If the DistributedDataMapJob fails for some reason then an embedded job is fired to
+   * get the count.
+   */
+  Long getDistributedCount(CarbonTable table,
+      List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException {
+    DistributableDataMapFormat dataMapFormat =
+        new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
+            partitionNames, false, null, false);
+    dataMapFormat.setIsWriteToFile(false);
+    try {
+      DataMapJob dataMapJob =
+          (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
+      if (dataMapJob == null) {
+        throw new ExceptionInInitializerError("Unable to create DistributedDataMapJob");
+      }
+      return dataMapJob.executeCountJob(dataMapFormat);
+    } catch (Exception e) {
+      LOG.error("Failed to get count from index server. Initializing fallback", e);
+      DataMapJob dataMapJob = DataMapUtil.getEmbeddedJob();
+      return dataMapJob.executeCountJob(dataMapFormat);
+    }
+  }
+
+  List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table,
+      List<PartitionSpec> partitionNames, List<Segment> validSegments,
+      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) throws IOException {
+    return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
+        segmentsToBeRefreshed, true);
+  }
+
+  private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
       List<Segment> validSegments, List<Segment> invalidSegments,
-      List<String> segmentsToBeRefreshed) throws IOException {
+      List<String> segmentsToBeRefreshed, boolean isCountJob) throws IOException {
     try {
       DataMapJob dataMapJob =
           (DataMapJob) DataMapUtil.createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME);
@@ -424,7 +457,7 @@ m filterExpression
       }
       return DataMapUtil
           .executeDataMapJob(table, filterResolverIntf, dataMapJob, partitionNames, validSegments,
-              invalidSegments, null, segmentsToBeRefreshed);
+              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
     } catch (Exception e) {
       // Check if fallback is disabled for testing purposes then directly throw exception.
       if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -432,10 +465,9 @@ m filterExpression
       }
       LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
           + "back to embedded mode", e);
-      return DataMapUtil
-          .executeDataMapJob(table, filterResolverIntf,
-              DataMapUtil.getEmbeddedJob(), partitionNames, validSegments, invalidSegments, null,
-              true, segmentsToBeRefreshed);
+      return DataMapUtil.executeDataMapJob(table, filterResolverIntf,
+          DataMapUtil.getEmbeddedJob(), partitionNames, validSegments,
+          invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
     }
   }
 
@@ -545,7 +577,7 @@ m filterExpression
       try {
         prunedBlocklets =
             getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
-                invalidSegments, segmentsToBeRefreshed);
+                invalidSegments, segmentsToBeRefreshed, false);
       } catch (Exception e) {
         // Check if fallback is disabled then directly throw exception otherwise try driver
         // pruning.
@@ -580,7 +612,7 @@ m filterExpression
           if (distributedCG && dataMapJob != null) {
             cgPrunedBlocklets = DataMapUtil
                 .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
-                    segmentIds, invalidSegments, DataMapLevel.CG, true, new ArrayList<String>());
+                    segmentIds, invalidSegments, DataMapLevel.CG, new ArrayList<String>());
           } else {
             cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
           }
@@ -616,7 +648,7 @@ m filterExpression
           // Prune segments from already pruned blocklets
           fgPrunedBlocklets = DataMapUtil
               .executeDataMapJob(carbonTable, filter.getResolver(), dataMapJob, partitionsToPrune,
-                  segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(), true,
+                  segmentIds, invalidSegments, fgDataMapExprWrapper.getDataMapLevel(),
                   new ArrayList<String>());
           // note that the 'fgPrunedBlocklets' has extra datamap related info compared with
           // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
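
getDistributedCount above follows the same try-distributed, fall-back-to-embedded
pattern as the existing split pruning. Condensed into Scala (logging and error
handling trimmed; dataMapFormat as built above):

    val totalRowCount: java.lang.Long =
      try {
        val distributedJob = DataMapUtil
          .createDataMapJob(DataMapUtil.DISTRIBUTED_JOB_NAME).asInstanceOf[DataMapJob]
        distributedJob.executeCountJob(dataMapFormat)
      } catch {
        case _: Exception =>
          // index server unreachable or failed: run the same count job in embedded mode
          DataMapUtil.getEmbeddedJob.executeCountJob(dataMapFormat)
      }
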
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 3b7a800..74a4d6e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -562,15 +563,14 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       if (CarbonProperties.getInstance()
           .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
         try {
-          List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
-              getDistributedSplit(table, null, partitions, filteredSegment,
-                  allSegments.getInvalidSegments(), toBeCleanedSegments));
-          for (InputSplit extendedBlocklet : extendedBlocklets) {
-            CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
+          List<ExtendedBlocklet> extendedBlocklets =
+              getDistributedBlockRowCount(table, partitions, filteredSegment,
+                  allSegments.getInvalidSegments(), toBeCleanedSegments);
+          for (ExtendedBlocklet blocklet : extendedBlocklets) {
             String filePath = blocklet.getFilePath().replace("\\", "/");
             String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
             blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
-                (long) blocklet.getRowCount());
+                blocklet.getRowCount());
           }
         } catch (Exception e) {
           // Check if fallback is disabled then directly throw exception otherwise try driver
@@ -615,15 +615,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         }
       }
     } else {
-      long totalRowCount = 0L;
+      long totalRowCount;
       if (CarbonProperties.getInstance()
           .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
-        List<InputSplit> extendedBlocklets = CarbonTableInputFormat.convertToCarbonInputSplit(
-            getDistributedSplit(table, null, partitions, filteredSegment,
-                allSegments.getInvalidSegments(), new ArrayList<String>()));
-        for (InputSplit extendedBlocklet : extendedBlocklets) {
-          totalRowCount += ((CarbonInputSplit) extendedBlocklet).getRowCount();
-        }
+        totalRowCount =
+            getDistributedCount(table, partitions, filteredSegment);
       } else {
         TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
         totalRowCount = defaultDataMap.getRowCount(filteredSegment, partitions, defaultDataMap);
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 01b8824..1fee051 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -65,7 +65,8 @@ class DistributedDataMapJob extends AbstractDataMapJob {
           dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
         dataMapFormat.setFilterResolverIntf(filterInf)
         IndexServer.getClient.getSplits(dataMapFormat)
-          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+          .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat
+            .getQueryId, dataMapFormat.isCountStarJob)
       } finally {
         val tmpPath = CarbonUtil
           .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath,
@@ -106,7 +107,11 @@ class DistributedDataMapJob extends AbstractDataMapJob {
         filterInf.getFilterExpression.getFilterExpressionType == ExpressionType.UNKNOWN) {
       return filterProcessor.changeUnknownResloverToTrue(tableIdentifer)
     }
-    return filterInf;
+    filterInf
+  }
+
+  override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+    IndexServer.getClient.getCount(dataMapFormat).get()
   }
 }
 
@@ -122,7 +127,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
     dataMapFormat.setIsWriteToFile(false)
     dataMapFormat.setFallbackJob()
     val splits = IndexServer.getSplits(dataMapFormat).getExtendedBlockets(dataMapFormat
-      .getCarbonTable.getTablePath, dataMapFormat.getQueryId)
+      .getCarbonTable.getTablePath, dataMapFormat.getQueryId, dataMapFormat.isCountStarJob)
     // Fire a job to clear the cache from executors as Embedded mode does not maintain the cache.
     IndexServer.invalidateSegmentCache(dataMapFormat.getCarbonTable, dataMapFormat
       .getValidSegmentIds.asScala.toArray)
@@ -130,4 +135,8 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
     splits
   }
 
+  override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+    IndexServer.getCount(dataMapFormat).get()
+  }
+
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
new file mode 100644
index 0000000..4a080fa
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.indexserver
+
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapreduce.{InputSplit, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+/**
+ * An RDD which will get the count for the table.
+ */
+class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: DistributableDataMapFormat)
+  extends CarbonRDD[(String, String)](ss, Nil) {
+
+  @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
+    .getName)
+
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    if (split.asInstanceOf[DataMapRDDPartition].getLocations != null) {
+      split.asInstanceOf[DataMapRDDPartition].getLocations.toSeq
+    } else {
+      Seq()
+    }
+  }
+
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[(String, String)] = {
+    val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId,
+      id, TaskType.MAP, split.index, 0)
+    val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
+    val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
+    val numOfThreads = CarbonProperties.getInstance().getNumOfThreadsForExecutorPruning
+    val service = Executors
+      .newFixedThreadPool(numOfThreads, new CarbonThreadFactory("IndexPruningPool", true))
+    implicit val ec: ExecutionContextExecutor = ExecutionContext
+      .fromExecutor(service)
+    val futures = if (inputSplits.length <= numOfThreads) {
+      inputSplits.map {
+        split => generateFuture(Seq(split))
+      }
+    } else {
+      DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
+        splits => generateFuture(splits)
+      }
+    }
+    // scalastyle:off awaitresult
+    val results = Await.result(Future.sequence(futures), Duration.Inf).flatten
+    // scalastyle:on awaitresult
+    val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+      SparkEnv.get.blockManager.blockManagerId.executorId
+    }"
+    val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
+      CacheProvider.getInstance().getCarbonCache.getCurrentSize
+    } else {
+      0L
+    }
+    Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
+  }
+
+  override protected def internalGetPartitions: Array[Partition] = {
+    new DistributedPruneRDD(ss, dataMapFormat).partitions
+  }
+
+  private def generateFuture(split: Seq[InputSplit])
+    (implicit executionContext: ExecutionContext) = {
+    Future {
+      val segments = split.map { inputSplit =>
+        val distributable = inputSplit.asInstanceOf[DataMapDistributableWrapper]
+        distributable.getDistributable.getSegment
+          .setReadCommittedScope(dataMapFormat.getReadCommittedScope)
+        distributable.getDistributable.getSegment
+      }
+      val defaultDataMap = DataMapStoreManager.getInstance
+        .getDataMap(dataMapFormat.getCarbonTable, split.head
+          .asInstanceOf[DataMapDistributableWrapper].getDistributable.getDataMapSchema)
+      defaultDataMap.getBlockRowCount(segments.toList.asJava, dataMapFormat
+        .getPartitions, defaultDataMap).asScala
+    }
+  }
+
+}
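
Each task of this RDD returns ("<executorIp>_<cacheSize>", partialCount), so the
driver only has to sum the second elements, as IndexServer.getCount does further
down (sparkSession and request assumed in scope):

    val rows = new DistributedCountRDD(sparkSession, request).collect()
    val totalRowCount = rows.map(_._2.toLong).sum   // one partial count per task
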
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index d8b9c19..76d33b4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -17,15 +17,12 @@
 
 package org.apache.carbondata.indexserver
 
-import java.text.SimpleDateFormat
-import java.util.Date
 import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
 import scala.concurrent.duration.Duration
 
-import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.mapred.{RecordReader, TaskAttemptID}
 import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -35,9 +32,8 @@ import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, DistributableDataMapFormat, TableDataMap}
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DistributableDataMapFormat}
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
-import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, ExtendedBlockletWrapper}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonThreadFactory}
@@ -65,21 +61,13 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
 
   @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
     .getName)
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-  var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
 
-  private def groupSplits(xs: Seq[InputSplit], n: Int) = {
-    val (quot, rem) = (xs.size / n, xs.size % n)
-    val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
-    (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
-  }
+  var readers: scala.collection.Iterator[RecordReader[Void, ExtendedBlocklet]] = _
 
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(String, ExtendedBlockletWrapper)] = {
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+    val attemptId = new TaskAttemptID(DistributedRDDUtils.generateTrackerId,
+      id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
     val inputSplits = split.asInstanceOf[DataMapRDDPartition].inputSplit
     if (dataMapFormat.isJobToClearDataMaps) {
@@ -118,7 +106,7 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
           split => generateFuture(Seq(split), attemptContext)
         }
       } else {
-        groupSplits(inputSplits, numOfThreads).map {
+        DistributedRDDUtils.groupSplits(inputSplits, numOfThreads).map {
           splits => generateFuture(splits, attemptContext)
         }
       }
@@ -139,14 +127,13 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
       }"
       val value = (executorIP + "_" + cacheSize.toString, new ExtendedBlockletWrapper(f.toList
         .asJava, dataMapFormat.getCarbonTable.getTablePath, dataMapFormat.getQueryId,
-        dataMapFormat.isWriteToFile))
+        dataMapFormat.isWriteToFile, dataMapFormat.isCountStarJob))
       Iterator(value)
     }
   }
 
-  private def generateFuture(split: Seq[InputSplit],
-      attemptContextImpl: TaskAttemptContextImpl)
-    (implicit executionContext: ExecutionContext) = {
+  private def generateFuture(split: Seq[InputSplit], attemptContextImpl: TaskAttemptContextImpl)
+    (implicit executionContext: ExecutionContext): Future[Seq[ExtendedBlocklet]] = {
     Future {
       split.flatMap { inputSplit =>
         val blocklets = new java.util.ArrayList[ExtendedBlocklet]()
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 933ec15..4819779 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.indexserver
 
+import java.text.SimpleDateFormat
+import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
@@ -325,4 +327,15 @@ object DistributedRDDUtils {
     }
   }
 
+  def groupSplits(xs: Seq[InputSplit], n: Int): List[Seq[InputSplit]] = {
+    val (quot, rem) = (xs.size / n, xs.size % n)
+    val (smaller, bigger) = xs.splitAt(xs.size - rem * (quot + 1))
+    (smaller.grouped(quot) ++ bigger.grouped(quot + 1)).toList
+  }
+
+  def generateTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
 }
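
groupSplits spreads the input splits across the pruning threads as evenly as
possible: with quot = size / n and rem = size % n it yields rem groups of
quot + 1 splits and n - rem groups of quot splits. A quick worked example,
where splits is an assumed Seq[InputSplit] of length 10:

    val groups = DistributedRDDUtils.groupSplits(splits, 4)
    // quot = 2, rem = 2  ->  four groups with sizes 2, 2, 3, 3
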
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index fdaa3d1..abee487 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.ipc.{ProtocolInfo, RPC}
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.{KerberosInfo, SecurityUtil, UserGroupInformation}
@@ -58,6 +59,9 @@ trait ServerInterface {
    */
   def invalidateSegmentCache(carbonTable: CarbonTable,
       segmentIds: Array[String], jobGroupId: String = ""): Unit
+
+  def getCount(request: DistributableDataMapFormat): LongWritable
+
 }
 
 /**
@@ -99,6 +103,21 @@ object IndexServer extends ServerInterface {
     })
   }
 
+  def getCount(request: DistributableDataMapFormat): LongWritable = {
+    doAs {
+      if (!request.isFallbackJob) {
+        sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+        sparkSession.sparkContext
+          .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+      }
+      val splits = new DistributedCountRDD(sparkSession, request).collect()
+      if (!request.isFallbackJob) {
+        DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+      }
+      new LongWritable(splits.map(_._2.toLong).sum)
+    }
+  }
+
   def getSplits(request: DistributableDataMapFormat): ExtendedBlockletWrapperContainer = {
     doAs {
       if (!request.isFallbackJob) {