Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/30 16:34:05 UTC

[12/26] carbondata git commit: [CARBONDATA-3118] Parallelize block pruning of default datamap in driver for filter query processing

[CARBONDATA-3118] Parallelize block pruning of default datamap in driver for filter query processing

Background:
We do block pruning for filter queries on the driver side.
In real-world big data scenarios, a single carbon table can have millions
of carbon files.
It is currently observed that block pruning takes around 5 seconds for
1 million carbon files, as each carbon file takes around 0.005 ms to prune
(with only one filter column set in the 'column_meta_cache' tblproperty):
1,000,000 files x 0.005 ms/file = 5,000 ms = 5 seconds.
With more files, block pruning takes proportionally longer.
Also, the Spark job is not launched until block pruning completes, so during
that time the user has no indication of what is happening or why the Spark
job has not started.
Block pruning currently takes this long because the segments are processed
sequentially; the time can be reduced by parallelizing it.

Solution:
Keep the default number of threads for block pruning at 4.
The user can reduce this number through the carbon property
carbon.max.driver.threads.for.block.pruning, which accepts values from 1 to 4
(see the sketch after this summary).

In TableDataMap.prune(),

group the segments across the threads by distributing an equal number of
carbon files to each thread, then launch the threads, each handling block
pruning for its group of segments.
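
As an illustration, a user could cap pruning at two driver threads before
running queries. This is a minimal sketch using the existing CarbonProperties
API; the value "2" is only an example:

  import org.apache.carbondata.core.constants.CarbonCommonConstants;
  import org.apache.carbondata.core.util.CarbonProperties;

  public class PruningThreadsExample {
    public static void main(String[] args) {
      // values outside the 1 to 4 range fall back to the default of 4
      // (validated in TableDataMap.getNumOfThreadsForPruning() below)
      CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING, "2");
    }
  }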

This closes #2936


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf539281
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf539281
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf539281

Branch: refs/heads/branch-1.5
Commit: cf539281eb465c3823b6155e84cbecccc1fb501c
Parents: e8955a7
Author: ajantha-bhat <aj...@gmail.com>
Authored: Tue Nov 20 22:15:06 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 30 21:55:29 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  11 +
 .../core/datamap/SegmentDataMapGroup.java       |  50 +++++
 .../carbondata/core/datamap/TableDataMap.java   | 213 +++++++++++++++++--
 .../indexstore/blockletindex/BlockDataMap.java  |   2 +-
 .../blockletindex/BlockletDataMap.java          |   3 +-
 .../core/profiler/TablePruningInfo.java         |   6 +-
 docs/configuration-parameters.md                |   1 +
 .../hadoop/api/CarbonInputFormat.java           |   3 +
 8 files changed, 266 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 094e552..a61d86f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1399,6 +1399,17 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "false";
 
+  /**
+   * maximum number of driver threads used for block pruning [1 to 4 threads]
+   */
+  @CarbonProperty public static final String CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING =
+      "carbon.max.driver.threads.for.block.pruning";
+
+  public static final String CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT = "4";
+
+  // prune blocks using multiple threads only if the total file count exceeds 100K files.
+  public static final int CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 100000;
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Datamap parameter start here
   //////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/core/src/main/java/org/apache/carbondata/core/datamap/SegmentDataMapGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/SegmentDataMapGroup.java b/core/src/main/java/org/apache/carbondata/core/datamap/SegmentDataMapGroup.java
new file mode 100644
index 0000000..1592d25
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/SegmentDataMapGroup.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+
+/**
+ * Wrapper that holds a group of selected datamaps for a segment.
+ * It records the datamap index range (fromIndex to toIndex) that a thread must process.
+ */
+public class SegmentDataMapGroup implements Serializable {
+
+  private Segment segment;
+
+  private int fromIndex;
+
+  private int toIndex;
+
+  public SegmentDataMapGroup(Segment segment, int fromIndex, int toIndex) {
+    this.segment = segment;
+    this.fromIndex = fromIndex;
+    this.toIndex = toIndex;
+  }
+
+  public Segment getSegment() {
+    return segment;
+  }
+
+  public int getFromIndex() {
+    return fromIndex;
+  }
+
+  public int getToIndex() {
+    return toIndex;
+  }
+}
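
As a usage illustration (hypothetical values, not part of the committed code),
a group asking one pruning thread to process datamaps 0 through 9 of a segment
would be built as:

  SegmentDataMapGroup group = new SegmentDataMapGroup(segment, 0, 9);
  // the worker thread then prunes dataMapList.get(i) for
  // i = group.getFromIndex() .. group.getToIndex(), inclusive

Note that toIndex is inclusive, matching the i <= getToIndex() loop in
TableDataMap.pruneWithFilterMultiThread() below.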

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index a272777..e1b2c13 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -20,6 +20,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -34,14 +41,19 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
 import org.apache.carbondata.events.OperationEventListener;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * Index at the table level, user can add any number of DataMap for one table, by
  * {@code
@@ -63,6 +75,8 @@ public final class TableDataMap extends OperationEventListener {
 
   private SegmentPropertiesFetcher segmentPropertiesFetcher;
 
+  private static final Log LOG = LogFactory.getLog(TableDataMap.class);
+
   /**
    * It is called to initialize and load the required table datamap metadata.
    */
@@ -101,7 +115,6 @@ public final class TableDataMap extends OperationEventListener {
       } else {
         segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
         for (DataMap dataMap : dataMaps.get(segment)) {
-
           pruneBlocklets
               .addAll(dataMap.prune(filterExp, segmentProperties, partitions, identifier));
         }
@@ -120,29 +133,196 @@ public final class TableDataMap extends OperationEventListener {
    * @param filterExp
    * @return
    */
-  public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
-      List<PartitionSpec> partitions) throws IOException {
-    List<ExtendedBlocklet> blocklets = new ArrayList<>();
-    SegmentProperties segmentProperties;
-    Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
+  public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolverIntf filterExp,
+      final List<PartitionSpec> partitions) throws IOException {
+    final List<ExtendedBlocklet> blocklets = new ArrayList<>();
+    final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
+    // for non-filter queries
+    if (filterExp == null) {
+      // if filter is not passed, then return all the blocklets.
+      return pruneWithoutFilter(segments, partitions, blocklets);
+    }
+    // for filter queries
+    int totalFiles = 0;
+    int datamapsCount = 0;
+    boolean isBlockDataMapType = true;
+    for (Segment segment : segments) {
+      for (DataMap dataMap : dataMaps.get(segment)) {
+        if (!(dataMap instanceof BlockDataMap)) {
+          isBlockDataMapType = false;
+          break;
+        }
+        totalFiles += ((BlockDataMap) dataMap).getTotalBlocks();
+        datamapsCount++;
+      }
+      if (!isBlockDataMapType) {
+        // totalFiles will be 0 for non-BlockDataMap types (e.g. lucene, bloom datamaps); use the old flow.
+        break;
+      }
+    }
+    int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning || totalFiles
+        < CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) {
+      // Use multi-threading only when the files number more than 0.1 million,
+      // since block pruning of 0.1 million files takes only about 1 second.
+      // Multi-threading for smaller counts is not recommended, as the driver
+      // should keep threads free to support multiple concurrent queries.
+      return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
+    }
+    // handle by multi-thread
+    return pruneWithFilterMultiThread(segments, filterExp, partitions, blocklets, dataMaps,
+        totalFiles);
+  }
+
+  private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
+      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) throws IOException {
+    for (Segment segment : segments) {
+      List<Blocklet> allBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
+      blocklets.addAll(
+          addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment),
+              segment.toString()));
+    }
+    return blocklets;
+  }
+
+  private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
+      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
+      List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> dataMaps) throws IOException {
     for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
-      // if filter is not passed then return all the blocklets
-      if (filterExp == null) {
-        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
-      } else {
-        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
-        for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+      SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
+      for (DataMap dataMap : dataMaps.get(segment)) {
+        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
+      }
+      blocklets.addAll(
+          addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
+              segment.toString()));
+    }
+    return blocklets;
+  }
+
+  private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments,
+      final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
+      List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
+      int totalFiles) {
+    int numOfThreadsForPruning = getNumOfThreadsForPruning();
+    int filesPerEachThread = totalFiles / numOfThreadsForPruning;
+    int prev;
+    int filesCount = 0;
+    int processedFileCount = 0;
+    List<List<SegmentDataMapGroup>> segmentList = new ArrayList<>(numOfThreadsForPruning);
+    List<SegmentDataMapGroup> segmentDataMapGroupList = new ArrayList<>();
+    for (Segment segment : segments) {
+      List<DataMap> eachSegmentDataMapList = dataMaps.get(segment);
+      prev = 0;
+      for (int i = 0; i < eachSegmentDataMapList.size(); i++) {
+        DataMap dataMap = eachSegmentDataMapList.get(i);
+        filesCount += ((BlockDataMap) dataMap).getTotalBlocks();
+        if (filesCount >= filesPerEachThread) {
+          if (segmentList.size() != numOfThreadsForPruning - 1) {
+            // not the last segmentList
+            segmentDataMapGroupList.add(new SegmentDataMapGroup(segment, prev, i));
+            // save the last value to process in next thread
+            prev = i + 1;
+            segmentList.add(segmentDataMapGroupList);
+            segmentDataMapGroupList = new ArrayList<>();
+            processedFileCount += filesCount;
+            filesCount = 0;
+          } else {
+            // add remaining in the end
+            processedFileCount += filesCount;
+            filesCount = 0;
+          }
         }
       }
-      blocklets.addAll(addSegmentId(
-          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
-          segment.toString()));
+      if (prev == 0 || prev != eachSegmentDataMapList.size()) {
+        // if prev == 0, add all of this segment's datamaps;
+        // otherwise add this segment's remaining datamaps [prev, size - 1]
+        segmentDataMapGroupList
+            .add(new SegmentDataMapGroup(segment, prev, eachSegmentDataMapList.size() - 1));
+      }
+    }
+    // adding the last segmentList data
+    segmentList.add(segmentDataMapGroupList);
+    processedFileCount += filesCount;
+    if (processedFileCount != totalFiles) {
+      // this should not happen
+      throw new RuntimeException("Not all the files were processed in block pruning");
+    }
+    List<Future<Void>> results = new ArrayList<>(numOfThreadsForPruning);
+    final Map<Segment, List<Blocklet>> prunedBlockletMap = new ConcurrentHashMap<>(segments.size());
+    final ExecutorService executorService = Executors.newFixedThreadPool(numOfThreadsForPruning);
+    final String threadName = Thread.currentThread().getName();
+    for (int i = 0; i < numOfThreadsForPruning; i++) {
+      final List<SegmentDataMapGroup> segmentDataMapGroups = segmentList.get(i);
+      results.add(executorService.submit(new Callable<Void>() {
+        @Override public Void call() throws IOException {
+          Thread.currentThread().setName(threadName);
+          for (SegmentDataMapGroup segmentDataMapGroup : segmentDataMapGroups) {
+            List<Blocklet> pruneBlocklets = new ArrayList<>();
+            List<DataMap> dataMapList = dataMaps.get(segmentDataMapGroup.getSegment());
+            for (int i = segmentDataMapGroup.getFromIndex();
+                 i <= segmentDataMapGroup.getToIndex(); i++) {
+              pruneBlocklets.addAll(dataMapList.get(i).prune(filterExp,
+                  segmentPropertiesFetcher.getSegmentProperties(segmentDataMapGroup.getSegment()),
+                  partitions));
+            }
+            synchronized (prunedBlockletMap) {
+              List<Blocklet> pruneBlockletsExisting =
+                  prunedBlockletMap.get(segmentDataMapGroup.getSegment());
+              if (pruneBlockletsExisting != null) {
+                pruneBlockletsExisting.addAll(pruneBlocklets);
+              } else {
+                prunedBlockletMap.put(segmentDataMapGroup.getSegment(), pruneBlocklets);
+              }
+            }
+          }
+          return null;
+        }
+      }));
+    }
+    executorService.shutdown();
+    try {
+      executorService.awaitTermination(2, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+      LOG.error("Error in pruning datamap in multi-thread: " + e.getMessage());
+    }
+    // check for error
+    for (Future<Void> result : results) {
+      try {
+        result.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    for (Map.Entry<Segment, List<Blocklet>> entry : prunedBlockletMap.entrySet()) {
+      try {
+        blocklets.addAll(addSegmentId(
+            blockletDetailsFetcher.getExtendedBlocklets(entry.getValue(), entry.getKey()),
+            entry.getKey().toString()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
     return blocklets;
   }
 
+  private int getNumOfThreadsForPruning() {
+    int numOfThreadsForPruning = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING,
+            CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT));
+    if (numOfThreadsForPruning > Integer
+        .parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT)
+        || numOfThreadsForPruning < 1) {
+      LOG.info("Invalid value for carbon.max.driver.threads.for.block.pruning, value :"
+          + numOfThreadsForPruning + " .using the default threads : "
+          + CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+      numOfThreadsForPruning = Integer
+          .parseInt(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT);
+    }
+    return numOfThreadsForPruning;
+  }
+
   private List<ExtendedBlocklet> addSegmentId(List<ExtendedBlocklet> pruneBlocklets,
       String segmentId) {
     for (ExtendedBlocklet blocklet : pruneBlocklets) {
@@ -150,7 +330,6 @@ public final class TableDataMap extends OperationEventListener {
     }
     return pruneBlocklets;
   }
-
   /**
    * This is used for making the datamap distributable.
    * It takes the valid segments and returns all the datamaps as distributable objects so that

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 67405f4..6ba0507 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -629,7 +629,7 @@ public class BlockDataMap extends CoarseGrainDataMap
   }
 
   // get total block number in this datamap
-  protected int getTotalBlocks() {
+  public int getTotalBlocks() {
     if (isLegacyStore) {
       // dummy value
       return 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 390e92f..2d8b082 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -280,8 +280,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
     }
   }
 
-  @Override
-  protected int getTotalBlocks() {
+  @Override public int getTotalBlocks() {
     if (isLegacyStore) {
       return super.getTotalBlocklets();
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/core/src/main/java/org/apache/carbondata/core/profiler/TablePruningInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/profiler/TablePruningInfo.java b/core/src/main/java/org/apache/carbondata/core/profiler/TablePruningInfo.java
index f7d6af5..826329a 100644
--- a/core/src/main/java/org/apache/carbondata/core/profiler/TablePruningInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/profiler/TablePruningInfo.java
@@ -42,11 +42,11 @@ public class TablePruningInfo {
   private int numBlocksAfterFGPruning;
   private int numBlockletsAfterFGPruning;
 
-  void addTotalBlocks(int numBlocks) {
+  synchronized void addTotalBlocks(int numBlocks) {
     this.totalBlocks += numBlocks;
   }
 
-  void addTotalBlocklets(int numBlocklets) {
+  synchronized void addTotalBlocklets(int numBlocklets) {
     this.totalBlocklets += numBlocklets;
   }
 
@@ -67,7 +67,7 @@ public class TablePruningInfo {
    * we accumulate blocklet number in default datamap instead of setting it
    * in CarbonInputFormat
    */
-  void addNumBlockletsAfterDefaultPruning(int numBlocklets) {
+  synchronized void addNumBlockletsAfterDefaultPruning(int numBlocklets) {
     this.numBlockletsAfterDefaultPruning += numBlocklets;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index e9e5978..c82d5d7 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -136,6 +136,7 @@ This section provides the details of all the configurations required for the Car
 | enable.query.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional query statistics information to more accurately locate the issues being debugged.**NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time. It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all query performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | enable.unsafe.in.query.processing | false | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData while scanning the  data during query. |
 | carbon.query.validate.direct.query.on.datamap | true | CarbonData supports creating pre-aggregate table datamaps as an independent tables. For some debugging purposes, it might be required to directly query from such datamap tables. This configuration allows to query on such datamaps. |
+| carbon.max.driver.threads.for.block.pruning | 4 | Number of threads used for block pruning in the driver when there are more than 100k carbon files. This configuration can be used to set the number of threads between 1 and 4. |
 | carbon.heap.memory.pooling.threshold.bytes | 1048576 | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. Using unsafe, memory can be allocated on Java Heap or off heap. This configuration controls the allocation mechanism on Java HEAP. If the heap memory allocations of the given size is greater or equal than this value,it should go through the pooling mechanism. But if set this size to -1, it should not go through the pooling mechanism. Default value is 1048576(1MB, the same as Spark). Value to be specified in bytes. |
 
 ## Data Mutation Configuration
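
For instance (an illustrative value, not from the committed docs), adding the
following line to carbon.properties caps driver-side block pruning at two
threads:

  carbon.max.driver.threads.for.block.pruning=2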

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf539281/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index ed82e13..843f564 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -487,6 +487,8 @@ m filterExpression
     // First prune using default datamap on driver side.
     TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
     List<ExtendedBlocklet> prunedBlocklets = null;
+    // Log the event so that the user can see from the logs what is happening.
+    LOG.info("Started block pruning ...");
     if (carbonTable.isTransactionalTable()) {
       prunedBlocklets = defaultDataMap.prune(segmentIds, resolver, partitionsToPrune);
     } else {
@@ -542,6 +544,7 @@ m filterExpression
             prunedBlocklets.size(), getBlockCount(prunedBlocklets));
       }
     }
+    LOG.info("Finished block pruning ...");
     return prunedBlocklets;
   }