You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 11:05:58 UTC
carbondata git commit: [CARBONDATA-1867] Add support for task/segment
level pruning
Repository: carbondata
Updated Branches:
refs/heads/master fef3384b8 -> 2bad144a2
[CARBONDATA-1867] Add support for task/segment level pruning
Added support for task/segment level pruning. Added code to compute task level min/max which can be helpful for task/segment level pruning
This closes #1624
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2bad144a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2bad144a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2bad144a
Branch: refs/heads/master
Commit: 2bad144a2f13bcdf5518490c8c08bad25d9ea84d
Parents: fef3384
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Dec 6 17:11:51 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 7 16:35:44 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/datamap/TableDataMap.java | 26 +++
.../carbondata/core/datamap/dev/DataMap.java | 7 +
.../blockletindex/BlockletDataMap.java | 169 ++++++++++++++++---
.../scan/filter/FilterExpressionProcessor.java | 13 ++
.../core/scan/filter/SingleTableProvider.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 6 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
7 files changed, 195 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 705a9fd..1d5c978 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.indexstore.Blocklet;
@@ -165,4 +166,29 @@ public final class TableDataMap implements OperationEventListener {
@Override public void onEvent(Event event, OperationContext opContext) {
dataMapFactory.fireEvent(event);
}
+
+ /**
+ * Prune the segments: a segment is kept only when at least one of its data maps needs a scan
+ *
+ * @param segmentIds ids of the segments to be checked
+ * @param filterExp filter passed to isScanRequired of every data map of a segment
+ * @return the segments to be scanned, i.e. those having at least one data map needing a scan
+ * @throws IOException declared by this method; presumably raised while getting the data maps
+ */
+ public List<String> pruneSegments(List<String> segmentIds, FilterResolverIntf filterExp)
+ throws IOException {
+ List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (String segmentId : segmentIds) {
+ List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+ for (DataMap dataMap : dataMaps) {
+ if (dataMap.isScanRequired(filterExp)) {
+ // If any one task in a given segment contains the data, the segment needs to be
+ // scanned and we need not validate the remaining data maps of the same segment
+ prunedSegments.add(segmentId);
+ break;
+ }
+ }
+ }
+ return prunedSegments;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index ada23ad..e7c30a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -42,6 +42,13 @@ public interface DataMap {
*/
List<Blocklet> prune(FilterResolverIntf filterExp);
+ /**
+ * Check whether the data covered by this data map needs to be scanned for the given filter
+ *
+ * @param filterExp filter to be evaluated against this data map
+ * @return true if a scan is required for the given filter, false otherwise
+ */
+ boolean isScanRequired(FilterResolverIntf filterExp);
/**
* Clear complete index table and release memory.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index fd514ea..1dc9b7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -55,10 +55,12 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -91,8 +93,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
private static int BLOCK_INFO_INDEX = 8;
+ private static int TASK_MIN_VALUES_INDEX = 0;
+
+ private static int TASK_MAX_VALUES_INDEX = 1;
+
private UnsafeMemoryDMStore unsafeMemoryDMStore;
+ private UnsafeMemoryDMStore unsafeMemoryTaskMinMaxDMStore;
+
private SegmentProperties segmentProperties;
private int[] columnCardinality;
@@ -111,6 +119,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
createSchema(segmentProperties);
+ createTaskMinMaxSchema(segmentProperties);
}
TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) {
@@ -124,6 +133,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
if (unsafeMemoryDMStore != null) {
unsafeMemoryDMStore.finishWriting();
}
+ if (null != unsafeMemoryTaskMinMaxDMStore) {
+ unsafeMemoryTaskMinMaxDMStore.finishWriting();
+ }
LOGGER.info(
"Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + (
System.currentTimeMillis() - startTime));
@@ -134,27 +146,41 @@ public class BlockletDataMap implements DataMap, Cacheable {
int[] minMaxLen = segmentProperties.getColumnsValueSize();
List<BlockletInfo> blockletList = fileFooter.getBlockletList();
CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
+ DataMapRow taskMinMaxRow = null;
+ // Add one row to maintain task level min max for segment pruning
+ if (!blockletList.isEmpty()) {
+ taskMinMaxRow = new DataMapRowImpl(unsafeMemoryTaskMinMaxDMStore.getSchema());
+ }
for (int index = 0; index < blockletList.size(); index++) {
DataMapRow row = new DataMapRowImpl(schema);
int ordinal = 0;
+ int taskMinMaxOrdinal = 0;
BlockletInfo blockletInfo = blockletList.get(index);
// add start key as index key
row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++);
BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
- row.setRow(addMinMax(minMaxLen, schema[ordinal],
- updateMinValues(minMaxIndex.getMinValues(), minMaxLen)), ordinal);
+ byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal);
+ // compute and set task level min values
+ addTaskMinMaxValues(taskMinMaxRow, minMaxLen,
+ unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], minValues,
+ TASK_MIN_VALUES_INDEX, true);
ordinal++;
- row.setRow(addMinMax(minMaxLen, schema[ordinal],
- updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen)), ordinal);
+ taskMinMaxOrdinal++;
+ byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen);
+ row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal);
+ // compute and set task level max values
+ addTaskMinMaxValues(taskMinMaxRow, minMaxLen,
+ unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], maxValues,
+ TASK_MAX_VALUES_INDEX, false);
ordinal++;
row.setInt(blockletInfo.getNumberOfRows(), ordinal++);
// add file path
- byte[] filePathBytes =
- filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
row.setByteArray(filePathBytes, ordinal++);
// add pages
@@ -179,6 +205,18 @@ public class BlockletDataMap implements DataMap, Cacheable {
throw new RuntimeException(e);
}
}
+ // write the task level min/max row to unsafe memory store
+ if (null != taskMinMaxRow) {
+ addTaskMinMaxRowToUnsafeMemoryStore(taskMinMaxRow);
+ }
+ }
+
+ private void addTaskMinMaxRowToUnsafeMemoryStore(DataMapRow taskMinMaxRow) {
+ try {
+ unsafeMemoryTaskMinMaxDMStore.addIndexRowToUnsafe(taskMinMaxRow);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -270,28 +308,54 @@ public class BlockletDataMap implements DataMap, Cacheable {
return minRow;
}
+ /**
+ * Merge the given min (or max) values into the task level row, keeping the overall min (or max)
+ *
+ * @param taskMinMaxRow task level row; its child row at the given ordinal is read and replaced
+ * @param minMaxLen only its length is used here, as the number of columns to process
+ * @param carbonRowSchema struct schema used to create the child row when it does not exist yet
+ * @param minMaxValue values to merge; NOTE: entries are overwritten in place with merged values
+ * @param ordinal index of the child row (min or max) inside taskMinMaxRow
+ * @param isMinValueComparison true to keep the smaller value per column, false for the larger
+ */
+ private void addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen,
+ CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal,
+ boolean isMinValueComparison) {
+ DataMapRow row = taskMinMaxRow.getRow(ordinal);
+ byte[][] updatedMinMaxValues = minMaxValue;
+ if (null == row) {
+ CarbonRowSchema[] minSchemas =
+ ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas();
+ row = new DataMapRowImpl(minSchemas);
+ } else {
+ byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal);
+ // Keep the existing value where it is smaller (min) or larger (max) than the new value
+ for (int i = 0; i < minMaxLen.length; i++) {
+ int compare =
+ ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]);
+ if (isMinValueComparison) {
+ if (compare < 0) {
+ updatedMinMaxValues[i] = existingMinMaxValues[i];
+ }
+ } else if (compare > 0) {
+ updatedMinMaxValues[i] = existingMinMaxValues[i];
+ }
+ }
+ }
+ int minMaxOrdinal = 0;
+ // write the merged min/max values into the child row, one entry per column
+ for (int i = 0; i < minMaxLen.length; i++) {
+ row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++);
+ }
+ taskMinMaxRow.setRow(row, ordinal);
+ }
+
private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
List<CarbonRowSchema> indexSchemas = new ArrayList<>();
// Index key
indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
- int[] minMaxLen = segmentProperties.getColumnsValueSize();
- // do it 2 times, one for min and one for max.
- for (int k = 0; k < 2; k++) {
- CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
- for (int i = 0; i < minMaxLen.length; i++) {
- if (minMaxLen[i] <= 0) {
- mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
- } else {
- mapSchemas[i] =
- new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
- }
- }
- CarbonRowSchema mapSchema =
- new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
- mapSchemas);
- indexSchemas.add(mapSchema);
- }
+ getMinMaxSchema(segmentProperties, indexSchemas);
// for number of rows.
indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT));
@@ -315,6 +379,51 @@ public class BlockletDataMap implements DataMap, Cacheable {
new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
}
+ private void createTaskMinMaxSchema(SegmentProperties segmentProperties) throws MemoryException {
+ List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2);
+ getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
+ unsafeMemoryTaskMinMaxDMStore = new UnsafeMemoryDMStore(
+ taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
+ }
+
+ private void getMinMaxSchema(SegmentProperties segmentProperties,
+ List<CarbonRowSchema> minMaxSchemas) {
+ // per column lengths; a length <= 0 gets a variable length schema, otherwise a fixed one
+ int[] minMaxLen = segmentProperties.getColumnsValueSize();
+ // build and append two struct schemas, one for min and one for max.
+ for (int k = 0; k < 2; k++) {
+ CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
+ for (int i = 0; i < minMaxLen.length; i++) {
+ if (minMaxLen[i] <= 0) {
+ mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+ } else {
+ mapSchemas[i] =
+ new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);
+ }
+ }
+ CarbonRowSchema mapSchema =
+ new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
+ mapSchemas);
+ minMaxSchemas.add(mapSchema);
+ }
+ }
+
+ @Override
+ public boolean isScanRequired(FilterResolverIntf filterExp) {
+ FilterExecuter filterExecuter =
+ FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+ for (int i = 0; i < unsafeMemoryTaskMinMaxDMStore.getRowCount(); i++) {
+ DataMapRow unsafeRow = unsafeMemoryTaskMinMaxDMStore.getUnsafeRow(i);
+ boolean isScanRequired = FilterExpressionProcessor
+ .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX),
+ getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX));
+ if (isScanRequired) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp) {
@@ -562,6 +671,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
unsafeMemoryDMStore = null;
segmentProperties = null;
}
+ // clear task min/max unsafe memory
+ if (null != unsafeMemoryTaskMinMaxDMStore) {
+ unsafeMemoryTaskMinMaxDMStore.freeMemory();
+ unsafeMemoryTaskMinMaxDMStore = null;
+ }
}
@Override
@@ -576,11 +690,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
@Override
public long getMemorySize() {
+ long memoryUsed = 0L;
if (unsafeMemoryDMStore != null) {
- return unsafeMemoryDMStore.getMemoryUsed();
- } else {
- return 0;
+ memoryUsed += unsafeMemoryDMStore.getMemoryUsed();
+ }
+ if (null != unsafeMemoryTaskMinMaxDMStore) {
+ memoryUsed += unsafeMemoryTaskMinMaxDMStore.getMemoryUsed();
}
+ return memoryUsed;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index e77d58e..6c804d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression
import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.scan.filter.partition.AndFilterImpl;
import org.apache.carbondata.core.scan.filter.partition.EqualToFilterImpl;
@@ -529,4 +530,16 @@ public class FilterExpressionProcessor implements FilterProcessor {
}
return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
}
+
+ public static boolean isScanRequired(FilterExecuter filterExecuter, byte[][] maxValue,
+ byte[][] minValue) {
+ if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
+ return ((ImplicitColumnFilterExecutor) filterExecuter)
+ .isFilterValuesPresentInAbstractIndex(maxValue, minValue);
+ } else {
+ // otherwise a scan is required only if the executer sets at least one bit for this min/max
+ BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue);
+ return !bitSet.isEmpty();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
index 88fc8a6..d72e798 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java
@@ -31,7 +31,8 @@ public class SingleTableProvider implements TableProvider {
@Override public CarbonTable getCarbonTable(CarbonTableIdentifier carbonTableIdentifier)
throws IOException {
- if (carbonTable.getCarbonTableIdentifier().equals(carbonTableIdentifier)) {
+ if (carbonTable.getCarbonTableIdentifier().getTableUniqueName()
+ .equals(carbonTableIdentifier.getTableUniqueName())) {
return carbonTable;
} else {
throw new IOException("Carbon table does not exist with identifier " + carbonTableIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e4e9ceb..15d1304 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -159,7 +159,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
- private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
if (carbonTable == null) {
// carbon table should be created either from deserialized table info (schema saved in
// hive metastore) or by reading schema in HDFS (schema saved in HDFS)
@@ -303,7 +303,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
}
- private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
throws IOException {
String tablePath = configuration.get(INPUT_DIR, "");
try {
@@ -887,7 +887,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
/**
* return valid segment to access
*/
- private String[] getSegmentsToAccess(JobContext job) {
+ public String[] getSegmentsToAccess(JobContext job) {
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
if (segmentString.trim().isEmpty()) {
return new String[0];
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 1fa838c..d599c22 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -328,7 +328,7 @@ class CarbonScanRDD(
iterator.asInstanceOf[Iterator[InternalRow]]
}
- private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
+ def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
CarbonTableInputFormat.setTableInfo(conf, tableInfo)
CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)