You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/24 05:38:43 UTC
[2/5] incubator-carbondata git commit: WIP Added code for new V3 format to optimize scan
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 9a8b254..ae9ba8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -53,11 +55,18 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
BitSet bitSet = new BitSet(1);
byte[][] filterValues = this.filterRangeValues;
int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+ boolean isScanRequired = isScanRequired(blockMinValue[columnIndex], filterValues);
+ if (isScanRequired) {
+ bitSet.set(0);
+ }
+ return bitSet;
+ }
+
+ private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
boolean isScanRequired = false;
for (int k = 0; k < filterValues.length; k++) {
// and filter-min should be positive
- int minCompare =
- ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+ int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue);
// if any filter applied is not in range of min and max of block
// then since its a less than fiter validate whether the block
@@ -67,26 +76,45 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
break;
}
}
- if (isScanRequired) {
- bitSet.set(0);
- }
- return bitSet;
-
+ return isScanRequired;
}
- @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+ @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder)
throws FilterUnsupportedException, IOException {
if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
return super.applyFilter(blockChunkHolder);
}
int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
.get(dimColEvaluatorInfoList.get(0).getColumnIndex());
- if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
- blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+ if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
.getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
}
- return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
- blockChunkHolder.getDataBlock().nodeSize());
+ DimensionRawColumnChunk rawColumnChunk =
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+ BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+ for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+ if (rawColumnChunk.getMinValues() != null) {
+ if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
+ int compare = ByteUtil.UnsafeComparer.INSTANCE
+ .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
+ if (compare > 0) {
+ BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
+ bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
+ } else {
+ BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+ rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
+ }
+ }
+ } else {
+ BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+ rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
+ }
+ }
+ return bitSetGroup;
}
private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -156,7 +184,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
if (start < 0) {
start = -(start + 1);
- if (start == numerOfRows) {
+ if (start >= numerOfRows) {
start = start - 1;
}
// Method will compare the tentative index value after binary search, this tentative
@@ -250,4 +278,17 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
}
return bitSet;
}
+
+ @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+ if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+ super.readBlocks(blockChunkHolder);
+ return; // as in applyFilter, non-dictionary columns are left entirely to the parent
+ }
+ int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+ .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+ if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+ .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index 8646301..af5568e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -91,12 +91,19 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
*
* @return start IndexKey
*/
- public void getStartKey(long[] startKey,
- SortedMap<Integer, byte[]> noDictStartKeys, List<long[]> startKeyList) {
- FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
- startKey, startKeyList);
- FilterUtil
- .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDictStartKeys);
+ public void getStartKey(long[] startKey, SortedMap<Integer, byte[]> noDictStartKeys,
+ List<long[]> startKeyList) {
+ switch (exp.getFilterExpressionType()) {
+ case GREATERTHAN:
+ case GREATERTHAN_EQUALTO:
+ FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
+ startKey, startKeyList);
+ FilterUtil
+ .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDictStartKeys);
+ break;
+ default:
+ // only greater-than filters bound the start key; other types leave the key arrays untouched
+ }
}
/**
@@ -106,10 +113,17 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
*/
@Override public void getEndKey(SegmentProperties segmentProperties, long[] endKeys,
SortedMap<Integer, byte[]> noDicEndKeys, List<long[]> endKeyList) {
- FilterUtil.getEndKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(),
- endKeys, segmentProperties, endKeyList);
- FilterUtil
- .getEndKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDicEndKeys);
+ switch (exp.getFilterExpressionType()) {
+ case LESSTHAN:
+ case LESSTHAN_EQUALTO:
+ FilterUtil
+ .getEndKey(dimColEvaluatorInfoList.get(0).getDimensionResolvedFilterInstance(), endKeys,
+ segmentProperties, endKeyList);
+ FilterUtil.getEndKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), noDicEndKeys);
+ break;
+ default:
+ // only less-than filters bound the end key; other types leave the key arrays untouched
+ }
}
private List<byte[]> getNoDictionaryRangeValues() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index 6b0a458..f0cebf4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -18,6 +18,10 @@ package org.apache.carbondata.core.scan.processor;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
@@ -29,7 +33,6 @@ import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultColle
import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector;
import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.result.AbstractScannedResult;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.scanner.BlockletScanner;
@@ -63,23 +66,33 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
protected BlockletScanner blockletScanner;
/**
- * to hold the data block
- */
- protected BlocksChunkHolder blocksChunkHolder;
-
- /**
* batch size of result
*/
protected int batchSize;
+ protected ExecutorService executorService;
+
+ private Future<AbstractScannedResult> future;
+
+ private Future<BlocksChunkHolder> futureIo;
+
protected AbstractScannedResult scannedResult;
- public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
- int batchSize, QueryStatisticsModel queryStatisticsModel,
- BlocksChunkHolder blockChunkHolder) {
+ private BlockExecutionInfo blockExecutionInfo;
+
+ private FileHolder fileReader;
+
+ private AtomicBoolean nextBlock;
+
+ private AtomicBoolean nextRead;
+
+ public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo,
+ FileHolder fileReader, int batchSize, QueryStatisticsModel queryStatisticsModel,
+ ExecutorService executorService) {
+ this.blockExecutionInfo = blockExecutionInfo;
+ this.fileReader = fileReader;
dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
blockExecutionInfo.getNumberOfBlockToScan());
- blocksChunkHolder = blockChunkHolder;
if (blockExecutionInfo.getFilterExecuterTree() != null) {
blockletScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel);
} else {
@@ -99,13 +112,16 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
new DictionaryBasedResultCollector(blockExecutionInfo);
}
this.batchSize = batchSize;
+ this.executorService = executorService;
+ this.nextBlock = new AtomicBoolean(false);
+ this.nextRead = new AtomicBoolean(false);
}
public boolean hasNext() {
if (scannedResult != null && scannedResult.hasNext()) {
return true;
} else {
- return dataBlockIterator.hasNext();
+ return dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get();
}
}
@@ -121,22 +137,104 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
}
scannedResult = getNextScannedResult();
}
+ nextBlock.set(false);
+ nextRead.set(false);
return false;
}
- } catch (IOException | FilterUnsupportedException ex) {
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ex);
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
- private AbstractScannedResult getNextScannedResult()
- throws IOException, FilterUnsupportedException {
- if (dataBlockIterator.hasNext()) {
- blocksChunkHolder.setDataBlock(dataBlockIterator.next());
- blocksChunkHolder.reset();
- return blockletScanner.scanBlocklet(blocksChunkHolder);
+ private AbstractScannedResult getNextScannedResult() throws Exception {
+ AbstractScannedResult result = null;
+ if (dataBlockIterator.hasNext() || nextBlock.get() || nextRead.get()) {
+ if (future == null) {
+ future = execute();
+ }
+ result = future.get();
+ nextBlock.set(false);
+ if (dataBlockIterator.hasNext() || nextRead.get()) {
+ nextBlock.set(true);
+ future = execute();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns a holder for the next blocklet for which blockletScanner.isScanRequired is true,
+ * skipping the others, or null if the iterator runs out first.
+ */
+ private BlocksChunkHolder getBlocksChunkHolder() throws IOException {
+ BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolderInternal();
+ while (blocksChunkHolder == null && dataBlockIterator.hasNext()) {
+ blocksChunkHolder = getBlocksChunkHolderInternal();
+ }
+ return blocksChunkHolder;
+ }
+
+ private BlocksChunkHolder getBlocksChunkHolderInternal() throws IOException {
+ BlocksChunkHolder blocksChunkHolder =
+ new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
+ blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
+ blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+ if (blockletScanner.isScanRequired(blocksChunkHolder)) {
+ return blocksChunkHolder;
}
return null;
}
+ /**
+ * Submits a task that waits for the pending blocklet read (submitting one first if none
+ * is pending), submits the read of the following blocklet if the iterator has more, and
+ * then scans the blocklet just read. The future yields null if no blocklet was read.
+ */
+ private Future<AbstractScannedResult> execute() {
+ return executorService.submit(new Callable<AbstractScannedResult>() {
+ @Override public AbstractScannedResult call() throws Exception {
+ if (futureIo == null) {
+ futureIo = executeRead();
+ }
+ BlocksChunkHolder blocksChunkHolder = futureIo.get();
+ futureIo = null;
+ nextRead.set(false);
+ if (blocksChunkHolder != null) {
+ if (dataBlockIterator.hasNext()) {
+ nextRead.set(true);
+ futureIo = executeRead();
+ }
+ return blockletScanner.scanBlocklet(blocksChunkHolder);
+ }
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Submits a task that fetches the next blocklet needing a scan via getBlocksChunkHolder()
+ * and passes it to blockletScanner.readBlocklet; the future yields null when none is left.
+ * NOTE(review): this task advances dataBlockIterator on an executor thread while the
+ * calling thread may concurrently call dataBlockIterator.hasNext() (from hasNext() and
+ * getNextScannedResult()); confirm BlockletIterator is safe for that access pattern.
+ */
+ private Future<BlocksChunkHolder> executeRead() {
+ return executorService.submit(new Callable<BlocksChunkHolder>() {
+ @Override public BlocksChunkHolder call() throws Exception {
+ if (dataBlockIterator.hasNext()) {
+ BlocksChunkHolder blocksChunkHolder = getBlocksChunkHolder();
+ if (blocksChunkHolder != null) {
+ blockletScanner.readBlocklet(blocksChunkHolder);
+ return blocksChunkHolder;
+ }
+ }
+ return null;
+ }
+ });
+ }
+
public abstract void processNextBatch(CarbonColumnarBatch columnarBatch);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
index 2b1a48e..5227115 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlocksChunkHolder.java
@@ -18,8 +18,8 @@ package org.apache.carbondata.core.scan.processor;
import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
/**
* Block chunk holder which will hold the dimension and
@@ -30,12 +30,12 @@ public class BlocksChunkHolder {
/**
* dimension column data chunk
*/
- private DimensionColumnDataChunk[] dimensionDataChunk;
+ private DimensionRawColumnChunk[] dimensionRawDataChunk;
/**
* measure column data chunk
*/
- private MeasureColumnDataChunk[] measureDataChunk;
+ private MeasureRawColumnChunk[] measureRawDataChunk;
/**
* file reader which will use to read the block from file
@@ -48,36 +48,43 @@ public class BlocksChunkHolder {
private DataRefNode dataBlock;
public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock) {
- dimensionDataChunk = new DimensionColumnDataChunk[numberOfDimensionBlock];
- measureDataChunk = new MeasureColumnDataChunk[numberOfMeasureBlock];
+ dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
+ measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
+ }
+
+ public BlocksChunkHolder(int numberOfDimensionBlock, int numberOfMeasureBlock,
+ FileHolder fileReader) {
+ dimensionRawDataChunk = new DimensionRawColumnChunk[numberOfDimensionBlock];
+ measureRawDataChunk = new MeasureRawColumnChunk[numberOfMeasureBlock];
+ this.fileReader = fileReader;
}
/**
- * @return the dimensionDataChunk
+ * @return the raw dimension column chunks, one slot per dimension block
*/
- public DimensionColumnDataChunk[] getDimensionDataChunk() {
- return dimensionDataChunk;
+ public DimensionRawColumnChunk[] getDimensionRawDataChunk() {
+ return dimensionRawDataChunk;
}
/**
- * @param dimensionDataChunk the dimensionDataChunk to set
+ * @param dimensionRawDataChunk the raw dimension column chunks to hold
*/
- public void setDimensionDataChunk(DimensionColumnDataChunk[] dimensionDataChunk) {
- this.dimensionDataChunk = dimensionDataChunk;
+ public void setDimensionRawDataChunk(DimensionRawColumnChunk[] dimensionRawDataChunk) {
+ this.dimensionRawDataChunk = dimensionRawDataChunk;
}
/**
- * @return the measureDataChunk
+ * @return the raw measure column chunks, one slot per measure block
*/
- public MeasureColumnDataChunk[] getMeasureDataChunk() {
- return measureDataChunk;
+ public MeasureRawColumnChunk[] getMeasureRawDataChunk() {
+ return measureRawDataChunk;
}
/**
- * @param measureDataChunk the measureDataChunk to set
+ * @param measureRawDataChunk the raw measure column chunks to hold
*/
- public void setMeasureDataChunk(MeasureColumnDataChunk[] measureDataChunk) {
- this.measureDataChunk = measureDataChunk;
+ public void setMeasureRawDataChunk(MeasureRawColumnChunk[] measureRawDataChunk) {
+ this.measureRawDataChunk = measureRawDataChunk;
}
/**
@@ -113,11 +120,11 @@ public class BlocksChunkHolder {
* array
*/
public void reset() {
- for (int i = 0; i < measureDataChunk.length; i++) {
- this.measureDataChunk[i] = null;
+ for (int i = 0; i < measureRawDataChunk.length; i++) {
+ this.measureRawDataChunk[i] = null;
}
- for (int i = 0; i < dimensionDataChunk.length; i++) {
- this.dimensionDataChunk[i] = null;
+ for (int i = 0; i < dimensionRawDataChunk.length; i++) {
+ this.dimensionRawDataChunk[i] = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
index f78b75b..5eab7b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.scan.processor.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -37,8 +38,8 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
*/
public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
int batchSize, QueryStatisticsModel queryStatisticsModel,
- BlocksChunkHolder blockChunkHolder) {
- super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, blockChunkHolder);
+ BlocksChunkHolder blockChunkHolder, ExecutorService executorService) {
+ super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, executorService);
}
/**
@@ -64,9 +65,6 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
public void processNextBatch(CarbonColumnarBatch columnarBatch) {
if (updateScanner()) {
this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
- while (columnarBatch.getActualSize() < columnarBatch.getBatchSize() && updateScanner()) {
- this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
index 60e6f67..fbd7044 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -47,16 +48,23 @@ public abstract class AbstractScannedResult {
* current row number
*/
protected int currentRow = -1;
+
+ protected int pageCounter;
/**
* row mapping indexes
*/
- protected int[] rowMapping;
+ protected int[][] rowMapping;
/**
* key size of the fixed length column
*/
private int fixedLengthKeySize;
/**
- * total number of rows
+ * total number of rows per page
+ */
+ private int[] numberOfRows;
+
+ /**
+ * Total number of rows.
*/
private int totalNumberOfRows;
/**
@@ -66,11 +74,16 @@ public abstract class AbstractScannedResult {
/**
* dimension column data chunk
*/
- protected DimensionColumnDataChunk[] dataChunks;
+ protected DimensionColumnDataChunk[][] dataChunks;
+
+ /**
+ * Raw dimension chunks.
+ */
+ protected DimensionRawColumnChunk[] rawColumnChunks;
/**
* measure column data chunk
*/
- protected MeasureColumnDataChunk[] measureDataChunks;
+ protected MeasureColumnDataChunk[][] measureDataChunks;
/**
* dictionary column block index in file
*/
@@ -128,7 +141,7 @@ public abstract class AbstractScannedResult {
*
* @param dataChunks dimension chunks used in query
*/
- public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) {
+ public void setDimensionChunks(DimensionColumnDataChunk[][] dataChunks) {
this.dataChunks = dataChunks;
}
@@ -137,10 +150,14 @@ public abstract class AbstractScannedResult {
*
* @param measureDataChunks measure data chunks
*/
- public void setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) {
+ public void setMeasureChunks(MeasureColumnDataChunk[][] measureDataChunks) {
this.measureDataChunks = measureDataChunks;
}
+ public void setRawColumnChunks(DimensionRawColumnChunk[] rawColumnChunks) {
+ this.rawColumnChunks = rawColumnChunks;
+ }
+
/**
* Below method will be used to get the chunk based in measure ordinal
*
@@ -148,7 +165,7 @@ public abstract class AbstractScannedResult {
* @return measure column chunk
*/
public MeasureColumnDataChunk getMeasureChunk(int ordinal) {
- return measureDataChunks[ordinal];
+ return measureDataChunks[ordinal][pageCounter];
}
/**
@@ -162,7 +179,7 @@ public abstract class AbstractScannedResult {
byte[] completeKey = new byte[fixedLengthKeySize];
int offset = 0;
for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- offset += dataChunks[dictionaryColumnBlockIndexes[i]]
+ offset += dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
.fillChunkData(completeKey, offset, rowId,
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
}
@@ -181,7 +198,7 @@ public abstract class AbstractScannedResult {
int[] completeKey = new int[totalDimensionsSize];
int column = 0;
for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- column = dataChunks[dictionaryColumnBlockIndexes[i]]
+ column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
.fillConvertedChunkData(rowId, column, completeKey,
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
}
@@ -195,7 +212,7 @@ public abstract class AbstractScannedResult {
public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
int column = 0;
for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- column = dataChunks[dictionaryColumnBlockIndexes[i]]
+ column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
.fillConvertedChunkData(vectorInfo, column,
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
}
@@ -207,7 +224,7 @@ public abstract class AbstractScannedResult {
public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
int column = 0;
for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- column = dataChunks[noDictionaryColumnBlockIndexes[i]]
+ column = dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
.fillConvertedChunkData(vectorInfo, column,
columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
}
@@ -219,7 +236,7 @@ public abstract class AbstractScannedResult {
public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
for (int i = 0; i < measuresOrdinal.length; i++) {
vectorInfo[i].measureVectorFiller
- .fillMeasureVector(measureDataChunks[measuresOrdinal[i]], vectorInfo[i]);
+ .fillMeasureVector(measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
}
}
@@ -233,8 +250,9 @@ public abstract class AbstractScannedResult {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream dataOutput = new DataOutputStream(byteStream);
try {
- vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks,
- rowMapping == null ? j : rowMapping[j], dataOutput);
+ vectorInfos[i].genericQueryType
+ .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks,
+ rowMapping == null ? j : rowMapping[pageCounter][j], pageCounter, dataOutput);
Object data = vectorInfos[i].genericQueryType
.getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
vector.putObject(vectorOffset++, data);
@@ -257,6 +275,31 @@ public abstract class AbstractScannedResult {
}
/**
+ * Just increment the page counter and reset the remaining counters.
+ */
+ public void incrementPageCounter() {
+ rowCounter = 0;
+ currentRow = -1;
+ pageCounter++;
+ }
+
+ public int numberOfpages() {
+ return numberOfRows.length;
+ }
+
+ /**
+ * Get total rows in the current page
+ * @return number of rows in the current page
+ */
+ public int getCurrentPageRowCount() {
+ return numberOfRows[pageCounter];
+ }
+
+ public int getCurrentPageCounter() {
+ return pageCounter;
+ }
+
+ /**
* increment the counter.
*/
public void setRowCounter(int rowCounter) {
@@ -272,7 +315,7 @@ public abstract class AbstractScannedResult {
* @return dimension data based on row id
*/
protected byte[] getDimensionData(int dimOrdinal, int rowId) {
- return dataChunks[dimOrdinal].getChunkData(rowId);
+ return dataChunks[dimOrdinal][pageCounter].getChunkData(rowId);
}
/**
@@ -287,7 +330,7 @@ public abstract class AbstractScannedResult {
int position = 0;
for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
noDictionaryColumnsKeys[position++] =
- dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId);
+ dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId);
}
return noDictionaryColumnsKeys;
}
@@ -303,8 +346,8 @@ public abstract class AbstractScannedResult {
String[] noDictionaryColumnsKeys = new String[noDictionaryColumnBlockIndexes.length];
int position = 0;
for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- noDictionaryColumnsKeys[position++] =
- new String(dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId));
+ noDictionaryColumnsKeys[position++] = new String(
+ dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId));
}
return noDictionaryColumnsKeys;
}
@@ -353,7 +396,9 @@ public abstract class AbstractScannedResult {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream dataOutput = new DataOutputStream(byteStream);
try {
- genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks, rowId, dataOutput);
+ genericQueryType
+ .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, rowId, pageCounter,
+ dataOutput);
complexTypeData[i] = byteStream.toByteArray();
} catch (IOException e) {
LOGGER.error(e);
@@ -378,8 +423,13 @@ public abstract class AbstractScannedResult {
* @return
*/
public boolean hasNext() {
- if (rowCounter < this.totalNumberOfRows) {
+ if (pageCounter < numberOfRows.length && rowCounter < this.numberOfRows[pageCounter]) {
return true;
+ } else if (pageCounter < numberOfRows.length) {
+ pageCounter++;
+ rowCounter = 0;
+ currentRow = -1;
+ return hasNext();
}
return false;
}
@@ -393,13 +443,18 @@ public abstract class AbstractScannedResult {
public void reset() {
rowCounter = 0;
currentRow = -1;
+ pageCounter = 0;
}
/**
- * @param totalNumberOfRows set total of number rows valid after scanning
+ * @param numberOfRows number of valid rows in each page after scanning
*/
- public void setNumberOfRows(int totalNumberOfRows) {
- this.totalNumberOfRows = totalNumberOfRows;
+ public void setNumberOfRows(int[] numberOfRows) {
+ this.numberOfRows = numberOfRows;
+ this.totalNumberOfRows = 0;
+ for (int count: numberOfRows) {
+ totalNumberOfRows += count;
+ }
}
/**
@@ -408,7 +463,7 @@ public abstract class AbstractScannedResult {
*
* @param indexes
*/
- public void setIndexes(int[] indexes) {
+ public void setIndexes(int[][] indexes) {
this.rowMapping = indexes;
}
@@ -420,7 +475,8 @@ public abstract class AbstractScannedResult {
* @return whether it is null or not
*/
protected boolean isNullMeasureValue(int ordinal, int rowIndex) {
- return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex);
+ return measureDataChunks[ordinal][pageCounter].getNullValueIndexHolder().getBitSet()
+ .get(rowIndex);
}
/**
@@ -432,7 +488,8 @@ public abstract class AbstractScannedResult {
* @return measure value of long type
*/
protected long getLongMeasureValue(int ordinal, int rowIndex) {
- return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex);
+ return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
+ .getReadableLongValueByIndex(rowIndex);
}
/**
@@ -443,7 +500,7 @@ public abstract class AbstractScannedResult {
* @return measure value of double type
*/
protected double getDoubleMeasureValue(int ordinal, int rowIndex) {
- return measureDataChunks[ordinal].getMeasureDataHolder()
+ return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
.getReadableDoubleValueByIndex(rowIndex);
}
@@ -455,7 +512,7 @@ public abstract class AbstractScannedResult {
* @return measure of big decimal type
*/
protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) {
- return measureDataChunks[ordinal].getMeasureDataHolder()
+ return measureDataChunks[ordinal][pageCounter].getMeasureDataHolder()
.getReadableBigDecimalValueByIndex(rowIndex);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 6ca0570..d1f8b7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -37,7 +37,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
@Override public byte[] getDictionaryKeyArray() {
++currentRow;
- return getDictionaryKeyArray(rowMapping[currentRow]);
+ return getDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
}
/**
@@ -46,7 +46,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
@Override public int[] getDictionaryKeyIntegerArray() {
++currentRow;
- return getDictionaryKeyIntegerArray(rowMapping[currentRow]);
+ return getDictionaryKeyIntegerArray(rowMapping[pageCounter][currentRow]);
}
/**
@@ -55,7 +55,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return complex type key array
*/
@Override public byte[][] getComplexTypeKeyArray() {
- return getComplexTypeKeyArray(rowMapping[currentRow]);
+ return getComplexTypeKeyArray(rowMapping[pageCounter][currentRow]);
}
/**
@@ -65,7 +65,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return no dictionary key array for all the no dictionary dimension
*/
@Override public byte[][] getNoDictionaryKeyArray() {
- return getNoDictionaryKeyArray(rowMapping[currentRow]);
+ return getNoDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
}
/**
@@ -75,7 +75,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return no dictionary key array for all the no dictionary dimension
*/
@Override public String[] getNoDictionaryKeyStringArray() {
- return getNoDictionaryKeyStringArray(rowMapping[currentRow]);
+ return getNoDictionaryKeyStringArray(rowMapping[pageCounter][currentRow]);
}
/**
@@ -84,7 +84,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return valid row id
*/
@Override public int getCurrenrRowId() {
- return rowMapping[currentRow];
+ return rowMapping[pageCounter][currentRow];
}
/**
@@ -93,8 +93,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
int column = 0;
for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- column = dataChunks[dictionaryColumnBlockIndexes[i]]
- .fillConvertedChunkData(rowMapping, vectorInfo, column,
+ column = dataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
+ .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
}
}
@@ -105,8 +105,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
int column = 0;
for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- column = dataChunks[noDictionaryColumnBlockIndexes[i]]
- .fillConvertedChunkData(rowMapping, vectorInfo, column,
+ column = dataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
+ .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
}
}
@@ -116,9 +116,8 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
for (int i = 0; i < measuresOrdinal.length; i++) {
- vectorInfo[i].measureVectorFiller
- .fillMeasureVectorForFilter(rowMapping, measureDataChunks[measuresOrdinal[i]],
- vectorInfo[i]);
+ vectorInfo[i].measureVectorFiller.fillMeasureVectorForFilter(rowMapping[pageCounter],
+ measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index b4bcba6..1176e5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.scan.result.iterator;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -64,8 +65,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
* file reader which will be used to execute the query
*/
protected FileHolder fileReader;
+
protected AbstractDataBlockIterator dataBlockIterator;
- protected boolean nextBatch = false;
/**
* total time scan the blocks
*/
@@ -138,7 +139,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
}
@Override public boolean hasNext() {
- if ((dataBlockIterator != null && dataBlockIterator.hasNext()) || nextBatch) {
+ if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
return true;
} else if (blockExecutionInfos.size() > 0) {
return true;
@@ -168,10 +169,10 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
blockExecutionInfos.remove(executionInfo);
queryStatisticsModel.setRecorder(recorder);
- CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
- blocksChunkHolder.getMeasureDataChunk());
+ CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+ blocksChunkHolder.getMeasureRawDataChunk());
return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel,
- blocksChunkHolder);
+ blocksChunkHolder, execService);
}
return null;
}
@@ -191,8 +192,14 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
}
@Override public void close() {
- CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
- blocksChunkHolder.getMeasureDataChunk());
+ try {
+ fileReader.finish();
+ } catch (IOException e) {
+ LOGGER.error(e);
+ } finally {
+ CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+ blocksChunkHolder.getMeasureRawDataChunk());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index ed4b286..1fa2f4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -17,9 +17,7 @@
package org.apache.carbondata.core.scan.result.iterator;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
@@ -33,7 +31,6 @@ import org.apache.carbondata.core.scan.result.BatchResult;
public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
private final Object lock = new Object();
- private Future<BatchResult> future;
public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
ExecutorService execService) {
@@ -41,43 +38,20 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
}
@Override public BatchResult next() {
- BatchResult result;
long startTime = System.currentTimeMillis();
- try {
- if (future == null) {
- future = execute();
- }
- result = future.get();
- nextBatch = false;
- if (hasNext()) {
- nextBatch = true;
- future = execute();
- } else {
- fileReader.finish();
- }
- totalScanTime += System.currentTimeMillis() - startTime;
- } catch (Exception ex) {
- try {
- fileReader.finish();
- } finally {
- throw new RuntimeException(ex);
- }
- }
- return result;
+ BatchResult batchResult = getBatchResult();
+ totalScanTime += System.currentTimeMillis() - startTime;
+ return batchResult;
}
- private Future<BatchResult> execute() {
- return execService.submit(new Callable<BatchResult>() {
- @Override public BatchResult call() {
- BatchResult batchResult = new BatchResult();
- synchronized (lock) {
- updateDataBlockIterator();
- if (dataBlockIterator != null) {
- batchResult.setRows(dataBlockIterator.next());
- }
- }
- return batchResult;
+ private BatchResult getBatchResult() {
+ BatchResult batchResult = new BatchResult();
+ synchronized (lock) {
+ updateDataBlockIterator();
+ if (dataBlockIterator != null) {
+ batchResult.setRows(dataBlockIterator.next());
}
- });
+ }
+ return batchResult;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
index 258a476..341fb21 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -19,12 +19,17 @@ package org.apache.carbondata.core.scan.scanner;
import java.io.IOException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsModel;
@@ -35,51 +40,71 @@ import org.apache.carbondata.core.stats.QueryStatisticsModel;
public abstract class AbstractBlockletScanner implements BlockletScanner {
/**
- * scanner result
- */
- protected AbstractScannedResult scannedResult;
-
- /**
* block execution info
*/
protected BlockExecutionInfo blockExecutionInfo;
public QueryStatisticsModel queryStatisticsModel;
+ private AbstractScannedResult emptyResult;
+
public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
this.blockExecutionInfo = tableBlockExecutionInfos;
}
@Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
throws IOException, FilterUnsupportedException {
- fillKeyValue(blocksChunkHolder);
- return scannedResult;
- }
-
- protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) throws IOException {
-
+ AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+ .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
- totalBlockletStatistic.getCount() + 1);
+ totalBlockletStatistic.getCount() + 1);
queryStatisticsModel.getRecorder().recordStatistics(totalBlockletStatistic);
- QueryStatistic validScannedBlockletStatistic = queryStatisticsModel
- .getStatisticsTypeAndObjMap().get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+ QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
validScannedBlockletStatistic
- .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
- validScannedBlockletStatistic.getCount() + 1);
+ .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+ validScannedBlockletStatistic.getCount() + 1);
queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
- scannedResult.reset();
- scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
scannedResult.setBlockletId(
- blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR
- + blocksChunkHolder.getDataBlock().nodeNumber());
- scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock()
- .getDimensionChunks(blocksChunkHolder.getFileReader(),
- blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
- scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
- .getMeasureChunks(blocksChunkHolder.getFileReader(),
- blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
+ blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+ .getDataBlock().nodeNumber());
+ DimensionRawColumnChunk[] dimensionRawColumnChunks =
+ blocksChunkHolder.getDimensionRawDataChunk();
+ DimensionColumnDataChunk[][] dimensionColumnDataChunks =
+ new DimensionColumnDataChunk[dimensionRawColumnChunks.length][];
+ for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+ if (dimensionRawColumnChunks[i] != null) {
+ dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks();
+ }
+ }
+ scannedResult.setDimensionChunks(dimensionColumnDataChunks);
+ MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getMeasureRawDataChunk();
+ MeasureColumnDataChunk[][] measureColumnDataChunks =
+ new MeasureColumnDataChunk[measureRawColumnChunks.length][];
+ for (int i = 0; i < measureRawColumnChunks.length; i++) {
+ if (measureRawColumnChunks[i] != null) {
+ measureColumnDataChunks[i] = measureRawColumnChunks[i].convertToMeasureColDataChunks();
+ }
+ }
+ scannedResult.setMeasureChunks(measureColumnDataChunks);
+ int[] numberOfRows = new int[] { blocksChunkHolder.getDataBlock().nodeSize() };
+ if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
+ for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+ if (dimensionRawColumnChunks[i] != null) {
+ numberOfRows = dimensionRawColumnChunks[i].getRowCount();
+ break;
+ }
+ }
+ } else if (blockExecutionInfo.getAllSelectedMeasureBlocksIndexes().length > 0) {
+ for (int i = 0; i < measureRawColumnChunks.length; i++) {
+ if (measureRawColumnChunks[i] != null) {
+ numberOfRows = measureRawColumnChunks[i].getRowCount();
+ break;
+ }
+ }
+ }
+ scannedResult.setNumberOfRows(numberOfRows);
// loading delete data cache in blockexecutioninfo instance
DeleteDeltaCacheLoaderIntf deleteCacheLoader =
new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
@@ -87,5 +112,32 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
deleteCacheLoader.loadDeleteDeltaFileDataToCache();
scannedResult
.setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
+ scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
+ return scannedResult;
+ }
+
+ @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
+ DimensionRawColumnChunk[] dimensionRawColumnChunks = blocksChunkHolder.getDataBlock()
+ .getDimensionChunks(blocksChunkHolder.getFileReader(),
+ blockExecutionInfo.getAllSelectedDimensionBlocksIndexes());
+ blocksChunkHolder.setDimensionRawDataChunk(dimensionRawColumnChunks);
+ MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getDataBlock()
+ .getMeasureChunks(blocksChunkHolder.getFileReader(),
+ blockExecutionInfo.getAllSelectedMeasureBlocksIndexes());
+ blocksChunkHolder.setMeasureRawDataChunk(measureRawColumnChunks);
+ }
+
+ @Override public AbstractScannedResult createEmptyResult() {
+ if (emptyResult == null) {
+ emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+ emptyResult.setNumberOfRows(new int[0]);
+ emptyResult.setIndexes(new int[0][]);
+ }
+ return emptyResult;
+ }
+
+ @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
+ // For non filter it is always true
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
index 6b8a94a..0ed0d43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -29,6 +29,14 @@ import org.apache.carbondata.core.scan.result.AbstractScannedResult;
public interface BlockletScanner {
/**
+ * Checks whether this blocklet required to scan or not based on min max of each blocklet.
+ * @param blocksChunkHolder
+ * @return
+ * @throws IOException
+ */
+ boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException;
+
+ /**
* Below method will used to process the block data and get the scanned result
*
* @param blocksChunkHolder block chunk which holds the block data
@@ -37,4 +45,16 @@ public interface BlockletScanner {
*/
AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
throws IOException, FilterUnsupportedException;
+
+ /**
+ * Just reads the blocklet from file, does not uncompress it.
+ * @param blocksChunkHolder
+ */
+ void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException;
+
+ /**
+ * In case if there is no filter satisfies.
+ * @return AbstractScannedResult
+ */
+ AbstractScannedResult createEmptyResult();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
index 41e4fa5..c3d86aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -24,6 +24,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.mutate.data.BlockletDeleteDeltaCacheLoader;
import org.apache.carbondata.core.mutate.data.DeleteDeltaCacheLoaderIntf;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -36,6 +38,7 @@ import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.BitSetGroup;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -66,7 +69,6 @@ public class FilterScanner extends AbstractBlockletScanner {
public FilterScanner(BlockExecutionInfo blockExecutionInfo,
QueryStatisticsModel queryStatisticsModel) {
super(blockExecutionInfo);
- scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
// to check whether min max is enabled or not
String minMaxEnableValue = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
@@ -87,8 +89,26 @@ public class FilterScanner extends AbstractBlockletScanner {
*/
@Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
throws IOException, FilterUnsupportedException {
- fillScannedResult(blocksChunkHolder);
- return scannedResult;
+ return fillScannedResult(blocksChunkHolder);
+ }
+
+ @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
+ // apply min max
+ if (isMinMaxEnabled) {
+ BitSet bitSet = this.filterExecuter
+ .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
+ blocksChunkHolder.getDataBlock().getColumnsMinValue());
+ if (bitSet.isEmpty()) {
+ CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+ blocksChunkHolder.getMeasureRawDataChunk());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
+ this.filterExecuter.readBlocks(blocksChunkHolder);
}
/**
@@ -107,35 +127,21 @@ public class FilterScanner extends AbstractBlockletScanner {
* @param blocksChunkHolder
* @throws FilterUnsupportedException
*/
- private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
+ private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHolder)
throws FilterUnsupportedException, IOException {
- scannedResult.reset();
- scannedResult.setBlockletId(
- blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
- .getDataBlock().nodeNumber());
- // apply min max
- if (isMinMaxEnabled) {
- BitSet bitSet = this.filterExecuter
- .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
- blocksChunkHolder.getDataBlock().getColumnsMinValue());
- if (bitSet.isEmpty()) {
- scannedResult.setNumberOfRows(0);
- scannedResult.setIndexes(new int[0]);
- CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
- blocksChunkHolder.getMeasureDataChunk());
- return;
- }
- }
// apply filter on actual data
- BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
+ BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder);
// if indexes is empty then return with empty result
- if (bitSet.isEmpty()) {
- scannedResult.setNumberOfRows(0);
- scannedResult.setIndexes(new int[0]);
- CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
- blocksChunkHolder.getMeasureDataChunk());
- return;
+ if (bitSetGroup.isEmpty()) {
+ CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
+ blocksChunkHolder.getMeasureRawDataChunk());
+ return createEmptyResult();
}
+
+ AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+ scannedResult.setBlockletId(
+ blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
+ .getDataBlock().nodeNumber());
// valid scanned blocklet
QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
.get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
@@ -143,11 +149,20 @@ public class FilterScanner extends AbstractBlockletScanner {
.addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
validScannedBlockletStatistic.getCount() + 1);
queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
+ int[] rowCount = new int[bitSetGroup.getNumberOfPages()];
// get the row indexes from bot set
- int[] indexes = new int[bitSet.cardinality()];
- int index = 0;
- for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
- indexes[index++] = i;
+ int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][];
+ for (int k = 0; k < indexesGroup.length; k++) {
+ BitSet bitSet = bitSetGroup.getBitSet(k);
+ if (bitSet != null && !bitSet.isEmpty()) {
+ int[] indexes = new int[bitSet.cardinality()];
+ int index = 0;
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+ indexes[index++] = i;
+ }
+ rowCount[k] = indexes.length;
+ indexesGroup[k] = indexes;
+ }
}
// loading delete data cache in blockexecutioninfo instance
DeleteDeltaCacheLoaderIntf deleteCacheLoader =
@@ -159,42 +174,91 @@ public class FilterScanner extends AbstractBlockletScanner {
FileHolder fileReader = blocksChunkHolder.getFileReader();
int[][] allSelectedDimensionBlocksIndexes =
blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
- DimensionColumnDataChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
+ DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
.getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
- DimensionColumnDataChunk[] dimensionColumnDataChunk =
- new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
+ DimensionRawColumnChunk[] dimensionRawColumnChunks =
+ new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
// read dimension chunk blocks from file which is not present
- for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
- if (null != blocksChunkHolder.getDimensionDataChunk()[i]) {
- dimensionColumnDataChunk[i] = blocksChunkHolder.getDimensionDataChunk()[i];
+ for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+ if (null != blocksChunkHolder.getDimensionRawDataChunk()[i]) {
+ dimensionRawColumnChunks[i] = blocksChunkHolder.getDimensionRawDataChunk()[i];
}
}
for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
- System.arraycopy(projectionListDimensionChunk, allSelectedDimensionBlocksIndexes[i][0],
- dimensionColumnDataChunk, allSelectedDimensionBlocksIndexes[i][0],
- allSelectedDimensionBlocksIndexes[i][1] + 1 - allSelectedDimensionBlocksIndexes[i][0]);
+ for (int j = allSelectedDimensionBlocksIndexes[i][0];
+ j <= allSelectedDimensionBlocksIndexes[i][1]; j++) {
+ dimensionRawColumnChunks[j] = projectionListDimensionChunk[j];
+ }
}
- MeasureColumnDataChunk[] measureColumnDataChunk =
- new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
+ /**
+ * in case projection if the projected dimension are not loaded in the dimensionColumnDataChunk
+ * then loading them
+ */
+ int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
+ int projectionListDimensionIndexesLength = projectionListDimensionIndexes.length;
+ for (int i = 0; i < projectionListDimensionIndexesLength; i++) {
+ if (null == dimensionRawColumnChunks[projectionListDimensionIndexes[i]]) {
+ dimensionRawColumnChunks[projectionListDimensionIndexes[i]] =
+ blocksChunkHolder.getDataBlock()
+ .getDimensionChunk(fileReader, projectionListDimensionIndexes[i]);
+ }
+ }
+ MeasureRawColumnChunk[] measureRawColumnChunks =
+ new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
int[][] allSelectedMeasureBlocksIndexes =
blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
- MeasureColumnDataChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
+ MeasureRawColumnChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
.getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
// read the measure chunk blocks which is not present
- for (int i = 0; i < measureColumnDataChunk.length; i++) {
- if (null != blocksChunkHolder.getMeasureDataChunk()[i]) {
- measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i];
+ for (int i = 0; i < measureRawColumnChunks.length; i++) {
+ if (null != blocksChunkHolder.getMeasureRawDataChunk()[i]) {
+ measureRawColumnChunks[i] = blocksChunkHolder.getMeasureRawDataChunk()[i];
}
}
for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
- System.arraycopy(projectionListMeasureChunk, allSelectedMeasureBlocksIndexes[i][0],
- measureColumnDataChunk, allSelectedMeasureBlocksIndexes[i][0],
- allSelectedMeasureBlocksIndexes[i][1] + 1 - allSelectedMeasureBlocksIndexes[i][0]);
+ for (int j = allSelectedMeasureBlocksIndexes[i][0];
+ j <= allSelectedMeasureBlocksIndexes[i][1]; j++) {
+ measureRawColumnChunks[j] = projectionListMeasureChunk[j];
+ }
+ }
+ /**
+ * in case projection if the projected measure are not loaded in the measureColumnDataChunk
+ * then loading them
+ */
+ int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
+ int projectionListMeasureIndexesLength = projectionListMeasureIndexes.length;
+ for (int i = 0; i < projectionListMeasureIndexesLength; i++) {
+ if (null == measureRawColumnChunks[projectionListMeasureIndexes[i]]) {
+ measureRawColumnChunks[projectionListMeasureIndexes[i]] = blocksChunkHolder.getDataBlock()
+ .getMeasureChunk(fileReader, projectionListMeasureIndexes[i]);
+ }
}
- scannedResult.setDimensionChunks(dimensionColumnDataChunk);
- scannedResult.setIndexes(indexes);
- scannedResult.setMeasureChunks(measureColumnDataChunk);
- scannedResult.setNumberOfRows(indexes.length);
+ DimensionColumnDataChunk[][] dimensionColumnDataChunks =
+ new DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length];
+ MeasureColumnDataChunk[][] measureColumnDataChunks =
+ new MeasureColumnDataChunk[measureRawColumnChunks.length][indexesGroup.length];
+ for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+ for (int j = 0; j < indexesGroup.length; j++) {
+ if (dimensionRawColumnChunks[i] != null) {
+ dimensionColumnDataChunks[i][j] =
+ dimensionRawColumnChunks[i].convertToDimColDataChunk(j);
+ }
+ }
+ }
+ for (int i = 0; i < measureRawColumnChunks.length; i++) {
+ for (int j = 0; j < indexesGroup.length; j++) {
+ if (measureRawColumnChunks[i] != null) {
+ measureColumnDataChunks[i][j] =
+ measureRawColumnChunks[i].convertToMeasureColDataChunk(j);
+ }
+ }
+ }
+ scannedResult.setDimensionChunks(dimensionColumnDataChunks);
+ scannedResult.setIndexes(indexesGroup);
+ scannedResult.setMeasureChunks(measureColumnDataChunks);
+ scannedResult.setRawColumnChunks(dimensionRawColumnChunks);
+ scannedResult.setNumberOfRows(rowCount);
+ return scannedResult;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
index cda39f2..1373ed5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.core.scan.scanner.impl;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
import org.apache.carbondata.core.stats.QueryStatisticsModel;
@@ -31,8 +30,6 @@ public class NonFilterScanner extends AbstractBlockletScanner {
+  /**
+   * Creates a blocklet scanner for queries that have no filter expression.
+   *
+   * <p>In this revision the constructor no longer builds a {@code NonFilterQueryScannedResult};
+   * presumably the scanned result is now created elsewhere (e.g. by the base scanner per
+   * scan) — TODO confirm.
+   *
+   * @param blockExecutionInfo   execution info for the block, handed to the base scanner
+   * @param queryStatisticsModel statistics model, stored in the inherited field
+   */
public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
QueryStatisticsModel queryStatisticsModel) {
super(blockExecutionInfo);
- // as its a non filter query creating a non filter query scanned result object
- scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
super.queryStatisticsModel = queryStatisticsModel;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
new file mode 100644
index 0000000..07e3487
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+/**
+ * Maintains a group of bitsets, one slot per page.
+ * Each filter executor returns a BitSetGroup after filtering the data.
+ *
+ * <p>A {@code null} slot is treated as a page with no selected rows: {@link #isEmpty()} ignores
+ * it, {@link #or(BitSetGroup)} fills it from the other group, and {@link #and(BitSetGroup)}
+ * yields {@code null} for a page whenever either side is {@code null}.
+ */
+public class BitSetGroup {
+
+  private BitSet[] bitSets;
+
+  /**
+   * Creates a group whose {@code groupSize} slots are all initially {@code null}.
+   *
+   * @param groupSize number of pages in the group
+   */
+  public BitSetGroup(int groupSize) {
+    bitSets = new BitSet[groupSize];
+  }
+
+  /**
+   * Stores {@code bitSet} as the result for page {@code index}.
+   *
+   * @param bitSet bitset of selected rows, may be {@code null}
+   * @param index  page index
+   */
+  public void setBitSet(BitSet bitSet, int index) {
+    assert index < bitSets.length;
+    bitSets[index] = bitSet;
+  }
+
+  /**
+   * Returns the bitset for page {@code index}, or {@code null} if none has been set.
+   *
+   * @param index page index
+   * @return the stored bitset or {@code null}
+   */
+  public BitSet getBitSet(int index) {
+    assert index < bitSets.length;
+    return bitSets[index];
+  }
+
+  /**
+   * Returns whether no page has any bit set.
+   *
+   * @return {@code true} when every slot is {@code null} or an empty bitset
+   */
+  public boolean isEmpty() {
+    for (BitSet bitSet : bitSets) {
+      if (bitSet != null && !bitSet.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Intersects this group, page by page, with {@code group} in place.
+   *
+   * @param group group to intersect with; must have at least as many pages as this one
+   */
+  public void and(BitSetGroup group) {
+    for (int i = 0; i < bitSets.length; i++) {
+      BitSet otherSet = group.getBitSet(i);
+      if (bitSets[i] != null && otherSet != null) {
+        bitSets[i].and(otherSet);
+      } else {
+        // A null page selects no rows, and anything AND empty is empty. Leaving the page
+        // untouched here would keep rows the other side rejected.
+        bitSets[i] = null;
+      }
+    }
+  }
+
+  /**
+   * Unions this group, page by page, with {@code group} in place.
+   *
+   * @param group group to union with; must have at least as many pages as this one
+   */
+  public void or(BitSetGroup group) {
+    for (int i = 0; i < bitSets.length; i++) {
+      BitSet otherSet = group.getBitSet(i);
+      if (otherSet == null) {
+        continue;
+      }
+      if (bitSets[i] == null) {
+        // Copy instead of aliasing, so later in-place and/or calls on this group cannot
+        // modify the other group's bitset.
+        bitSets[i] = (BitSet) otherSet.clone();
+      } else {
+        bitSets[i].or(otherSet);
+      }
+    }
+  }
+
+  /**
+   * Returns the number of page slots in this group.
+   *
+   * @return the group size passed to the constructor
+   */
+  public int getNumberOfPages() {
+    return bitSets.length;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 11c8870..b9a96d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -45,9 +45,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
@@ -1233,6 +1233,18 @@ public final class CarbonUtil {
}, offset, length);
}
+  /**
+   * Deserializes a thrift {@code DataChunk2} from {@code length} bytes of
+   * {@code dataChunkBuffer}, starting at absolute position {@code offset}.
+   *
+   * <p>The bytes are copied through {@link ByteBuffer#duplicate()}, so the caller's buffer
+   * position is left unchanged.
+   *
+   * @param dataChunkBuffer buffer holding the serialized chunk metadata
+   * @param offset          absolute position of the first byte of the chunk
+   * @param length          number of bytes to deserialize
+   * @return the deserialized data chunk
+   * @throws IOException propagated from the thrift read
+   */
+  public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
+      throws IOException {
+    byte[] data = new byte[length];
+    // The duplicate shares content but has its own position/limit, so reading here does not
+    // move the position of a buffer the caller may still be using.
+    ByteBuffer view = dataChunkBuffer.duplicate();
+    view.position(offset);
+    view.get(data);
+    return (DataChunk2) read(data, new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new DataChunk2();
+      }
+    }, 0, length);
+  }
+
/**
* Below method will be used to convert the byte array value to thrift object for
* data chunk
@@ -1353,19 +1365,19 @@ public final class CarbonUtil {
return outputArray;
}
- public static void freeMemory(DimensionColumnDataChunk[] dimensionColumnDataChunk,
- MeasureColumnDataChunk[] measureColumnDataChunks) {
- if (null != measureColumnDataChunks) {
- for (int i = 0; i < measureColumnDataChunks.length; i++) {
- if (null != measureColumnDataChunks[i]) {
- measureColumnDataChunks[i].freeMemory();
+ public static void freeMemory(DimensionRawColumnChunk[] dimensionRawColumnChunks,
+ MeasureRawColumnChunk[] measureRawColumnChunks) {
+ if (null != measureRawColumnChunks) {
+ for (int i = 0; i < measureRawColumnChunks.length; i++) {
+ if (null != measureRawColumnChunks[i]) {
+ measureRawColumnChunks[i].freeMemory();
}
}
}
- if (null != dimensionColumnDataChunk) {
- for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
- if (null != dimensionColumnDataChunk[i]) {
- dimensionColumnDataChunk[i].freeMemory();
+ if (null != dimensionRawColumnChunks) {
+ for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+ if (null != dimensionRawColumnChunks[i]) {
+ dimensionRawColumnChunks[i].freeMemory();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 5b89096..4882b0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -98,8 +98,17 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
blockletInfoThrift.getColumn_data_chunks_offsets().size());
blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
- blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
- blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+
+ List<Integer> dimensionColumnChunkLengthInteger = new ArrayList<Integer>();
+ List<Integer> measureColumnChunkLengthInteger = new ArrayList<Integer>();
+ for (int i = 0; i < dimensionColumnChunkLength.size(); i++) {
+ dimensionColumnChunkLengthInteger.add(dimensionColumnChunkLength.get(i).intValue());
+ }
+ for (int i = 0; i < measureColumnChunksLength.size(); i++) {
+ measureColumnChunkLengthInteger.add(measureColumnChunksLength.get(i).intValue());
+ }
+ blockletInfo.setDimensionChunksLength(dimensionColumnChunkLengthInteger);
+ blockletInfo.setMeasureChunksLength(measureColumnChunkLengthInteger);
blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
return blockletInfo;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index bf19e08..5f69753 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -23,6 +23,7 @@ import java.math.RoundingMode;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -316,6 +317,87 @@ public final class DataTypeUtil {
}
+ /**
+ * Below method will be used to convert the data passed to its actual data
+ * type
+ *
+ * @param dataInBytes data
+ * @param actualDataType actual data type
+ * @return actual data after conversion
+ */
+ public static Object getDataBasedOnDataType(byte[] dataInBytes, DataType actualDataType) {
+ if (null == dataInBytes || Arrays
+ .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) {
+ return null;
+ }
+ try {
+ switch (actualDataType) {
+ case INT:
+ String data1 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data1.isEmpty()) {
+ return null;
+ }
+ return Integer.parseInt(data1);
+ case SHORT:
+ String data2 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data2.isEmpty()) {
+ return null;
+ }
+ return Short.parseShort(data2);
+ case DOUBLE:
+ String data3 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data3.isEmpty()) {
+ return null;
+ }
+ return Double.parseDouble(data3);
+ case LONG:
+ String data4 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data4.isEmpty()) {
+ return null;
+ }
+ return Long.parseLong(data4);
+ case DATE:
+ String data5 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data5.isEmpty()) {
+ return null;
+ }
+ try {
+ Date dateToStr = dateformatter.get().parse(data5);
+ return dateToStr.getTime() * 1000;
+ } catch (ParseException e) {
+ LOGGER.error("Cannot convert" + data5 + " to Time/Long type value" + e.getMessage());
+ return null;
+ }
+
+ case TIMESTAMP:
+ String data6 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data6.isEmpty()) {
+ return null;
+ }
+ try {
+ Date dateToStr = timeStampformatter.get().parse(data6);
+ return dateToStr.getTime() * 1000;
+ } catch (ParseException e) {
+ LOGGER.error("Cannot convert" + data6 + " to Time/Long type value" + e.getMessage());
+ return null;
+ }
+ case DECIMAL:
+ String data7 = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ if (data7.isEmpty()) {
+ return null;
+ }
+ java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data7);
+ return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+ default:
+ return UTF8String.fromBytes(dataInBytes);
+ }
+ } catch (NumberFormatException ex) {
+ String data = new String(dataInBytes, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+ LOGGER.error("Problem while converting data type" + data);
+ return null;
+ }
+ }
+
public static Object getMeasureDataBasedOnDataType(Object data, DataType dataType) {
if (null == data) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index f2d110f..1d00199 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -95,7 +95,7 @@ public class DataTypeUtilTest {
assertEquals(getDataBasedOnDataType("1", DataType.DECIMAL), expected);
assertEquals(getDataBasedOnDataType("default", DataType.NULL),
UTF8String.fromString("default"));
- assertEquals(getDataBasedOnDataType(null, DataType.NULL), null);
+ assertEquals(getDataBasedOnDataType((String) null, DataType.NULL), null);
}
@Test public void testGetMeasureDataBasedOnDataType() throws NumberFormatException {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
index 4a9cf81..07db1ba 100644
--- a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
+++ b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java
@@ -117,7 +117,7 @@ public class FilterScannerTest {
// DimensionChunkAttributes dimensionChunkAttributes = new DimensionChunkAttributes();
// DimensionColumnDataChunk dimensionColumnDataChunk =
// new FixedLengthDimensionDataChunk(new byte[] { 0, 1 }, dimensionChunkAttributes);
-// blocksChunkHolder.setDimensionDataChunk(new DimensionColumnDataChunk[]
+// blocksChunkHolder.setDimensionRawDataChunk(new DimensionColumnDataChunk[]
//
// { dimensionColumnDataChunk });
// MeasureColumnDataChunk measureColumnDataChunk = new MeasureColumnDataChunk();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 7333115..d7bab75 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -46,7 +46,7 @@ public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
}
if (dictionaries[i] != null) {
data[i] = DataTypeUtil
- .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
+ .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]),
dataTypes[i]);
if (data[i] == null) {
continue;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/72cb415a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 470ca1f..3da8299 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -182,34 +182,21 @@ case class CarbonDictionaryDecoder(
)
new Iterator[InternalRow] {
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
- var flag = true
- var total = 0L
-
override final def hasNext: Boolean = {
- flag = iter.hasNext
- if (!flag && total > 0) {
- val queryStatistic = new QueryStatistic()
- queryStatistic
- .addFixedTimeStatistic(QueryStatisticsConstants.PREPARE_RESULT, total)
- recorder.recordStatistics(queryStatistic)
- recorder.logStatistics()
- }
- flag
+ iter.hasNext
}
override final def next(): InternalRow = {
- val startTime = System.currentTimeMillis()
val row: InternalRow = iter.next()
val data = row.toSeq(dataTypes).toArray
dictIndex.foreach { index =>
if (data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
- .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+ .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
getDictionaryColumnIds(index)._3)
}
}
val result = unsafeProjection(new GenericMutableRow(data))
- total += System.currentTimeMillis() - startTime
result
}
}