Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 11:05:58 UTC

carbondata git commit: [CARBONDATA-1867] Add support for task/segment level pruning

Repository: carbondata
Updated Branches:
  refs/heads/master fef3384b8 -> 2bad144a2


[CARBONDATA-1867] Add support for task/segment level pruning

Added support for task/segment level pruning by computing and storing task level min/max values, which can then be used to prune entire tasks or segments before blocklet level pruning runs.

This closes #1624
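
For context, a minimal standalone sketch of the task level min/max idea (illustrative names and plain arrays, not the committed code):

import java.util.Arrays;

// Fold per-blocklet min/max byte arrays into a single task level pair, then
// compare a filter value against that range; the unsigned lexicographic
// comparison mirrors ByteUtil.UnsafeComparer.INSTANCE.compareTo in the patch.
public final class TaskMinMaxSketch {

  static int compare(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[][] blockletMins = { {2}, {5}, {1} };
    byte[][] blockletMaxs = { {9}, {7}, {4} };
    byte[] taskMin = blockletMins[0];
    byte[] taskMax = blockletMaxs[0];
    for (int i = 1; i < blockletMins.length; i++) {
      if (compare(blockletMins[i], taskMin) < 0) {
        taskMin = blockletMins[i];
      }
      if (compare(blockletMaxs[i], taskMax) > 0) {
        taskMax = blockletMaxs[i];
      }
    }
    // taskMin = [1], taskMax = [9]; a filter value of 12 lies outside the
    // range, so the whole task can be pruned without reading any data file.
    System.out.println(Arrays.toString(taskMin) + " .. " + Arrays.toString(taskMax));
  }
}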


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2bad144a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2bad144a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2bad144a

Branch: refs/heads/master
Commit: 2bad144a2f13bcdf5518490c8c08bad25d9ea84d
Parents: fef3384
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Dec 6 17:11:51 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 7 16:35:44 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  26 +++
 .../carbondata/core/datamap/dev/DataMap.java    |   7 +
 .../blockletindex/BlockletDataMap.java          | 169 ++++++++++++++++---
 .../scan/filter/FilterExpressionProcessor.java  |  13 ++
 .../core/scan/filter/SingleTableProvider.java   |   3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   6 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 7 files changed, 195 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 705a9fd..1d5c978 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
@@ -165,4 +166,29 @@ public final class TableDataMap implements OperationEventListener {
   @Override public void onEvent(Event event, OperationContext opContext) {
     dataMapFactory.fireEvent(event);
   }
+
+  /**
+   * Prune the given segments using the task level min/max values
+   *
+   * @param segmentIds segments to validate for pruning
+   * @param filterExp resolved filter to evaluate against the task level min/max
+   * @return segments that may contain matching data and therefore must be scanned
+   * @throws IOException
+   */
+  public List<String> pruneSegments(List<String> segmentIds, FilterResolverIntf filterExp)
+      throws IOException {
+    List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (String segmentId : segmentIds) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      for (DataMap dataMap : dataMaps) {
+        if (dataMap.isScanRequired(filterExp)) {
+          // If any one task in a given segment contains the data, the segment needs to
+          // be scanned and the remaining data maps in the same segment need no validation
+          prunedSegments.add(segmentId);
+          break;
+        }
+      }
+    }
+    return prunedSegments;
+  }
 }
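
How a caller might use the new API, as a hedged sketch (obtaining the TableDataMap, the segment list and the resolved filter is assumed to happen elsewhere):

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

// Segment level pruning runs before the existing blocklet level prune, so
// segments whose task min/max cannot match the filter are never opened.
public final class SegmentPruneExample {
  static List<String> pruneFirst(TableDataMap tableDataMap, List<String> segmentIds,
      FilterResolverIntf filterExp) throws IOException {
    // drops every segment in which no task's min/max range admits the filter
    return tableDataMap.pruneSegments(segmentIds, filterExp);
  }
}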

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index ada23ad..e7c30a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -42,6 +42,13 @@ public interface DataMap {
    */
   List<Blocklet> prune(FilterResolverIntf filterExp);
 
+  /**
+   * Validate whether the current segment needs to be scanned to fetch the required data
+   *
+   * @param filterExp resolved filter to evaluate against the stored min/max values
+   * @return true if the segment may contain matching data and must be scanned
+   */
+  boolean isScanRequired(FilterResolverIntf filterExp);
 
   /**
    * Clear complete index table and release memory.
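
For implementers of the extended interface, a hedged sketch of the conservative contract (StatsFreeDataMap is hypothetical and only the new method is shown; an abstract class is used so the remaining DataMap methods can stay elided):

import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

// A datamap that keeps no statistics must answer conservatively: it cannot
// prove a segment holds no matches, so it always reports a scan is required.
abstract class StatsFreeDataMap implements DataMap {
  @Override
  public boolean isScanRequired(FilterResolverIntf filterExp) {
    // without min/max values, skipping this segment could silently drop data
    return true;
  }
}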

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index fd514ea..1dc9b7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -55,10 +55,12 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -91,8 +93,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private static int BLOCK_INFO_INDEX = 8;
 
+  private static int TASK_MIN_VALUES_INDEX = 0;
+
+  private static int TASK_MAX_VALUES_INDEX = 1;
+
   private UnsafeMemoryDMStore unsafeMemoryDMStore;
 
+  private UnsafeMemoryDMStore unsafeMemoryTaskMinMaxDMStore;
+
   private SegmentProperties segmentProperties;
 
   private int[] columnCardinality;
@@ -111,6 +119,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
         createSchema(segmentProperties);
+        createTaskMinMaxSchema(segmentProperties);
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) {
@@ -124,6 +133,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     if (unsafeMemoryDMStore != null) {
       unsafeMemoryDMStore.finishWriting();
     }
+    if (null != unsafeMemoryTaskMinMaxDMStore) {
+      unsafeMemoryTaskMinMaxDMStore.finishWriting();
+    }
     LOGGER.info(
         "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
             System.currentTimeMillis() - startTime));
@@ -134,27 +146,41 @@ public class BlockletDataMap implements DataMap, Cacheable {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
     CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+    DataMapRow taskMinMaxRow = null;
+    // Add one row to maintain task level min max for segment pruning
+    if (!blockletList.isEmpty()) {
+      taskMinMaxRow = new DataMapRowImpl(unsafeMemoryTaskMinMaxDMStore.getSchema());
+    }
     for (int index = 0; index < blockletList.size(); index++) {
       DataMapRow row = new DataMapRowImpl(schema);
       int ordinal = 0;
+      int taskMinMaxOrdinal = 0;
       BlockletInfo blockletInfo = blockletList.get(index);
 
       // add start key as index key
       row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
 
       BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
-      row.setRow(addMinMax(minMaxLen, schema[ordinal],
-          updateMinValues(minMaxIndex.getMinValues(), minMaxLen)), ordinal);
+      byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+      // compute and set task level min values
+      addTaskMinMaxValues(taskMinMaxRow, minMaxLen,
+          unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+          TASK_MIN_VALUES_INDEX, true);
       ordinal++;
-      row.setRow(addMinMax(minMaxLen, schema[ordinal],
-          updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen)), ordinal);
+      taskMinMaxOrdinal++;
+      byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+      row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+      // compute and set task level max values
+      addTaskMinMaxValues(taskMinMaxRow, minMaxLen,
+          unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+          TASK_MAX_VALUES_INDEX, false);
       ordinal++;
 
       row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
 
       // add file path
-      byte[] filePathBytes =
-          filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+      byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
       row.setByteArray(filePathBytes, ordinal++);
 
       // add pages
@@ -179,6 +205,18 @@ public class BlockletDataMap implements DataMap, Cacheable {
         throw new RuntimeException(e);
       }
     }
+    // write the task level min/max row to unsafe memory store
+    if (null != taskMinMaxRow) {
+      addTaskMinMaxRowToUnsafeMemoryStore(taskMinMaxRow);
+    }
+  }
+
+  private void addTaskMinMaxRowToUnsafeMemoryStore(DataMapRow taskMinMaxRow) {
+    try {
+      unsafeMemoryTaskMinMaxDMStore.addIndexRowToUnsafe(taskMinMaxRow);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -270,28 +308,54 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return minRow;
   }
 
+  /**
+   * This method will compute min/max values at task level
+   *
+   * @param taskMinMaxRow row holding the running task level min/max values
+   * @param minMaxLen value size of each min/max column
+   * @param carbonRowSchema schema of the min or max struct column
+   * @param minMaxValue min/max values of the current blocklet
+   * @param ordinal 0 for the task min struct, 1 for the task max struct
+   * @param isMinValueComparison true when merging min values, false when merging max values
+   */
+  private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
+      CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
+      boolean isMinValueComparison) {
+    DataMapRow row = taskMinMaxRow.getRow(ordinal);
+    byte[][] updatedMinMaxValues = minMaxValue;
+    if (null == row) {
+      CarbonRowSchema[] minSchemas =
+          ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
+      row = new DataMapRowImpl(minSchemas);
+    } else {
+      byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
+      // Compare and update min max values
+      for (int i = 0; i < minMaxLen.length; i++) {
+        int compare =
+            ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
+        if (isMinValueComparison) {
+          if (compare < 0) {
+            updatedMinMaxValues[i] = existingMinMaxValues[i];
+          }
+        } else if (compare > 0) {
+          updatedMinMaxValues[i] = existingMinMaxValues[i];
+        }
+      }
+    }
+    int minMaxOrdinal = 0;
+    // write the merged min/max values into the task level row
+    for (int i = 0; i < minMaxLen.length; i++) {
+      row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
+    }
+    taskMinMaxRow.setRow(row, ordinal);
+  }
+
   private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
     List<CarbonRowSchema> indexSchemas = new ArrayList<>();
 
     // Index key
     indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
-    int[] minMaxLen = segmentProperties.getColumnsValueSize();
-    // do it 2 times, one for min and one for max.
-    for (int k = 0; k < 2; k++) {
-      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
-      for (int i = 0; i < minMaxLen.length; i++) {
-        if (minMaxLen[i] <= 0) {
-          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
-        } else {
-          mapSchemas[i] =
-              new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
-        }
-      }
-      CarbonRowSchema mapSchema =
-          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
-              mapSchemas);
-      indexSchemas.add(mapSchema);
-    }
+    getMinMaxSchema(segmentProperties, indexSchemas);
 
     // for number of rows.
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
@@ -315,6 +379,51 @@ public class BlockletDataMap implements DataMap, Cacheable {
         new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
   }
 
+  private void createTaskMinMaxSchema(SegmentProperties segmentProperties) throws MemoryException {
+    List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2);
+    getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
+    unsafeMemoryTaskMinMaxDMStore = new UnsafeMemoryDMStore(
+        taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+  }
+
+  private void getMinMaxSchema(SegmentProperties segmentProperties,
+      List<CarbonRowSchema> minMaxSchemas) {
+    // Index key
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
+    // do it 2 times, one for min and one for max.
+    for (int k = 0; k < 2; k++) {
+      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
+      for (int i = 0; i < minMaxLen.length; i++) {
+        if (minMaxLen[i] <= 0) {
+          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+        } else {
+          mapSchemas[i] =
+              new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+        }
+      }
+      CarbonRowSchema mapSchema =
+          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+              mapSchemas);
+      minMaxSchemas.add(mapSchema);
+    }
+  }
+
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    FilterExecuter filterExecuter =
+        FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+    for (int i = 0; i < unsafeMemoryTaskMinMaxDMStore.getRowCount(); i++) {
+      DataMapRow unsafeRow = unsafeMemoryTaskMinMaxDMStore.getUnsafeRow(i);
+      boolean isScanRequired = FilterExpressionProcessor
+          .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+              getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+      if (isScanRequired) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp) {
 
@@ -562,6 +671,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
       unsafeMemoryDMStore = null;
       segmentProperties = null;
     }
+    // clear task min/max unsafe memory
+    if (null != unsafeMemoryTaskMinMaxDMStore) {
+      unsafeMemoryTaskMinMaxDMStore.freeMemory();
+      unsafeMemoryTaskMinMaxDMStore = null;
+    }
   }
 
   @Override
@@ -576,11 +690,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public long getMemorySize() {
+    long memoryUsed = 0L;
     if (unsafeMemoryDMStore != null) {
-      return unsafeMemoryDMStore.getMemoryUsed();
-    } else {
-      return 0;
+      memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+    }
+    if (null != unsafeMemoryTaskMinMaxDMStore) {
+      memoryUsed += unsafeMemoryTaskMinMaxDMStore.getMemoryUsed();
     }
+    return memoryUsed;
   }
 
 }
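
A hedged sketch of the row layout held by the new task level store (plain arrays stand in for the unsafe DataMapRow; the store keeps one such row per loaded block footer, and isScanRequired checks every row):

public final class TaskRowLayoutSketch {
  static final int TASK_MIN_VALUES_INDEX = 0;
  static final int TASK_MAX_VALUES_INDEX = 1;

  public static void main(String[] args) {
    // two struct columns: index 0 holds per-column minimums, index 1 maximums
    byte[][][] taskMinMaxRow = new byte[2][][];
    taskMinMaxRow[TASK_MIN_VALUES_INDEX] = new byte[][] { {1}, {10} };
    taskMinMaxRow[TASK_MAX_VALUES_INDEX] = new byte[][] { {9}, {42} };
    // a filter is evaluated against [min, max] of each indexed column before
    // any data file of the task is touched
    System.out.println(taskMinMaxRow[TASK_MIN_VALUES_INDEX].length + " indexed columns");
  }
}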

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index e77d58e..6c804d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.partition.AndFilterImpl;
 import org.apache.carbondata.core.scan.filter.partition.EqualToFilterImpl;
@@ -529,4 +530,16 @@ public class FilterExpressionProcessor implements FilterProcessor {
     }
     return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
   }
+
+  public static boolean isScanRequired(FilterExecuter filterExecuter, byte[][] maxValue,
+      byte[][] minValue) {
+    if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
+      return ((ImplicitColumnFilterExecutor) filterExecuter)
+          .isFilterValuesPresentInAbstractIndex(maxValue, minValue);
+    } else {
+      // otherwise decide based on min/max value
+      BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+      return !bitSet.isEmpty();
+    }
+  }
 }
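
The boolean contract above reduces a FilterExecuter BitSet to a yes/no decision; a minimal self-contained illustration of that contract (a simplified stand-in, not the CarbonData FilterExecuter):

import java.util.BitSet;

// A filter executer sets a bit when the [min, max] range can contain matching
// values, so an empty BitSet means the block or task can be skipped.
public final class ScanDecisionSketch {

  static BitSet isScanRequired(int filterValue, int min, int max) {
    BitSet bitSet = new BitSet(1);
    if (filterValue >= min && filterValue <= max) {
      bitSet.set(0);
    }
    return bitSet;
  }

  public static void main(String[] args) {
    System.out.println(!isScanRequired(12, 1, 9).isEmpty()); // false -> prune
    System.out.println(!isScanRequired(5, 1, 9).isEmpty());  // true  -> scan
  }
}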

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
index 88fc8a6..d72e798 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
@@ -31,7 +31,8 @@ public class SingleTableProvider implements TableProvider {
 
   @Override public CarbonTable getCarbonTable(CarbonTableIdentifier carbonTableIdentifier)
       throws IOException {
-    if (carbonTable.getCarbonTableIdentifier().equals(carbonTableIdentifier)) {
+    if (carbonTable.getCarbonTableIdentifier().getTableUniqueName()
+        .equals(carbonTableIdentifier.getTableUniqueName())) {
       return carbonTable;
     } else {
       throw new IOException("Carbon table does not exist with identifier " + carbonTableIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e4e9ceb..15d1304 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -159,7 +159,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     if (carbonTable == null) {
       // carbon table should be created either from deserialized table info (schema saved in
       // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
@@ -303,7 +303,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
   }
 
-  private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
     String tablePath = configuration.get(INPUT_DIR, "");
     try {
@@ -887,7 +887,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * return valid segment to access
    */
-  private String[] getSegmentsToAccess(JobContext job) {
+  public String[] getSegmentsToAccess(JobContext job) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
     if (segmentString.trim().isEmpty()) {
       return new String[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 1fa838c..d599c22 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -328,7 +328,7 @@ class CarbonScanRDD(
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
-  private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+  def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
     CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
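
Taken together, the visibility changes in CarbonTableInputFormat and CarbonScanRDD let driver-side code wire the pieces up; a hedged end-to-end sketch (how the TableDataMap and resolved filter are obtained is assumed, not shown):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

// Read the configured segments from the (now public) input format accessor
// and hand them to the new segment level pruning before planning splits.
public final class DriverPruneSketch {
  static List<String> planSegments(CarbonTableInputFormat<Object> format, JobContext job,
      TableDataMap tableDataMap, FilterResolverIntf filterExp) throws IOException {
    String[] segments = format.getSegmentsToAccess(job);
    return tableDataMap.pruneSegments(Arrays.asList(segments), filterExp);
  }
}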