You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:51 UTC
[33/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor
query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
index 553f85e..773fbd7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
@@ -17,20 +17,15 @@
package org.apache.carbondata.core.scan.result.iterator;
import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]> {
- private CarbonIterator<BatchResult> iterator;
- private BatchResult batch;
+ private CarbonIterator<RowBatch> iterator;
+ private RowBatch batch;
private int counter;
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName());
-
- public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> iterator) {
+ public PartitionSpliterRawResultIterator(CarbonIterator<RowBatch> iterator) {
this.iterator = iterator;
}
@@ -65,7 +60,7 @@ public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]>
* @param batch
* @return
*/
- private boolean checkBatchEnd(BatchResult batch) {
+ private boolean checkBatchEnd(RowBatch batch) {
return !(counter < batch.getSize());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 70d0958..1dd1595 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
/**
@@ -37,7 +37,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
/**
* Iterator of the Batch raw result.
*/
- private CarbonIterator<BatchResult> detailRawQueryResultIterator;
+ private CarbonIterator<RowBatch> detailRawQueryResultIterator;
/**
* Counter to maintain the row counter.
@@ -55,9 +55,9 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
/**
* batch of the result.
*/
- private BatchResult batch;
+ private RowBatch batch;
- public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterator,
+ public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
@@ -155,7 +155,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
* @param batch
* @return
*/
- private boolean checkIfBatchIsProcessedCompletely(BatchResult batch) {
+ private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
if (counter < batch.getSize()) {
return false;
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
index cc9710e..c7cb00d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -35,10 +35,12 @@ public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIt
super(infos, queryModel, execService);
}
- @Override public Object next() {
+ @Override
+ public Object next() {
throw new UnsupportedOperationException("call processNextBatch instead");
}
+ @Override
public void processNextBatch(CarbonColumnarBatch columnarBatch) {
synchronized (lock) {
updateDataBlockIterator();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index cfc2f16..973ce0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -87,7 +87,4 @@ public class CarbonColumnarBatch {
}
}
- public int getRowsFilteredCount() {
- return rowsFiltered;
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index a5f81b9..59117dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -18,16 +18,16 @@ package org.apache.carbondata.core.scan.result.vector;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
public int offset;
public int size;
public CarbonColumnVector vector;
public int vectorOffset;
- public QueryDimension dimension;
- public QueryMeasure measure;
+ public ProjectionDimension dimension;
+ public ProjectionMeasure measure;
public int ordinal;
public DirectDictionaryGenerator directDictionaryGenerator;
public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
index db4c982..8902dfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -29,7 +29,7 @@ public class MeasureDataVectorProcessor {
void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info);
- void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+ void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
ColumnVectorInfo info);
}
@@ -60,7 +60,7 @@ public class MeasureDataVectorProcessor {
}
@Override
- public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+ public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
ColumnVectorInfo info) {
int offset = info.offset;
int len = offset + info.size;
@@ -69,13 +69,13 @@ public class MeasureDataVectorProcessor {
BitSet nullBitSet = dataChunk.getNullBits();
if (nullBitSet.isEmpty()) {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
vector.putInt(vectorOffset, (int)dataChunk.getLong(currentRow));
vectorOffset++;
}
} else {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
if (nullBitSet.get(currentRow)) {
vector.putNull(vectorOffset);
} else {
@@ -117,7 +117,7 @@ public class MeasureDataVectorProcessor {
}
@Override
- public void fillMeasureVectorForFilter(int[] rowMapping,
+ public void fillMeasureVector(int[] filteredRowId,
ColumnPage dataChunk, ColumnVectorInfo info) {
int offset = info.offset;
int len = offset + info.size;
@@ -126,13 +126,13 @@ public class MeasureDataVectorProcessor {
BitSet nullBitSet = dataChunk.getNullBits();
if (nullBitSet.isEmpty()) {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
vector.putBoolean(vectorOffset, dataChunk.getBoolean(currentRow));
vectorOffset++;
}
} else {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
if (nullBitSet.get(currentRow)) {
vector.putNull(vectorOffset);
} else {
@@ -171,7 +171,7 @@ public class MeasureDataVectorProcessor {
}
@Override
- public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+ public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
ColumnVectorInfo info) {
int offset = info.offset;
int len = offset + info.size;
@@ -180,13 +180,13 @@ public class MeasureDataVectorProcessor {
BitSet nullBitSet = dataChunk.getNullBits();
if (nullBitSet.isEmpty()) {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
vector.putShort(vectorOffset, (short) dataChunk.getLong(currentRow));
vectorOffset++;
}
} else {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
if (nullBitSet.get(currentRow)) {
vector.putNull(vectorOffset);
} else {
@@ -225,7 +225,7 @@ public class MeasureDataVectorProcessor {
}
@Override
- public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+ public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
ColumnVectorInfo info) {
int offset = info.offset;
int len = offset + info.size;
@@ -234,13 +234,13 @@ public class MeasureDataVectorProcessor {
BitSet nullBitSet = dataChunk.getNullBits();
if (nullBitSet.isEmpty()) {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
vector.putLong(vectorOffset, dataChunk.getLong(currentRow));
vectorOffset++;
}
} else {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
if (nullBitSet.get(currentRow)) {
vector.putNull(vectorOffset);
} else {
@@ -279,7 +279,7 @@ public class MeasureDataVectorProcessor {
}
@Override
- public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+ public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
ColumnVectorInfo info) {
int offset = info.offset;
int len = offset + info.size;
@@ -288,7 +288,7 @@ public class MeasureDataVectorProcessor {
int precision = info.measure.getMeasure().getPrecision();
BitSet nullBitSet = dataChunk.getNullBits();
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
if (nullBitSet.get(currentRow)) {
vector.putNull(vectorOffset);
} else {
@@ -330,7 +330,7 @@ public class MeasureDataVectorProcessor {
}
@Override
- public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk,
+ public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk,
ColumnVectorInfo info) {
int offset = info.offset;
int len = offset + info.size;
@@ -339,13 +339,13 @@ public class MeasureDataVectorProcessor {
BitSet nullBitSet = dataChunk.getNullBits();
if (nullBitSet.isEmpty()) {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
vector.putDouble(vectorOffset, dataChunk.getDouble(currentRow));
vectorOffset++;
}
} else {
for (int i = offset; i < len; i++) {
- int currentRow = rowMapping[i];
+ int currentRow = filteredRowId[i];
if (nullBitSet.get(currentRow)) {
vector.putNull(vectorOffset);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
deleted file mode 100644
index bf26ca3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.scanner;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-
-/**
- * Blocklet scanner class to process the block
- */
-public abstract class AbstractBlockletScanner implements BlockletScanner {
-
- /**
- * block execution info
- */
- protected BlockExecutionInfo blockExecutionInfo;
-
- public QueryStatisticsModel queryStatisticsModel;
-
- private AbstractScannedResult emptyResult;
-
- public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
- this.blockExecutionInfo = tableBlockExecutionInfos;
- }
-
- @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
- throws IOException, FilterUnsupportedException {
- long startTime = System.currentTimeMillis();
- AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
- QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
- totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
- totalBlockletStatistic.getCount() + 1);
- QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
- validScannedBlockletStatistic
- .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
- validScannedBlockletStatistic.getCount() + 1);
- // adding statistics for valid number of pages
- QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
- validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
- validPages.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
- // adding statistics for number of pages
- QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
- totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
- totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
- scannedResult.setBlockletId(
- blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
- .getDataBlock().blockletId());
- if (!blockExecutionInfo.isPrefetchBlocklet()) {
- readBlocklet(blocksChunkHolder);
- }
- DimensionRawColumnChunk[] dimensionRawColumnChunks =
- blocksChunkHolder.getDimensionRawDataChunk();
- DimensionColumnDataChunk[][] dimensionColumnDataChunks =
- new DimensionColumnDataChunk[dimensionRawColumnChunks.length][blocksChunkHolder
- .getDataBlock().numberOfPages()];
- MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getMeasureRawDataChunk();
- ColumnPage[][] columnPages =
- new ColumnPage[measureRawColumnChunks.length][blocksChunkHolder.getDataBlock()
- .numberOfPages()];
- scannedResult.setDimensionChunks(dimensionColumnDataChunks);
- scannedResult.setMeasureChunks(columnPages);
- scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
- scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
- if (blockExecutionInfo.isPrefetchBlocklet()) {
- for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
- if (dimensionRawColumnChunks[i] != null) {
- dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks();
- }
- }
- for (int i = 0; i < measureRawColumnChunks.length; i++) {
- if (measureRawColumnChunks[i] != null) {
- columnPages[i] = measureRawColumnChunks[i].convertToColumnPage();
- }
- }
- }
- int[] numberOfRows = null;
- if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) {
- for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
- if (dimensionRawColumnChunks[i] != null) {
- numberOfRows = dimensionRawColumnChunks[i].getRowCount();
- break;
- }
- }
- } else if (blockExecutionInfo.getAllSelectedMeasureBlocksIndexes().length > 0) {
- for (int i = 0; i < measureRawColumnChunks.length; i++) {
- if (measureRawColumnChunks[i] != null) {
- numberOfRows = measureRawColumnChunks[i].getRowCount();
- break;
- }
- }
- }
-
- // count(*) case there would not be any dimensions are measures selected.
- if (numberOfRows == null) {
- numberOfRows = new int[blocksChunkHolder.getDataBlock().numberOfPages()];
- for (int i = 0; i < numberOfRows.length; i++) {
- numberOfRows[i] =
- CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
- }
- int lastPageSize = blocksChunkHolder.getDataBlock().nodeSize()
- % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
- ;
- if (lastPageSize > 0) {
- numberOfRows[numberOfRows.length - 1] = lastPageSize;
- }
- }
- scannedResult.setNumberOfRows(numberOfRows);
- if (!blockExecutionInfo.isPrefetchBlocklet()) {
- scannedResult.fillDataChunks();
- }
- // adding statistics for carbon scan time
- QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
- scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
- scanTime.getCount() + (System.currentTimeMillis() - startTime));
- return scannedResult;
- }
-
- @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
- long startTime = System.currentTimeMillis();
- DimensionRawColumnChunk[] dimensionRawColumnChunks = blocksChunkHolder.getDataBlock()
- .getDimensionChunks(blocksChunkHolder.getFileReader(),
- blockExecutionInfo.getAllSelectedDimensionBlocksIndexes());
- blocksChunkHolder.setDimensionRawDataChunk(dimensionRawColumnChunks);
- MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getDataBlock()
- .getMeasureChunks(blocksChunkHolder.getFileReader(),
- blockExecutionInfo.getAllSelectedMeasureBlocksIndexes());
- blocksChunkHolder.setMeasureRawDataChunk(measureRawColumnChunks);
- // adding statistics for carbon read time
- QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
- readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
- readTime.getCount() + (System.currentTimeMillis() - startTime));
- }
-
- @Override public AbstractScannedResult createEmptyResult() {
- if (emptyResult == null) {
- emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
- emptyResult.setNumberOfRows(new int[0]);
- emptyResult.setIndexes(new int[0][]);
- }
- return emptyResult;
- }
-
- @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
- // For non filter it is always true
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
index 0ed0d43..0a41032 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -18,9 +18,10 @@ package org.apache.carbondata.core.scan.scanner;
import java.io.IOException;
+import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
/**
* Interface for processing the block
@@ -30,31 +31,26 @@ public interface BlockletScanner {
/**
* Checks whether this blocklet required to scan or not based on min max of each blocklet.
- * @param blocksChunkHolder
+ * @param dataBlock
* @return
* @throws IOException
*/
- boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException;
+ boolean isScanRequired(DataRefNode dataBlock);
/**
* Below method will used to process the block data and get the scanned result
*
- * @param blocksChunkHolder block chunk which holds the block data
+ * @param rawBlockletColumnChunks block chunk which holds the block data
* @return scannerResult
* result after processing
*/
- AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+ BlockletScannedResult scanBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks)
throws IOException, FilterUnsupportedException;
/**
* Just reads the blocklet from file, does not uncompress it.
- * @param blocksChunkHolder
+ * @param rawBlockletColumnChunks
*/
- void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException;
+ void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException;
- /**
- * In case if there is no filter satisfies.
- * @return AbstractScannedResult
- */
- AbstractScannedResult createEmptyResult();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
new file mode 100644
index 0000000..1c73d63
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.BitSetGroup;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * This class is used for filter query processing.
+ * It first applies the filter, then reads the column pages if
+ * required, and returns the scanned result.
+ */
+public class BlockletFilterScanner extends BlockletFullScanner {
+
+ /**
+ * filter executer to evaluate filter condition
+ */
+ private FilterExecuter filterExecuter;
+ /**
+ * Flag used to decide whether the min/max check is applied.
+ * This is useful for dimension columns which are on the right side,
+ * as the node finder always gives tentative blocks. If the column data is stored
+ * individually and is in sorted order, we can check whether the filter falls in the
+ * min/max range; only if it does do we apply the filter on the complete data.
+ * This is very useful in case of sparse data, when rows are
+ * repeating.
+ */
+ private boolean isMinMaxEnabled;
+
+ private QueryStatisticsModel queryStatisticsModel;
+
+ private boolean useBitSetPipeLine;
+
+ public BlockletFilterScanner(BlockExecutionInfo blockExecutionInfo,
+ QueryStatisticsModel queryStatisticsModel) {
+ super(blockExecutionInfo, queryStatisticsModel);
+ // to check whether min max is enabled or not
+ String minMaxEnableValue = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
+ CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
+ if (null != minMaxEnableValue) {
+ isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
+ }
+ // get the filter tree
+ this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
+ this.queryStatisticsModel = queryStatisticsModel;
+
+ String useBitSetPipeLine = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.BITSET_PIPE_LINE,
+ CarbonCommonConstants.BITSET_PIPE_LINE_DEFAULT);
+ if (null != useBitSetPipeLine) {
+ this.useBitSetPipeLine = Boolean.parseBoolean(useBitSetPipeLine);
+ }
+ }
+
+ /**
+ * Below method will be used to process the block
+ *
+ * @param rawBlockletColumnChunks block chunk holder which holds the data
+ * @throws FilterUnsupportedException
+ */
+ @Override
+ public BlockletScannedResult scanBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks)
+ throws IOException, FilterUnsupportedException {
+ return executeFilter(rawBlockletColumnChunks);
+ }
+
+ @Override
+ public boolean isScanRequired(DataRefNode dataBlock) {
+ // adding statistics for number of pages
+ QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
+ totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
+ totalPagesScanned.getCount() + dataBlock.numberOfPages());
+ // apply min max
+ if (isMinMaxEnabled) {
+ BitSet bitSet = null;
+ // check for implicit include filter instance
+ if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
+ String blockletId = blockExecutionInfo.getBlockIdString() +
+ CarbonCommonConstants.FILE_SEPARATOR + dataBlock.blockletIndex();
+ bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
+ .isFilterValuesPresentInBlockOrBlocklet(
+ dataBlock.getColumnsMaxValue(),
+ dataBlock.getColumnsMinValue(), blockletId);
+ } else {
+ bitSet = this.filterExecuter
+ .isScanRequired(dataBlock.getColumnsMaxValue(),
+ dataBlock.getColumnsMinValue());
+ }
+ return !bitSet.isEmpty();
+ }
+ return true;
+ }
+
+ @Override
+ public void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException {
+ long startTime = System.currentTimeMillis();
+ this.filterExecuter.readColumnChunks(rawBlockletColumnChunks);
+ // adding statistics for carbon read time
+ QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
+ readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
+ readTime.getCount() + (System.currentTimeMillis() - startTime));
+ }
+
+ /**
+ * This method will process the data in the below order:
+ * 1. First apply min max on the filter tree and check whether any of the filters
+ * falls in the range of min max; if not, then return an empty result
+ * 2. If the filter falls in the min max range, then apply the filter on the actual
+ * data and get the filtered row indexes
+ * 3. If the row indexes are empty, then return the empty result
+ * 4. If the row indexes are not empty, then read only those blocks (measure or dimension)
+ * which were present in the query but not present in the filter; while applying the filter
+ * some of the blocks were already read and are present in the chunk holder, so there is no
+ * need to read those blocks again. This avoids re-reading blocks which were already read
+ * 5. Set the blocks and filter indexes to the result
+ *
+ * @param rawBlockletColumnChunks
+ * @throws FilterUnsupportedException
+ */
+ private BlockletScannedResult executeFilter(RawBlockletColumnChunks rawBlockletColumnChunks)
+ throws FilterUnsupportedException, IOException {
+ long startTime = System.currentTimeMillis();
+ QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+ totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
+ totalBlockletStatistic.getCount() + 1);
+ // apply filter on actual data, for each page
+ BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks,
+ useBitSetPipeLine);
+ // if filter result is empty then return with empty result
+ if (bitSetGroup.isEmpty()) {
+ CarbonUtil.freeMemory(rawBlockletColumnChunks.getDimensionRawColumnChunks(),
+ rawBlockletColumnChunks.getMeasureRawColumnChunks());
+
+ QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+ scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+ scanTime.getCount() + (System.currentTimeMillis() - startTime));
+
+ QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.PAGE_SCANNED);
+ scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
+ scannedPages.getCount() + bitSetGroup.getScannedPages());
+ return createEmptyResult();
+ }
+
+ BlockletScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+ scannedResult.setBlockletId(
+ blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
+ rawBlockletColumnChunks.getDataBlock().blockletIndex());
+ // valid scanned blocklet
+ QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+ validScannedBlockletStatistic
+ .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+ validScannedBlockletStatistic.getCount() + 1);
+ // adding statistics for valid number of pages
+ QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
+ validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
+ validPages.getCount() + bitSetGroup.getValidPages());
+ QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.PAGE_SCANNED);
+ scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
+ scannedPages.getCount() + bitSetGroup.getScannedPages());
+ int[] pageFilteredRowCount = new int[bitSetGroup.getNumberOfPages()];
+ // get the row indexes from bit set for each page
+ int[][] pageFilteredRowId = new int[bitSetGroup.getNumberOfPages()][];
+ int numPages = pageFilteredRowId.length;
+ for (int pageId = 0; pageId < numPages; pageId++) {
+ BitSet bitSet = bitSetGroup.getBitSet(pageId);
+ if (bitSet != null && !bitSet.isEmpty()) {
+ int[] matchedRowId = new int[bitSet.cardinality()];
+ int index = 0;
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+ matchedRowId[index++] = i;
+ }
+ pageFilteredRowCount[pageId] = matchedRowId.length;
+ pageFilteredRowId[pageId] = matchedRowId;
+ }
+ }
+
+ long dimensionReadTime = System.currentTimeMillis();
+ dimensionReadTime = System.currentTimeMillis() - dimensionReadTime;
+
+ FileReader fileReader = rawBlockletColumnChunks.getFileReader();
+
+
+ DimensionRawColumnChunk[] dimensionRawColumnChunks =
+ new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionToRead()];
+ int numDimensionChunks = dimensionRawColumnChunks.length;
+ // read dimension chunk blocks from file which is not present
+ for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) {
+ dimensionRawColumnChunks[chunkIndex] =
+ rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex];
+ }
+ int[][] allSelectedDimensionColumnIndexRange =
+ blockExecutionInfo.getAllSelectedDimensionColumnIndexRange();
+ DimensionRawColumnChunk[] projectionListDimensionChunk = rawBlockletColumnChunks.getDataBlock()
+ .readDimensionChunks(fileReader, allSelectedDimensionColumnIndexRange);
+ for (int[] columnIndexRange : allSelectedDimensionColumnIndexRange) {
+ System.arraycopy(projectionListDimensionChunk, columnIndexRange[0],
+ dimensionRawColumnChunks, columnIndexRange[0],
+ columnIndexRange[1] + 1 - columnIndexRange[0]);
+ }
+
+ /*
+ * in case projection if the projected dimension are not loaded in the dimensionColumnDataChunk
+ * then loading them
+ */
+ int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
+ for (int projectionListDimensionIndex : projectionListDimensionIndexes) {
+ if (null == dimensionRawColumnChunks[projectionListDimensionIndex]) {
+ dimensionRawColumnChunks[projectionListDimensionIndex] =
+ rawBlockletColumnChunks.getDataBlock().readDimensionChunk(
+ fileReader, projectionListDimensionIndex);
+ }
+ }
+
+ DimensionColumnPage[][] dimensionColumnPages =
+ new DimensionColumnPage[numDimensionChunks][numPages];
+ for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) {
+ if (dimensionRawColumnChunks[chunkIndex] != null) {
+ for (int pageId = 0; pageId < numPages; pageId++) {
+ dimensionColumnPages[chunkIndex][pageId] =
+ dimensionRawColumnChunks[chunkIndex].decodeColumnPage(pageId);
+ }
+ }
+ }
+
+
+ MeasureRawColumnChunk[] measureRawColumnChunks =
+ new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureToRead()];
+ int numMeasureChunks = measureRawColumnChunks.length;
+
+ // read the measure chunk blocks which is not present
+ for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) {
+ if (null != rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) {
+ measureRawColumnChunks[chunkIndex] =
+ rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex];
+ }
+ }
+
+ int[][] allSelectedMeasureColumnIndexRange =
+ blockExecutionInfo.getAllSelectedMeasureIndexRange();
+ MeasureRawColumnChunk[] projectionListMeasureChunk = rawBlockletColumnChunks.getDataBlock()
+ .readMeasureChunks(fileReader, allSelectedMeasureColumnIndexRange);
+ for (int[] columnIndexRange : allSelectedMeasureColumnIndexRange) {
+ System.arraycopy(projectionListMeasureChunk, columnIndexRange[0], measureRawColumnChunks,
+ columnIndexRange[0], columnIndexRange[1] + 1 - columnIndexRange[0]);
+ }
+ /*
+ * in case projection if the projected measure are not loaded in the ColumnPage
+ * then loading them
+ */
+ int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
+ for (int projectionListMeasureIndex : projectionListMeasureIndexes) {
+ if (null == measureRawColumnChunks[projectionListMeasureIndex]) {
+ measureRawColumnChunks[projectionListMeasureIndex] = rawBlockletColumnChunks.getDataBlock()
+ .readMeasureChunk(fileReader, projectionListMeasureIndex);
+ }
+ }
+ ColumnPage[][] measureColumnPages = new ColumnPage[numMeasureChunks][numPages];
+ for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) {
+ if (measureRawColumnChunks[chunkIndex] != null) {
+ for (int pageId = 0; pageId < numPages; pageId++) {
+ measureColumnPages[chunkIndex][pageId] =
+ measureRawColumnChunks[chunkIndex].decodeColumnPage(pageId);
+ }
+ }
+ }
+
+ scannedResult.setDimensionColumnPages(dimensionColumnPages);
+ scannedResult.setPageFilteredRowId(pageFilteredRowId);
+ scannedResult.setMeasureColumnPages(measureColumnPages);
+ scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
+ scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
+ scannedResult.setPageFilteredRowCount(pageFilteredRowCount);
+ // adding statistics for carbon scan time
+ QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+ scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+ scanTime.getCount() + (System.currentTimeMillis() - startTime - dimensionReadTime));
+ QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+ .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
+ readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
+ readTime.getCount() + dimensionReadTime);
+ return scannedResult;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
new file mode 100644
index 0000000..f0211dc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+
+/**
+ * Blocklet scanner to do full scan of a blocklet,
+ * returning all projection and filter column chunks
+ */
+public class BlockletFullScanner implements BlockletScanner {
+
+  /**
+   * block execution info
+   */
+  protected BlockExecutionInfo blockExecutionInfo;
+
+  /**
+   * query statistics which are updated while reading and scanning a blocklet
+   */
+  private final QueryStatisticsModel queryStatisticsModel;
+
+  /**
+   * empty result, created on first call of createEmptyResult and reused afterwards
+   */
+  private BlockletScannedResult emptyResult;
+
+  public BlockletFullScanner(BlockExecutionInfo tableBlockExecutionInfos,
+      QueryStatisticsModel queryStatisticsModel) {
+    this.blockExecutionInfo = tableBlockExecutionInfos;
+    this.queryStatisticsModel = queryStatisticsModel;
+  }
+
+  /**
+   * Scans the complete blocklet without applying any filter.
+   * If prefetch is not enabled in the block execution info, the column chunks are read
+   * here through readBlocklet and the result fills its data chunks itself. If prefetch is
+   * enabled, the raw chunks found in rawBlockletColumnChunks are decoded here.
+   * Blocklet, page and scan time statistics are updated as part of the scan.
+   *
+   * @param rawBlockletColumnChunks holder of the raw column chunks, data block and file reader
+   *                                of the blocklet to scan
+   * @return result holding the column pages and the number of rows of each page
+   * @throws IOException propagated from reading the column chunks
+   * @throws FilterUnsupportedException declared by the scanner contract
+   */
+  @Override
+  public BlockletScannedResult scanBlocklet(
+      RawBlockletColumnChunks rawBlockletColumnChunks)
+      throws IOException, FilterUnsupportedException {
+    long startTime = System.currentTimeMillis();
+    int numberOfPages = rawBlockletColumnChunks.getDataBlock().numberOfPages();
+    BlockletScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+    QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
+        totalBlockletStatistic.getCount() + 1);
+    QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    validScannedBlockletStatistic
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
+    // adding statistics for valid number of pages
+    QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
+    validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
+        validPages.getCount() + numberOfPages);
+    // adding statistics for number of pages
+    QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
+    totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
+        totalPagesScanned.getCount() + numberOfPages);
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
+            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    // without prefetch the chunks are not read yet, so read them now
+    if (!blockExecutionInfo.isPrefetchBlocklet()) {
+      readBlocklet(rawBlockletColumnChunks);
+    }
+    DimensionRawColumnChunk[] dimensionRawColumnChunks =
+        rawBlockletColumnChunks.getDimensionRawColumnChunks();
+    DimensionColumnPage[][] dimensionColumnDataChunks =
+        new DimensionColumnPage[dimensionRawColumnChunks.length][numberOfPages];
+    MeasureRawColumnChunk[] measureRawColumnChunks =
+        rawBlockletColumnChunks.getMeasureRawColumnChunks();
+    ColumnPage[][] measureColumnPages =
+        new ColumnPage[measureRawColumnChunks.length][numberOfPages];
+    scannedResult.setDimensionColumnPages(dimensionColumnDataChunks);
+    scannedResult.setMeasureColumnPages(measureColumnPages);
+    scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
+    scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
+    // with prefetch all the pages of the available chunks are decoded here
+    if (blockExecutionInfo.isPrefetchBlocklet()) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].decodeAllColumnPages();
+        }
+      }
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (measureRawColumnChunks[i] != null) {
+          measureColumnPages[i] = measureRawColumnChunks[i].decodeAllColumnPages();
+        }
+      }
+    }
+    // take the row count of each page from the first available chunk
+    int[] numberOfRows = null;
+    if (blockExecutionInfo.getAllSelectedDimensionColumnIndexRange().length > 0) {
+      for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
+        if (dimensionRawColumnChunks[i] != null) {
+          numberOfRows = dimensionRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    } else if (blockExecutionInfo.getAllSelectedMeasureIndexRange().length > 0) {
+      for (int i = 0; i < measureRawColumnChunks.length; i++) {
+        if (measureRawColumnChunks[i] != null) {
+          numberOfRows = measureRawColumnChunks[i].getRowCount();
+          break;
+        }
+      }
+    }
+
+    // count(*) case there would not be any dimensions or measures selected.
+    if (numberOfRows == null) {
+      // every page gets the default page size, only the last page can be smaller
+      numberOfRows = new int[numberOfPages];
+      for (int i = 0; i < numberOfRows.length; i++) {
+        numberOfRows[i] =
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+      }
+      int lastPageSize = rawBlockletColumnChunks.getDataBlock().numRows()
+          % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+      if (lastPageSize > 0) {
+        numberOfRows[numberOfRows.length - 1] = lastPageSize;
+      }
+    }
+    scannedResult.setPageFilteredRowCount(numberOfRows);
+    if (!blockExecutionInfo.isPrefetchBlocklet()) {
+      scannedResult.fillDataChunks();
+    }
+    // adding statistics for carbon scan time
+    QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
+    scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
+        scanTime.getCount() + (System.currentTimeMillis() - startTime));
+    return scannedResult;
+  }
+
+  /**
+   * Reads the dimension and measure chunks of all the selected column index ranges,
+   * sets them in rawBlockletColumnChunks and adds the elapsed time to the
+   * READ_BLOCKlET_TIME statistic.
+   *
+   * @param rawBlockletColumnChunks holder which gives the data block and file reader and
+   *                                receives the chunks read
+   * @throws IOException propagated from reading the column chunks
+   */
+  @Override
+  public void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks)
+      throws IOException {
+    long startTime = System.currentTimeMillis();
+    DimensionRawColumnChunk[] dimensionRawColumnChunks = rawBlockletColumnChunks.getDataBlock()
+        .readDimensionChunks(rawBlockletColumnChunks.getFileReader(),
+            blockExecutionInfo.getAllSelectedDimensionColumnIndexRange());
+    rawBlockletColumnChunks.setDimensionRawColumnChunks(dimensionRawColumnChunks);
+    MeasureRawColumnChunk[] measureRawColumnChunks = rawBlockletColumnChunks.getDataBlock()
+        .readMeasureChunks(rawBlockletColumnChunks.getFileReader(),
+            blockExecutionInfo.getAllSelectedMeasureIndexRange());
+    rawBlockletColumnChunks.setMeasureRawColumnChunks(measureRawColumnChunks);
+    // adding statistics for carbon read time
+    QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
+    readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
+        readTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
+  /**
+   * Returns the result to use when no row is selected. The instance is created on the
+   * first call with zero pages and the same instance is returned on every later call.
+   *
+   * @return empty scanned result
+   */
+  BlockletScannedResult createEmptyResult() {
+    if (emptyResult == null) {
+      emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+      emptyResult.setPageFilteredRowCount(new int[0]);
+      emptyResult.setPageFilteredRowId(new int[0][]);
+    }
+    return emptyResult;
+  }
+
+  @Override
+  public boolean isScanRequired(DataRefNode dataBlock) {
+    // For non filter it is always true
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
deleted file mode 100644
index e77093b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.scan.scanner.impl;
-
-import java.io.IOException;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult;
-import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-import org.apache.carbondata.core.util.BitSetGroup;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Below class will be used for filter query processing
- * this class will be first apply the filter then it will read the block if
- * required and return the scanned result
- */
-public class FilterScanner extends AbstractBlockletScanner {
-
- /**
- * filter tree
- */
- private FilterExecuter filterExecuter;
- /**
- * this will be used to apply min max
- * this will be useful for dimension column which is on the right side
- * as node finder will always give tentative blocks, if column data stored individually
- * and data is in sorted order then we can check whether filter is in the range of min max or not
- * if it present then only we can apply filter on complete data.
- * this will be very useful in case of sparse data when rows are
- * repeating.
- */
- private boolean isMinMaxEnabled;
-
- private QueryStatisticsModel queryStatisticsModel;
-
- private boolean useBitSetPipeLine;
-
- public FilterScanner(BlockExecutionInfo blockExecutionInfo,
- QueryStatisticsModel queryStatisticsModel) {
- super(blockExecutionInfo);
- // to check whether min max is enabled or not
- String minMaxEnableValue = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
- CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
- if (null != minMaxEnableValue) {
- isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
- }
- // get the filter tree
- this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
- this.queryStatisticsModel = queryStatisticsModel;
-
- String useBitSetPipeLine = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.BITSET_PIPE_LINE,
- CarbonCommonConstants.BITSET_PIPE_LINE_DEFAULT);
- if (null != useBitSetPipeLine) {
- this.useBitSetPipeLine = Boolean.parseBoolean(useBitSetPipeLine);
- }
- }
-
- /**
- * Below method will be used to process the block
- *
- * @param blocksChunkHolder block chunk holder which holds the data
- * @throws FilterUnsupportedException
- */
- @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
- throws IOException, FilterUnsupportedException {
- return fillScannedResult(blocksChunkHolder);
- }
-
- @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException {
- // adding statistics for number of pages
- QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
- totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
- totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages());
- // apply min max
- if (isMinMaxEnabled) {
- BitSet bitSet = null;
- // check for implicit include filter instance
- if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
- String blockletId = blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR
- + blocksChunkHolder.getDataBlock().blockletId();
- bitSet = ((ImplicitColumnFilterExecutor) filterExecuter)
- .isFilterValuesPresentInBlockOrBlocklet(
- blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
- blocksChunkHolder.getDataBlock().getColumnsMinValue(), blockletId);
- } else {
- bitSet = this.filterExecuter
- .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
- blocksChunkHolder.getDataBlock().getColumnsMinValue());
- }
- if (bitSet.isEmpty()) {
- CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
- blocksChunkHolder.getMeasureRawDataChunk());
- return false;
- }
- }
- return true;
- }
-
- @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException {
- long startTime = System.currentTimeMillis();
- this.filterExecuter.readBlocks(blocksChunkHolder);
- // adding statistics for carbon read time
- QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
- readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
- readTime.getCount() + (System.currentTimeMillis() - startTime));
- }
-
- /**
- * This method will process the data in below order
- * 1. first apply min max on the filter tree and check whether any of the filter
- * is fall on the range of min max, if not then return empty result
- * 2. If filter falls on min max range then apply filter on actual
- * data and get the filtered row index
- * 3. if row index is empty then return the empty result
- * 4. if row indexes is not empty then read only those blocks(measure or dimension)
- * which was present in the query but not present in the filter, as while applying filter
- * some of the blocks where already read and present in chunk holder so not need to
- * read those blocks again, this is to avoid reading of same blocks which was already read
- * 5. Set the blocks and filter indexes to result
- *
- * @param blocksChunkHolder
- * @throws FilterUnsupportedException
- */
- private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHolder)
- throws FilterUnsupportedException, IOException {
- long startTime = System.currentTimeMillis();
- QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
- totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
- totalBlockletStatistic.getCount() + 1);
- // apply filter on actual data
- BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder, useBitSetPipeLine);
- // if indexes is empty then return with empty result
- if (bitSetGroup.isEmpty()) {
- CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(),
- blocksChunkHolder.getMeasureRawDataChunk());
-
- QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
- scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
- scanTime.getCount() + (System.currentTimeMillis() - startTime));
-
- QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.PAGE_SCANNED);
- scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
- scannedPages.getCount() + bitSetGroup.getScannedPages());
- return createEmptyResult();
- }
-
- AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
- scannedResult.setBlockletId(
- blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder
- .getDataBlock().blockletId());
- // valid scanned blocklet
- QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
- validScannedBlockletStatistic
- .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
- validScannedBlockletStatistic.getCount() + 1);
- // adding statistics for valid number of pages
- QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.VALID_PAGE_SCANNED);
- validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED,
- validPages.getCount() + bitSetGroup.getValidPages());
- QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.PAGE_SCANNED);
- scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED,
- scannedPages.getCount() + bitSetGroup.getScannedPages());
- int[] rowCount = new int[bitSetGroup.getNumberOfPages()];
- // get the row indexes from bot set
- int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][];
- for (int k = 0; k < indexesGroup.length; k++) {
- BitSet bitSet = bitSetGroup.getBitSet(k);
- if (bitSet != null && !bitSet.isEmpty()) {
- int[] indexes = new int[bitSet.cardinality()];
- int index = 0;
- for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
- indexes[index++] = i;
- }
- rowCount[k] = indexes.length;
- indexesGroup[k] = indexes;
- }
- }
- FileHolder fileReader = blocksChunkHolder.getFileReader();
- int[][] allSelectedDimensionBlocksIndexes =
- blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
- long dimensionReadTime = System.currentTimeMillis();
- DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
- .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
- dimensionReadTime = System.currentTimeMillis() - dimensionReadTime;
-
- DimensionRawColumnChunk[] dimensionRawColumnChunks =
- new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
- // read dimension chunk blocks from file which is not present
- for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
- if (null != blocksChunkHolder.getDimensionRawDataChunk()[i]) {
- dimensionRawColumnChunks[i] = blocksChunkHolder.getDimensionRawDataChunk()[i];
- }
- }
- for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
- for (int j = allSelectedDimensionBlocksIndexes[i][0];
- j <= allSelectedDimensionBlocksIndexes[i][1]; j++) {
- dimensionRawColumnChunks[j] = projectionListDimensionChunk[j];
- }
- }
- long dimensionReadTime1 = System.currentTimeMillis();
- /**
- * in case projection if the projected dimension are not loaded in the dimensionColumnDataChunk
- * then loading them
- */
- int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes();
- int projectionListDimensionIndexesLength = projectionListDimensionIndexes.length;
- for (int i = 0; i < projectionListDimensionIndexesLength; i++) {
- if (null == dimensionRawColumnChunks[projectionListDimensionIndexes[i]]) {
- dimensionRawColumnChunks[projectionListDimensionIndexes[i]] =
- blocksChunkHolder.getDataBlock()
- .getDimensionChunk(fileReader, projectionListDimensionIndexes[i]);
- }
- }
- dimensionReadTime += (System.currentTimeMillis() - dimensionReadTime1);
- dimensionReadTime1 = System.currentTimeMillis();
- MeasureRawColumnChunk[] measureRawColumnChunks =
- new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
- int[][] allSelectedMeasureBlocksIndexes =
- blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
- MeasureRawColumnChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
- .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
- dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1;
- // read the measure chunk blocks which is not present
- for (int i = 0; i < measureRawColumnChunks.length; i++) {
- if (null != blocksChunkHolder.getMeasureRawDataChunk()[i]) {
- measureRawColumnChunks[i] = blocksChunkHolder.getMeasureRawDataChunk()[i];
- }
- }
- for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
- for (int j = allSelectedMeasureBlocksIndexes[i][0];
- j <= allSelectedMeasureBlocksIndexes[i][1]; j++) {
- measureRawColumnChunks[j] = projectionListMeasureChunk[j];
- }
- }
- dimensionReadTime1 = System.currentTimeMillis();
- /**
- * in case projection if the projected measure are not loaded in the ColumnPage
- * then loading them
- */
- int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes();
- int projectionListMeasureIndexesLength = projectionListMeasureIndexes.length;
- for (int i = 0; i < projectionListMeasureIndexesLength; i++) {
- if (null == measureRawColumnChunks[projectionListMeasureIndexes[i]]) {
- measureRawColumnChunks[projectionListMeasureIndexes[i]] = blocksChunkHolder.getDataBlock()
- .getMeasureChunk(fileReader, projectionListMeasureIndexes[i]);
- }
- }
- dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1;
- DimensionColumnDataChunk[][] dimensionColumnDataChunks =
- new DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length];
- ColumnPage[][] columnPages =
- new ColumnPage[measureRawColumnChunks.length][indexesGroup.length];
- for (int i = 0; i < dimensionRawColumnChunks.length; i++) {
- if (dimensionRawColumnChunks[i] != null) {
- for (int j = 0; j < indexesGroup.length; j++) {
- dimensionColumnDataChunks[i][j] = dimensionRawColumnChunks[i].convertToDimColDataChunk(j);
- }
- }
- }
- for (int i = 0; i < measureRawColumnChunks.length; i++) {
- if (measureRawColumnChunks[i] != null) {
- for (int j = 0; j < indexesGroup.length; j++) {
- columnPages[i][j] = measureRawColumnChunks[i].convertToColumnPage(j);
- }
- }
- }
- scannedResult.setDimensionChunks(dimensionColumnDataChunks);
- scannedResult.setIndexes(indexesGroup);
- scannedResult.setMeasureChunks(columnPages);
- scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks);
- scannedResult.setMsrRawColumnChunks(measureRawColumnChunks);
- scannedResult.setNumberOfRows(rowCount);
- // adding statistics for carbon scan time
- QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);
- scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME,
- scanTime.getCount() + (System.currentTimeMillis() - startTime - dimensionReadTime));
- QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
- .get(QueryStatisticsConstants.READ_BLOCKlET_TIME);
- readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME,
- readTime.getCount() + dimensionReadTime);
- return scannedResult;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
deleted file mode 100644
index 1373ed5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.scanner.impl;
-
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-
-/**
- * Non filter processor which will be used for non filter query
- * In case of non filter query we just need to read all the blocks requested in the
- * query and pass it to scanned result
- */
-public class NonFilterScanner extends AbstractBlockletScanner {
-
- public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
- QueryStatisticsModel queryStatisticsModel) {
- super(blockExecutionInfo);
- super.queryStatisticsModel = queryStatisticsModel;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 2f981b5..6faae03 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -30,22 +30,17 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
* to store key which is generated using
* key generator
*/
- protected byte[] dictionaryKey;
+ private byte[] dictionaryKey;
/**
* to store no dictionary column data
*/
- protected byte[][] complexTypesKeys;
+ private byte[][] complexTypesKeys;
/**
* to store no dictionary column data
*/
- protected byte[][] noDictionaryKeys;
-
- /**
- * contains value of implicit columns in byte array format
- */
- protected byte[] implicitColumnByteArray;
+ private byte[][] noDictionaryKeys;
public ByteArrayWrapper() {
}
@@ -91,16 +86,6 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
}
/**
- * to get the no dictionary column data
- *
- * @param index of the no dictionary key
- * @return no dictionary key for the index
- */
- public byte[] getComplexTypeByIndex(int index) {
- return this.complexTypesKeys[index];
- }
-
- /**
* to generate the hash code
*/
@Override public int hashCode() {
@@ -201,30 +186,10 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
}
/**
- * @return the complexTypesKeys
- */
- public byte[][] getComplexTypesKeys() {
- return complexTypesKeys;
- }
-
- /**
* @param complexTypesKeys the complexTypesKeys to set
*/
public void setComplexTypesKeys(byte[][] complexTypesKeys) {
this.complexTypesKeys = complexTypesKeys;
}
- /**
- * @return
- */
- public byte[] getImplicitColumnByteArray() {
- return implicitColumnByteArray;
- }
-
- /**
- * @param implicitColumnByteArray
- */
- public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
- this.implicitColumnByteArray = implicitColumnByteArray;
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
deleted file mode 100644
index 8a37d01..0000000
--- a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.stats;
-
-import java.io.Serializable;
-
-public class PartitionStatistic implements Serializable {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
index 55f0882..ed60d37 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
@@ -37,7 +37,4 @@ public class QueryStatisticsModel {
return statisticsTypeAndObjMap;
}
- public void setStatisticsTypeAndObjMap(Map<String, QueryStatistic> statisticsTypeAndObjMap) {
- this.statisticsTypeAndObjMap = statisticsTypeAndObjMap;
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 73a665d..d0c8e93 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -341,15 +341,6 @@ public class LoadMetadataDetails implements Serializable {
}
/**
- * To get isDeleted property.
- *
- * @return isDeleted
- */
- public String getIsDeleted() {
- return isDeleted;
- }
-
- /**
* To set isDeleted property.
*
* @param isDeleted
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6af0304..e02f246 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -652,22 +652,6 @@ public class SegmentStatusManager {
return "";
}
- /**
- * getting the task numbers present in the segment.
- * @param segmentId
- * @return
- */
- public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
- updateStatusManager) {
- List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
- for (String eachFileName : list) {
- taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
- }
- return taskList;
- }
-
-
public static class ValidAndInvalidSegmentsInfo {
private final List<String> listOfValidSegments;
private final List<String> listOfValidUpdatedSegments;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index d4ef5c6..66f7a12 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.TupleIdEnum;
import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -140,14 +139,6 @@ public class SegmentUpdateStatusManager {
}
/**
- *
- * @param loadMetadataDetails
- */
- public void setLoadMetadataDetails(LoadMetadataDetails[] loadMetadataDetails) {
- this.segmentDetails = loadMetadataDetails;
- }
-
- /**
* Returns the UpdateStatus Details.
* @return
*/
@@ -174,18 +165,6 @@ public class SegmentUpdateStatusManager {
}
/**
- * Returns all delete delta files of specified block
- *
- * @param tupleId
- * @return
- * @throws Exception
- */
- public List<String> getDeleteDeltaFiles(String tupleId) throws Exception {
- return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
- }
-
-
- /**
* Returns all update delta files of specified Segment.
*
* @param segmentId
@@ -248,20 +227,6 @@ public class SegmentUpdateStatusManager {
}
/**
- * Returns all deleted records of specified block
- *
- * @param tupleId
- * @return
- * @throws Exception
- */
- public Map<Integer, Integer[]> getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception {
- List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
- CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
- String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID);
- return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId);
- }
-
- /**
* Below method will be used to get all the delete delta files based on block name
*
* @param blockFilePath actual block filePath
@@ -775,41 +740,6 @@ public class SegmentUpdateStatusManager {
}
/**
- * compares passed time stamp with status file delete timestamp and
- * returns latest timestamp from status file if both are not equal
- * returns null otherwise
- *
- * @param completeBlockName
- * @param timestamp
- * @return
- */
- public String getTimestampForRefreshCache(String completeBlockName, String timestamp) {
- long cacheTimestamp = 0;
- if (null != timestamp) {
- cacheTimestamp = CarbonUpdateUtil.getTimeStampAsLong(timestamp);
- }
- String blockName = CarbonTablePath.addDataPartPrefix(CarbonUpdateUtil.getBlockName(
- CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.BLOCK_ID)));
- String segmentId =
- CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.SEGMENT_ID);
- SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
- readLoadMetadata();
- for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
- if (segmentId.equalsIgnoreCase(block.getSegmentName()) &&
- block.getBlockName().equalsIgnoreCase(blockName) &&
- !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
- long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong();
- if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) {
- return null;
- } else {
- return block.getDeleteDeltaEndTimestamp();
- }
- }
- }
- return null;
- }
-
- /**
* This method closes the streams
*
* @param streams - streams to close.
@@ -828,85 +758,7 @@ public class SegmentUpdateStatusManager {
}
}
}
- /**
- * Get the invalid tasks in that segment.
- * @param segmentId
- * @return
- */
- public List<String> getInvalidBlockList(String segmentId) {
-
- // get the original fact file timestamp from the table status file.
- List<String> listOfInvalidBlocks = new ArrayList<String>();
- SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier);
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
- LoadMetadataDetails[] segmentDetails =
- ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
- long timestampOfOriginalFacts = 0;
-
- String startTimestampOfUpdate = "" ;
- String endTimestampOfUpdate = "";
-
- for (LoadMetadataDetails segment : segmentDetails) {
- // find matching segment and return timestamp.
- if (segment.getLoadName().equalsIgnoreCase(segmentId)) {
- timestampOfOriginalFacts = segment.getLoadStartTime();
- startTimestampOfUpdate = segment.getUpdateDeltaStartTimestamp();
- endTimestampOfUpdate = segment.getUpdateDeltaEndTimestamp();
- }
- }
-
- if (startTimestampOfUpdate.isEmpty()) {
- return listOfInvalidBlocks;
-
- }
-
- // now after getting the original fact timestamp, what ever is remaining
- // files need to cross check it with table status file.
-
- // filter out the fact files.
-
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
- CarbonFile segDir =
- FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-
- final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimestampOfUpdate);
- final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimestampOfUpdate);
- final Long timeStampOriginalFactFinal =
- timestampOfOriginalFacts;
-
- CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile pathName) {
- String fileName = pathName.getName();
- if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
- String firstPart = fileName.substring(0, fileName.indexOf('.'));
-
- long timestamp = Long.parseLong(firstPart
- .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
- firstPart.length()));
- if (Long.compare(timestamp, endTimeStampFinal) <= 0
- && Long.compare(timestamp, startTimeStampFinal) >= 0) {
- return false;
- }
- if (Long.compare(timestamp, timeStampOriginalFactFinal) == 0) {
- return false;
- }
- // take the rest of files as they are invalid.
- return true;
- }
- return false;
- }
- });
- // gather the task numbers.
- for (CarbonFile updateFiles : files) {
- listOfInvalidBlocks.add(updateFiles.getName());
- }
-
- return listOfInvalidBlocks;
- }
/**
* Returns the invalid timestamp range of a segment.
* @param segmentId
@@ -932,12 +784,11 @@ public class SegmentUpdateStatusManager {
}
/**
*
- * @param segmentId
* @param block
* @param needCompleteList
* @return
*/
- public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId,
+ public CarbonFile[] getDeleteDeltaInvalidFilesList(
final SegmentUpdateDetails block, final boolean needCompleteList,
CarbonFile[] allSegmentFiles, boolean isAbortedFile) {
@@ -983,12 +834,11 @@ public class SegmentUpdateStatusManager {
/**
*
- * @param blockName
* @param allSegmentFiles
* @return
*/
- public CarbonFile[] getAllBlockRelatedFiles(String blockName, CarbonFile[] allSegmentFiles,
- String actualBlockName) {
+ public CarbonFile[] getAllBlockRelatedFiles(CarbonFile[] allSegmentFiles,
+ String actualBlockName) {
List<CarbonFile> files = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (CarbonFile eachFile : allSegmentFiles) {