You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 16:39:02 UTC
[16/50] carbondata git commit: [CARBONDATA-2099] Refactor query scan
process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
new file mode 100644
index 0000000..fde4e55
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.scan.collector.ResultCollectorFactory;
+import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.scanner.BlockletScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner;
+import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.util.TaskMetricsMap;
+
+/**
+ * Iterates over the blocklets of one block, scans each blocklet and returns
+ * the collected result rows batch by batch.
+ */
+public class DataBlockIterator extends CarbonIterator<List<Object[]>> {
+
+ /**
+ * iterator which will be used to iterate over blocklets
+ */
+ private BlockletIterator blockletIterator;
+
+ /**
+ * result collector which will be used to aggregate the scanned result
+ */
+ private ScannedResultCollector scannerResultAggregator;
+
+ /**
+ * processor which will be used to process the block processing can be
+ * filter processing or non filter processing
+ */
+ private BlockletScanner blockletScanner;
+
+ /**
+ * batch size of result
+ */
+ private int batchSize;
+
+ private ExecutorService executorService;
+
+ private Future<BlockletScannedResult> future;
+
+ private Future<RawBlockletColumnChunks> futureIo;
+
+ private BlockletScannedResult scannedResult;
+
+ private BlockExecutionInfo blockExecutionInfo;
+
+ private FileReader fileReader;
+
+ private AtomicBoolean nextBlock;
+
+ private AtomicBoolean nextRead;
+
+ /**
+ * Builds the iterator for one block: sets up the blocklet iterator, picks the filter
+ * scanner when a filter executer tree is present (the full scanner otherwise) and
+ * creates the result collector.
+ *
+ * @param blockExecutionInfo execution info of the block to scan
+ * @param fileReader reader handed to every RawBlockletColumnChunks created by this iterator
+ * @param batchSize maximum number of rows collected by one call of next()
+ * @param queryStatisticsModel statistics model passed on to the blocklet scanner
+ * @param executorService executor the asynchronous read and scan tasks are submitted to
+ */
+ public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader,
+ int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) {
+ // plain state first
+ this.blockExecutionInfo = blockExecutionInfo;
+ this.fileReader = fileReader;
+ this.batchSize = batchSize;
+ this.executorService = executorService;
+ this.nextBlock = new AtomicBoolean(false);
+ this.nextRead = new AtomicBoolean(false);
+ // collaborators derived from the execution info
+ this.blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+ blockExecutionInfo.getNumberOfBlockToScan());
+ boolean filterPresent = blockExecutionInfo.getFilterExecuterTree() != null;
+ this.blockletScanner = filterPresent
+ ? new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel)
+ : new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel);
+ this.scannerResultAggregator =
+ ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo);
+ }
+
+ /**
+ * Collects up to batchSize rows, moving on to further blocklets while the batch is
+ * not yet full and more data is available.
+ *
+ * @return the collected rows; an empty list when nothing is left to scan
+ */
+ @Override
+ public List<Object[]> next() {
+ if (!updateScanner()) {
+ return new ArrayList<>();
+ }
+ List<Object[]> rows = scannerResultAggregator.collectResultInRow(scannedResult, batchSize);
+ while (rows.size() < batchSize && updateScanner()) {
+ int remaining = batchSize - rows.size();
+ rows.addAll(scannerResultAggregator.collectResultInRow(scannedResult, remaining));
+ }
+ return rows;
+ }
+
+ /**
+ * Tells whether more rows can be produced. When the current scanned result is
+ * exhausted its memory is freed before the remaining blocklets and the pending
+ * prefetch flags are checked.
+ *
+ * @return true if the current result has rows left, a blocklet is still to be
+ * processed, or a prefetch flag is set
+ */
+ @Override
+ public boolean hasNext() {
+ if (scannedResult != null) {
+ if (scannedResult.hasNext()) {
+ return true;
+ }
+ scannedResult.freeMemory();
+ }
+ return blockletIterator.hasNext() || nextBlock.get() || nextRead.get();
+ }
+
+ /**
+ * Makes sure scannedResult refers to a result that still has rows, processing
+ * further blocklets until a non-empty result is found.
+ *
+ * @return true if a non-empty scanned result is available; false when no blocklet
+ * is left, in which case both prefetch flags are reset
+ */
+ private boolean updateScanner() {
+ try {
+ if (scannedResult != null && scannedResult.hasNext()) {
+ return true;
+ }
+ // skip over blocklets whose scan produced no rows
+ while ((scannedResult = processNextBlocklet()) != null) {
+ if (scannedResult.hasNext()) {
+ return true;
+ }
+ }
+ nextBlock.set(false);
+ nextRead.set(false);
+ return false;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Produces the scanned result of the next blocklet.
+ * With blocklet prefetch enabled the scan runs as a task on the executor service and
+ * the task for the following blocklet is submitted right after the current result
+ * has been taken; otherwise the blocklet is read and scanned on the calling thread.
+ *
+ * @return scanned result of the next blocklet, or null when nothing was scanned
+ * @throws Exception if reading, scanning or waiting on the scan future fails
+ */
+ private BlockletScannedResult processNextBlocklet() throws Exception {
+ BlockletScannedResult result = null;
+ if (blockExecutionInfo.isPrefetchBlocklet()) {
+ if (blockletIterator.hasNext() || nextBlock.get() || nextRead.get()) {
+ // no scan task has been submitted yet, so submit one now
+ if (future == null) {
+ future = scanNextBlockletAsync();
+ }
+ // blocks until the submitted scan task has finished
+ result = future.get();
+ nextBlock.set(false);
+ // more data is expected: flag the pending scan and submit it right away
+ if (blockletIterator.hasNext() || nextRead.get()) {
+ nextBlock.set(true);
+ future = scanNextBlockletAsync();
+ }
+ }
+ } else {
+ // no prefetch: read and scan on the calling thread
+ if (blockletIterator.hasNext()) {
+ RawBlockletColumnChunks rawChunks = readNextBlockletColumnChunks();
+ if (rawChunks != null) {
+ result = blockletScanner.scanBlocklet(rawChunks);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Picks the next blocklet that has to be scanned and lets the scanner read its
+ * column chunks.
+ *
+ * @return the column chunks passed to the scanner, or null if no blocklet is left to scan
+ * @throws IOException if the scanner fails to read the blocklet
+ */
+ private RawBlockletColumnChunks readNextBlockletColumnChunks() throws IOException {
+ RawBlockletColumnChunks chunks = getNextBlockletColumnChunks();
+ if (chunks == null) {
+ return null;
+ }
+ blockletScanner.readBlocklet(chunks);
+ return chunks;
+ }
+
+ /**
+ * Advances the blocklet iterator until a blocklet that has to be scanned is found.
+ * A blocklet is taken when it has no column max values or when the scanner reports
+ * that a scan is required.
+ *
+ * @return empty column chunk holder for the selected blocklet, or null when the
+ * iterator is exhausted without finding one
+ */
+ private RawBlockletColumnChunks getNextBlockletColumnChunks() {
+ while (true) {
+ DataRefNode candidate = blockletIterator.next();
+ boolean mustScan =
+ candidate.getColumnsMaxValue() == null || blockletScanner.isScanRequired(candidate);
+ if (mustScan) {
+ return RawBlockletColumnChunks.newInstance(
+ blockExecutionInfo.getTotalNumberDimensionToRead(),
+ blockExecutionInfo.getTotalNumberOfMeasureToRead(), fileReader, candidate);
+ }
+ if (!blockletIterator.hasNext()) {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Submits a task that waits for the raw column chunks of the next blocklet, starts
+ * the read of the following blocklet if the iterator has one, and then scans the
+ * chunks it waited for.
+ *
+ * @return future of the scanned result; it yields null when no column chunks were read
+ */
+ private Future<BlockletScannedResult> scanNextBlockletAsync() {
+ return executorService.submit(new Callable<BlockletScannedResult>() {
+ @Override public BlockletScannedResult call() throws Exception {
+ // use the read started by the previous scan task, or start one if none is pending
+ // NOTE(review): futureIo is a plain field written from executor threads - confirm
+ // that visibility between successive tasks is guaranteed
+ if (futureIo == null) {
+ futureIo = readNextBlockletAsync();
+ }
+ // blocks until the read task has finished
+ RawBlockletColumnChunks rawBlockletColumnChunks = futureIo.get();
+ futureIo = null;
+ nextRead.set(false);
+ if (rawBlockletColumnChunks != null) {
+ // start reading the following blocklet before scanning the current one
+ if (blockletIterator.hasNext()) {
+ nextRead.set(true);
+ futureIo = readNextBlockletAsync();
+ }
+ return blockletScanner.scanBlocklet(rawBlockletColumnChunks);
+ }
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Submits a task that reads the column chunks of the next blocklet and updates the
+ * read bytes metrics of the executing thread afterwards.
+ *
+ * @return future of the column chunks; it yields null when no blocklet is left
+ */
+ private Future<RawBlockletColumnChunks> readNextBlockletAsync() {
+ Callable<RawBlockletColumnChunks> readTask = new Callable<RawBlockletColumnChunks>() {
+ @Override public RawBlockletColumnChunks call() throws Exception {
+ try {
+ TaskMetricsMap.getInstance().registerThreadCallback();
+ return blockletIterator.hasNext() ? readNextBlockletColumnChunks() : null;
+ } finally {
+ // update read bytes metrics for this thread
+ TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId());
+ }
+ }
+ };
+ return executorService.submit(readTask);
+ }
+
+ /**
+ * Fills the given columnar batch from the current scanned result; does nothing
+ * when no non-empty scanned result is available.
+ *
+ * @param columnarBatch batch the collector writes the result into
+ */
+ public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+ if (!updateScanner()) {
+ return;
+ }
+ scannerResultAggregator.collectResultInColumnarBatch(scannedResult, columnarBatch);
+ }
+
+
+ /**
+ * Releases the memory held by this iterator: the current scanned result (only when
+ * it has no rows left) and the result of the last submitted scan task, if any.
+ *
+ * @throws RuntimeException if waiting for the scan task is interrupted (the interrupt
+ * status of the thread is restored first) or if the task itself failed
+ */
+ public void close() {
+ // free the current scanned result
+ // NOTE(review): a result that still has rows is not freed here - confirm this is intended
+ if (null != scannedResult && !scannedResult.hasNext()) {
+ scannedResult.freeMemory();
+ }
+ // free any pre-fetched memory if present
+ if (null != future) {
+ try {
+ BlockletScannedResult blockletScannedResult = future.get();
+ if (blockletScannedResult != null) {
+ blockletScannedResult.freeMemory();
+ }
+ } catch (InterruptedException e) {
+ // restore the interrupt status so code further up the stack still sees the interruption
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
new file mode 100644
index 0000000..6b7e880
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.processor;
+
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * Holder for the raw dimension and measure column chunks of a single blocklet,
+ * together with the file reader and the data block they belong to.
+ */
+public class RawBlockletColumnChunks {
+
+ /** raw chunks of the dimension columns */
+ private DimensionRawColumnChunk[] dimensionRawColumnChunks;
+
+ /** raw chunks of the measure columns */
+ private MeasureRawColumnChunk[] measureRawColumnChunks;
+
+ /** file reader used to read the block from file */
+ private FileReader fileReader;
+
+ /** data block the chunks are read from */
+ private DataRefNode dataBlock;
+
+ /** bit set group attached through setBitSetGroup, null until it is set */
+ private BitSetGroup bitSetGroup;
+
+ /**
+ * Instances are created through newInstance only.
+ */
+ private RawBlockletColumnChunks(int numberOfDimensionChunk, int numberOfMeasureChunk,
+ FileReader fileReader, DataRefNode dataBlock) {
+ this.dimensionRawColumnChunks = new DimensionRawColumnChunk[numberOfDimensionChunk];
+ this.measureRawColumnChunks = new MeasureRawColumnChunk[numberOfMeasureChunk];
+ this.fileReader = fileReader;
+ this.dataBlock = dataBlock;
+ }
+
+ /**
+ * Creates a holder with empty chunk arrays of the requested sizes.
+ *
+ * @param numberOfDimensionChunk length of the dimension chunk array
+ * @param numberOfMeasureChunk length of the measure chunk array
+ * @param fileReader file reader to keep with the chunks
+ * @param dataBlock data block to keep with the chunks
+ * @return the new holder, never null
+ */
+ public static RawBlockletColumnChunks newInstance(int numberOfDimensionChunk,
+ int numberOfMeasureChunk, FileReader fileReader, DataRefNode dataBlock) {
+ return new RawBlockletColumnChunks(numberOfDimensionChunk, numberOfMeasureChunk, fileReader,
+ dataBlock);
+ }
+
+ /**
+ * @return the raw dimension column chunks
+ */
+ public DimensionRawColumnChunk[] getDimensionRawColumnChunks() {
+ return this.dimensionRawColumnChunks;
+ }
+
+ /**
+ * @param dimensionRawColumnChunks the raw dimension column chunks to keep
+ */
+ public void setDimensionRawColumnChunks(DimensionRawColumnChunk[] dimensionRawColumnChunks) {
+ this.dimensionRawColumnChunks = dimensionRawColumnChunks;
+ }
+
+ /**
+ * @return the raw measure column chunks
+ */
+ public MeasureRawColumnChunk[] getMeasureRawColumnChunks() {
+ return this.measureRawColumnChunks;
+ }
+
+ /**
+ * @param measureRawColumnChunks the raw measure column chunks to keep
+ */
+ public void setMeasureRawColumnChunks(MeasureRawColumnChunk[] measureRawColumnChunks) {
+ this.measureRawColumnChunks = measureRawColumnChunks;
+ }
+
+ /**
+ * @return the file reader given at creation
+ */
+ public FileReader getFileReader() {
+ return this.fileReader;
+ }
+
+ /**
+ * @return the data block given at creation
+ */
+ public DataRefNode getDataBlock() {
+ return this.dataBlock;
+ }
+
+ /**
+ * @return the bit set group, or null if none has been set
+ */
+ public BitSetGroup getBitSetGroup() {
+ return this.bitSetGroup;
+ }
+
+ /**
+ * @param bitSetGroup the bit set group to keep
+ */
+ public void setBitSetGroup(BitSetGroup bitSetGroup) {
+ this.bitSetGroup = bitSetGroup;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
deleted file mode 100644
index 1c97725..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.processor.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
-import org.apache.carbondata.core.stats.QueryStatisticsModel;
-
-/**
- * Below class will be used to process the block for detail query
- */
-public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
- /**
- * DataBlockIteratorImpl Constructor
- *
- * @param blockExecutionInfo execution information
- */
- public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
- int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) {
- super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, executorService);
- }
-
- /**
- * It scans the block and returns the result with @batchSize
- *
- * @return Result of @batchSize
- */
- public List<Object[]> next() {
- List<Object[]> collectedResult = null;
- if (updateScanner()) {
- collectedResult = this.scannerResultAggregator.collectData(scannedResult, batchSize);
- while (collectedResult.size() < batchSize && updateScanner()) {
- List<Object[]> data = this.scannerResultAggregator
- .collectData(scannedResult, batchSize - collectedResult.size());
- collectedResult.addAll(data);
- }
- } else {
- collectedResult = new ArrayList<>();
- }
- return collectedResult;
- }
-
- public void processNextBatch(CarbonColumnarBatch columnarBatch) {
- if (updateScanner()) {
- this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
deleted file mode 100644
index b089fad..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java
+++ /dev/null
@@ -1,698 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.result;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.mutate.DeleteDeltaVo;
-import org.apache.carbondata.core.mutate.TupleIdEnum;
-import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
-import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * Scanned result class which will store and provide the result on request
- */
-public abstract class AbstractScannedResult {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(AbstractScannedResult.class.getName());
- /**
- * current row number
- */
- protected int currentRow = -1;
-
- protected int pageCounter;
- /**
- * row mapping indexes
- */
- protected int[][] rowMapping;
- /**
- * key size of the fixed length column
- */
- private int fixedLengthKeySize;
- /**
- * total number of rows per page
- */
- private int[] numberOfRows;
-
- /**
- * Total number of rows.
- */
- private int totalNumberOfRows;
- /**
- * to keep track of number of rows process
- */
- protected int rowCounter;
- /**
- * dimension column data chunk
- */
- protected DimensionColumnDataChunk[][] dimensionDataChunks;
-
- /**
- * Raw dimension chunks;
- */
- protected DimensionRawColumnChunk[] dimRawColumnChunks;
-
- /**
- * Raw dimension chunks;
- */
- protected MeasureRawColumnChunk[] msrRawColumnChunks;
- /**
- * measure column data chunk
- */
- protected ColumnPage[][] measureDataChunks;
- /**
- * dictionary column block index in file
- */
- protected int[] dictionaryColumnBlockIndexes;
-
- /**
- * no dictionary column block index in file
- */
- protected int[] noDictionaryColumnBlockIndexes;
-
- /**
- * column group to is key structure info
- * which will be used to get the key from the complete
- * column group key
- * For example if only one dimension of the column group is selected
- * then from complete column group key it will be used to mask the key and
- * get the particular column key
- */
- protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
-
- /**
- *
- */
- private Map<Integer, GenericQueryType> complexParentIndexToQueryMap;
-
- private int totalDimensionsSize;
-
- /**
- * blockedId which will be blockId + blocklet number in the block
- */
- private String blockletId;
-
- private long rowId;
-
- /**
- * parent block indexes
- */
- private int[] complexParentBlockIndexes;
-
- /**
- * blockletid+pageumber to deleted reocrd map
- */
- private Map<String, DeleteDeltaVo> deletedRecordMap;
-
- /**
- * current page delete delta vo
- */
- private DeleteDeltaVo currentDeleteDeltaVo;
-
- /**
- * actual blocklet number
- */
- private String blockletNumber;
-
- public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
- this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
- this.noDictionaryColumnBlockIndexes = blockExecutionInfo.getNoDictionaryBlockIndexes();
- this.dictionaryColumnBlockIndexes = blockExecutionInfo.getDictionaryColumnBlockIndex();
- this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo();
- this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap();
- this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes();
- this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length;
- this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
- }
-
- /**
- * Below method will be used to set the dimension chunks
- * which will be used to create a row
- *
- * @param dataChunks dimension chunks used in query
- */
- public void setDimensionChunks(DimensionColumnDataChunk[][] dataChunks) {
- this.dimensionDataChunks = dataChunks;
- }
-
- /**
- * Below method will be used to set the measure column chunks
- *
- * @param measureDataChunks measure data chunks
- */
- public void setMeasureChunks(ColumnPage[][] measureDataChunks) {
- this.measureDataChunks = measureDataChunks;
- }
-
- public void setDimRawColumnChunks(DimensionRawColumnChunk[] dimRawColumnChunks) {
- this.dimRawColumnChunks = dimRawColumnChunks;
- }
-
- public void setMsrRawColumnChunks(MeasureRawColumnChunk[] msrRawColumnChunks) {
- this.msrRawColumnChunks = msrRawColumnChunks;
- }
-
- /**
- * Below method will be used to get the chunk based in measure ordinal
- *
- * @param ordinal measure ordinal
- * @return measure column chunk
- */
- public ColumnPage getMeasureChunk(int ordinal) {
- return measureDataChunks[ordinal][pageCounter];
- }
-
- /**
- * Below method will be used to get the key for all the dictionary dimensions
- * which is present in the query
- *
- * @param rowId row id selected after scanning
- * @return return the dictionary key
- */
- protected byte[] getDictionaryKeyArray(int rowId) {
- byte[] completeKey = new byte[fixedLengthKeySize];
- int offset = 0;
- for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- offset += dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
- .fillChunkData(completeKey, offset, rowId,
- columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
- }
- rowCounter++;
- return completeKey;
- }
-
- /**
- * Below method will be used to get the key for all the dictionary dimensions
- * in integer array format which is present in the query
- *
- * @param rowId row id selected after scanning
- * @return return the dictionary key
- */
- protected int[] getDictionaryKeyIntegerArray(int rowId) {
- int[] completeKey = new int[totalDimensionsSize];
- int column = 0;
- for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- column = dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
- .fillConvertedChunkData(rowId, column, completeKey,
- columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
- }
- rowCounter++;
- return completeKey;
- }
-
- /**
- * Fill the column data of dictionary to vector
- */
- public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
- int column = 0;
- for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- column = dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
- .fillConvertedChunkData(vectorInfo, column,
- columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
- }
- }
-
- /**
- * Fill the column data to vector
- */
- public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
- int column = 0;
- for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- column = dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
- .fillConvertedChunkData(vectorInfo, column,
- columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
- }
- }
-
- /**
- * Fill the measure column data to vector
- */
- public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
- for (int i = 0; i < measuresOrdinal.length; i++) {
- vectorInfo[i].measureVectorFiller
- .fillMeasureVector(measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
- }
- }
-
- public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
- for (int i = 0; i < vectorInfos.length; i++) {
- int offset = vectorInfos[i].offset;
- int len = offset + vectorInfos[i].size;
- int vectorOffset = vectorInfos[i].vectorOffset;
- CarbonColumnVector vector = vectorInfos[i].vector;
- for (int j = offset; j < len; j++) {
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- DataOutputStream dataOutput = new DataOutputStream(byteStream);
- try {
- vectorInfos[i].genericQueryType
- .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks,
- rowMapping == null ? j : rowMapping[pageCounter][j], pageCounter, dataOutput);
- Object data = vectorInfos[i].genericQueryType
- .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
- vector.putObject(vectorOffset++, data);
- } catch (IOException e) {
- LOGGER.error(e);
- } finally {
- CarbonUtil.closeStreams(dataOutput);
- CarbonUtil.closeStreams(byteStream);
- }
- }
- }
- }
-
- /**
- * Fill the column data to vector
- */
- public void fillColumnarImplicitBatch(ColumnVectorInfo[] vectorInfo) {
- for (int i = 0; i < vectorInfo.length; i++) {
- ColumnVectorInfo columnVectorInfo = vectorInfo[i];
- CarbonColumnVector vector = columnVectorInfo.vector;
- int offset = columnVectorInfo.offset;
- int vectorOffset = columnVectorInfo.vectorOffset;
- int len = offset + columnVectorInfo.size;
- for (int j = offset; j < len; j++) {
- // Considering only String case now as we support only
- String data = getBlockletId();
- if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
- .equals(columnVectorInfo.dimension.getColumnName())) {
- data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter
- + CarbonCommonConstants.FILE_SEPARATOR + (rowMapping == null ?
- j :
- rowMapping[pageCounter][j]);
- }
- vector.putBytes(vectorOffset++,
- data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
- }
- }
- }
-
- /**
- * Just increment the counter incase of query only on measures.
- */
- public void incrementCounter() {
- rowCounter++;
- currentRow++;
- }
-
- /**
- * Just increment the page counter and reset the remaining counters.
- */
- public void incrementPageCounter() {
- rowCounter = 0;
- currentRow = -1;
- pageCounter++;
- fillDataChunks();
- if (null != deletedRecordMap) {
- currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
- }
- }
-
- /**
- * This case is used only in case of compaction, since it does not use filter flow.
- */
- public void fillDataChunks() {
- freeDataChunkMemory();
- if (pageCounter >= numberOfRows.length) {
- return;
- }
- for (int i = 0; i < dimensionDataChunks.length; i++) {
- if (dimensionDataChunks[i][pageCounter] == null && dimRawColumnChunks[i] != null) {
- dimensionDataChunks[i][pageCounter] =
- dimRawColumnChunks[i].convertToDimColDataChunkWithOutCache(pageCounter);
- }
- }
-
- for (int i = 0; i < measureDataChunks.length; i++) {
- if (measureDataChunks[i][pageCounter] == null && msrRawColumnChunks[i] != null) {
- measureDataChunks[i][pageCounter] =
- msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter);
- }
- }
- }
-
- // free the memory for the last page chunk
- private void freeDataChunkMemory() {
- for (int i = 0; i < dimensionDataChunks.length; i++) {
- if (pageCounter > 0 && dimensionDataChunks[i][pageCounter - 1] != null) {
- dimensionDataChunks[i][pageCounter - 1].freeMemory();
- dimensionDataChunks[i][pageCounter - 1] = null;
- }
- }
- for (int i = 0; i < measureDataChunks.length; i++) {
- if (pageCounter > 0 && measureDataChunks[i][pageCounter - 1] != null) {
- measureDataChunks[i][pageCounter - 1].freeMemory();
- measureDataChunks[i][pageCounter - 1] = null;
- }
- }
- }
-
- public int numberOfpages() {
- return numberOfRows.length;
- }
-
- /**
- * Get total rows in the current page
- *
- * @return
- */
- public int getCurrentPageRowCount() {
- return numberOfRows[pageCounter];
- }
-
- public int getCurrentPageCounter() {
- return pageCounter;
- }
-
- /**
- * increment the counter.
- */
- public void setRowCounter(int rowCounter) {
- this.rowCounter = rowCounter;
- }
-
- /**
- * Below method will be used to get the dimension data based on dimension
- * ordinal and index
- *
- * @param dimOrdinal dimension ordinal present in the query
- * @param rowId row index
- * @return dimension data based on row id
- */
- protected byte[] getDimensionData(int dimOrdinal, int rowId) {
- return dimensionDataChunks[dimOrdinal][pageCounter].getChunkData(rowId);
- }
-
- /**
- * Below method will be used to get the dimension key array
- * for all the no dictionary dimension present in the query
- *
- * @param rowId row number
- * @return no dictionary keys for all no dictionary dimension
- */
- protected byte[][] getNoDictionaryKeyArray(int rowId) {
- byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnBlockIndexes.length][];
- int position = 0;
- for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- noDictionaryColumnsKeys[position++] =
- dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId);
- }
- return noDictionaryColumnsKeys;
- }
-
- /**
- * Below method will be used to get the dimension key array
- * for all the no dictionary dimension present in the query
- *
- * @param rowId row number
- * @return no dictionary keys for all no dictionary dimension
- */
- protected String[] getNoDictionaryKeyStringArray(int rowId) {
- String[] noDictionaryColumnsKeys = new String[noDictionaryColumnBlockIndexes.length];
- int position = 0;
- for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- noDictionaryColumnsKeys[position++] = new String(
- dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId),
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
- }
- return noDictionaryColumnsKeys;
- }
-
- /**
- * @return blockletId
- */
- public String getBlockletId() {
- return blockletId;
- }
-
- /**
- * @param blockletId
- */
- public void setBlockletId(String blockletId) {
- this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
- blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
- // if deleted recors map is present for this block
- // then get the first page deleted vo
- if (null != deletedRecordMap) {
- currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
- }
- }
-
- /**
- * @return blockletId
- */
- public long getRowId() {
- return rowId;
- }
-
- /**
- * @param rowId
- */
- public void setRowId(long rowId) {
- this.rowId = rowId;
- }
-
- /**
- * Below method will be used to get the complex type keys array based
- * on row id for all the complex type dimension selected in query
- *
- * @param rowId row number
- * @return complex type key array for all the complex dimension selected in query
- */
- protected byte[][] getComplexTypeKeyArray(int rowId) {
- byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][];
- for (int i = 0; i < complexTypeData.length; i++) {
- GenericQueryType genericQueryType =
- complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- DataOutputStream dataOutput = new DataOutputStream(byteStream);
- try {
- genericQueryType
- .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter,
- dataOutput);
- complexTypeData[i] = byteStream.toByteArray();
- } catch (IOException e) {
- LOGGER.error(e);
- } finally {
- CarbonUtil.closeStreams(dataOutput);
- CarbonUtil.closeStreams(byteStream);
- }
- }
- return complexTypeData;
- }
-
- /**
- * @return return the total number of row after scanning
- */
- public int numberOfOutputRows() {
- return this.totalNumberOfRows;
- }
-
- /**
- * to check whether any more row is present in the result
- *
- * @return
- */
- public boolean hasNext() {
- if (pageCounter < numberOfRows.length && rowCounter < this.numberOfRows[pageCounter]) {
- return true;
- } else if (pageCounter < numberOfRows.length) {
- pageCounter++;
- fillDataChunks();
- rowCounter = 0;
- currentRow = -1;
- if (null != deletedRecordMap) {
- currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
- }
- return hasNext();
- }
- return false;
- }
-
- /**
- * Below method will be used to free the occupied memory
- */
- public void freeMemory() {
- // first free the dimension chunks
- if (null != dimensionDataChunks) {
- for (int i = 0; i < dimensionDataChunks.length; i++) {
- if (null != dimensionDataChunks[i]) {
- for (int j = 0; j < dimensionDataChunks[i].length; j++) {
- if (null != dimensionDataChunks[i][j]) {
- dimensionDataChunks[i][j].freeMemory();
- }
- }
- }
- }
- }
- // free the measure data chunks
- if (null != measureDataChunks) {
- for (int i = 0; i < measureDataChunks.length; i++) {
- if (null != measureDataChunks[i]) {
- for (int j = 0; j < measureDataChunks[i].length; j++) {
- if (null != measureDataChunks[i][j]) {
- measureDataChunks[i][j].freeMemory();
- }
- }
- }
- }
- }
- // free the raw chunks
- if (null != dimRawColumnChunks) {
- for (int i = 0; i < dimRawColumnChunks.length; i++) {
- if (null != dimRawColumnChunks[i]) {
- dimRawColumnChunks[i].freeMemory();
- }
- }
- }
- }
-
- /**
- * As this class will be a flyweight object so
- * for one block all the blocklet scanning will use same result object
- * in that case we need to reset the counter to zero so
- * for new result it will give the result from zero
- */
- public void reset() {
- rowCounter = 0;
- currentRow = -1;
- pageCounter = 0;
- }
-
- /**
- * @param numberOfRows set total of number rows valid after scanning
- */
- public void setNumberOfRows(int[] numberOfRows) {
- this.numberOfRows = numberOfRows;
-
- for (int count : numberOfRows) {
- totalNumberOfRows += count;
- }
- }
-
- /**
- * After applying filter it will return the bit set with the valid row indexes
- * so below method will be used to set the row indexes
- *
- * @param indexes
- */
- public void setIndexes(int[][] indexes) {
- this.rowMapping = indexes;
- }
-
- public int getRowCounter() {
- return rowCounter;
- }
-
- /**
- * will return the current valid row id
- *
- * @return valid row id
- */
- public abstract int getCurrentRowId();
-
- /**
- * @return dictionary key array for all the dictionary dimension
- * selected in query
- */
- public abstract byte[] getDictionaryKeyArray();
-
- /**
- * @return dictionary key array for all the dictionary dimension in integer array forat
- * selected in query
- */
- public abstract int[] getDictionaryKeyIntegerArray();
-
- /**
- * Below method will be used to get the complex type key array
- *
- * @return complex type key array
- */
- public abstract byte[][] getComplexTypeKeyArray();
-
- /**
- * Below method will be used to get the no dictionary key
- * array for all the no dictionary dimension selected in query
- *
- * @return no dictionary key array for all the no dictionary dimension
- */
- public abstract byte[][] getNoDictionaryKeyArray();
-
- /**
- * Below method will be used to get the no dictionary key
- * array in string array format for all the no dictionary dimension selected in query
- *
- * @return no dictionary key array for all the no dictionary dimension
- */
- public abstract String[] getNoDictionaryKeyStringArray();
-
- /**
- * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
- * @param columnarBatch
- * @param startRow
- * @param size
- * @param vectorOffset
- */
- public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
- int vectorOffset) {
- int rowsFiltered = 0;
- if (currentDeleteDeltaVo != null) {
- int len = startRow + size;
- for (int i = startRow; i < len; i++) {
- int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i;
- if (currentDeleteDeltaVo.containsRow(rowId)) {
- columnarBatch.markFiltered(vectorOffset);
- rowsFiltered++;
- }
- vectorOffset++;
- }
- }
- return rowsFiltered;
- }
-
- /**
- * Below method will be used to check row got deleted
- *
- * @param rowId
- * @return is present in deleted row
- */
- public boolean containsDeletedRow(int rowId) {
- if (null != currentDeleteDeltaVo) {
- return currentDeleteDeltaVo.containsRow(rowId);
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java
deleted file mode 100644
index 56ca2ac..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.scan.result;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.carbondata.common.CarbonIterator;
-
-/**
- * Below class holds the query result
- */
-public class BatchResult extends CarbonIterator<Object[]> {
-
- /**
- * list of keys
- */
- protected List<Object[]> rows;
-
- /**
- * counter to check whether all the records are processed or not
- */
- protected int counter;
-
- public BatchResult() {
- this.rows = new ArrayList<>();
- }
-
- /**
- * Below method will be used to get the rows
- *
- * @return
- */
- public List<Object[]> getRows() {
- return rows;
- }
-
- /**
- * Below method will be used to get the set the values
- *
- * @param rows
- */
- public void setRows(List<Object[]> rows) {
- this.rows = rows;
- }
-
- /**
- * This method will return one row at a time based on the counter given.
- * @param counter
- * @return
- */
- public Object[] getRawRow(int counter) {
- return rows.get(counter);
- }
-
- /**
- * For getting the total size.
- * @return
- */
- public int getSize() {
- return rows.size();
- }
-
-
- /**
- * Returns {@code true} if the iteration has more elements.
- *
- * @return {@code true} if the iteration has more elements
- */
- @Override public boolean hasNext() {
- return counter < rows.size();
- }
-
- /**
- * Returns the next element in the iteration.
- *
- * @return the next element in the iteration
- */
- @Override public Object[] next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- Object[] row = rows.get(counter);
- counter++;
- return row;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
new file mode 100644
index 0000000..29404b4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Scanned result class which will store and provide the result on request
+ */
+public abstract class BlockletScannedResult {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BlockletScannedResult.class.getName());
+ /**
+ * current row number
+ */
+ protected int currentRow = -1;
+
+ protected int pageCounter;
+ /**
+ * matched rowId for each page
+ */
+ protected int[][] pageFilteredRowId;
+ /**
+ * key size of the fixed length column
+ */
+ private int fixedLengthKeySize;
+ /**
+ * total number of filtered rows for each page
+ */
+ private int[] pageFilteredRowCount;
+
+ /**
+ * to keep track of the number of rows processed in the current page
+ */
+ protected int rowCounter;
+ /**
+ * dimension column data chunk
+ */
+ protected DimensionColumnPage[][] dimensionColumnPages;
+
+ /**
+ * Raw dimension chunks;
+ */
+ protected DimensionRawColumnChunk[] dimRawColumnChunks;
+
+ /**
+ * Raw measure chunks
+ */
+ protected MeasureRawColumnChunk[] msrRawColumnChunks;
+ /**
+ * measure column data chunk
+ */
+ protected ColumnPage[][] measureColumnPages;
+ /**
+ * dictionary column block index in file
+ */
+ protected int[] dictionaryColumnChunkIndexes;
+
+ /**
+ * no dictionary column chunk index in file
+ */
+ protected int[] noDictionaryColumnChunkIndexes;
+
+ /**
+ * column group to its key structure info
+ * which will be used to get the key from the complete
+ * column group key
+ * For example if only one dimension of the column group is selected
+ * then from complete column group key it will be used to mask the key and
+ * get the particular column key
+ */
+ protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
+
+ /**
+ * complex parent block index to the query type used to parse that complex dimension
+ */
+ private Map<Integer, GenericQueryType> complexParentIndexToQueryMap;
+
+ private int totalDimensionsSize;
+
+ /**
+ * blockletId which will be blockId + blocklet number in the block
+ */
+ private String blockletId;
+
+ /**
+ * parent block indexes
+ */
+ private int[] complexParentBlockIndexes;
+
+ /**
+ * (blocklet number + "_" + page number) to deleted record map
+ */
+ private Map<String, DeleteDeltaVo> deletedRecordMap;
+
+ /**
+ * current page delete delta vo
+ */
+ private DeleteDeltaVo currentDeleteDeltaVo;
+
+ /**
+ * actual blocklet number
+ */
+ private String blockletNumber;
+
+ public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo) {
+ this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
+ this.noDictionaryColumnChunkIndexes = blockExecutionInfo.getNoDictionaryColumnChunkIndexes();
+ this.dictionaryColumnChunkIndexes = blockExecutionInfo.getDictionaryColumnChunkIndex();
+ this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo();
+ this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap();
+ this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes();
+ this.totalDimensionsSize = blockExecutionInfo.getProjectionDimensions().length;
+ this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap();
+ }
+
+ /**
+ * Below method will be used to set the dimension chunks
+ * which will be used to create a row
+ *
+ * @param columnPages dimension chunks used in query
+ */
+ public void setDimensionColumnPages(DimensionColumnPage[][] columnPages) {
+ this.dimensionColumnPages = columnPages;
+ }
+
+ /**
+ * Below method will be used to set the measure column chunks
+ *
+ * @param columnPages measure data chunks
+ */
+ public void setMeasureColumnPages(ColumnPage[][] columnPages) {
+ this.measureColumnPages = columnPages;
+ }
+
+ public void setDimRawColumnChunks(DimensionRawColumnChunk[] dimRawColumnChunks) {
+ this.dimRawColumnChunks = dimRawColumnChunks;
+ }
+
+ public void setMsrRawColumnChunks(MeasureRawColumnChunk[] msrRawColumnChunks) {
+ this.msrRawColumnChunks = msrRawColumnChunks;
+ }
+
+ /**
+ * Below method will be used to get the current page's chunk based on measure ordinal
+ *
+ * @param ordinal measure ordinal
+ * @return measure column page at the current page counter
+ */
+ public ColumnPage getMeasureChunk(int ordinal) {
+ return measureColumnPages[ordinal][pageCounter];
+ }
+
+ /**
+ * Below method will be used to get the key for all the dictionary dimensions
+ * which is present in the query
+ *
+ * @param rowId row id selected after scanning
+ * @return return the dictionary key
+ */
+ protected byte[] getDictionaryKeyArray(int rowId) {
+ byte[] completeKey = new byte[fixedLengthKeySize];
+ int offset = 0;
+ for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+ offset += dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter].fillRawData(
+ rowId, offset, completeKey,
+ columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+ }
+ rowCounter++;
+ return completeKey;
+ }
+
+ /**
+ * Below method will be used to get the key for all the dictionary dimensions
+ * in integer array format which is present in the query
+ *
+ * @param rowId row id selected after scanning
+ * @return return the dictionary key
+ */
+ protected int[] getDictionaryKeyIntegerArray(int rowId) {
+ int[] completeKey = new int[totalDimensionsSize];
+ int column = 0;
+ for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+ column = dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+ .fillSurrogateKey(rowId, column, completeKey,
+ columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+ }
+ rowCounter++;
+ return completeKey;
+ }
+
+ /**
+ * Fill the column data of dictionary to vector
+ */
+ public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+ int column = 0;
+ for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+ column = dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+ .fillVector(vectorInfo, column,
+ columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i]));
+ }
+ }
+
+ /**
+ * Fill the column data to vector
+ */
+ public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+ int column = 0;
+ for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+ column = dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
+ .fillVector(vectorInfo, column,
+ columnGroupKeyStructureInfo.get(noDictionaryColumnChunkIndexes[i]));
+ }
+ }
+
+ /**
+ * Fill the measure column data to vector
+ */
+ public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
+ for (int i = 0; i < measuresOrdinal.length; i++) {
+ vectorInfo[i].measureVectorFiller
+ .fillMeasureVector(measureColumnPages[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
+ }
+ }
+
+ public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
+ for (int i = 0; i < vectorInfos.length; i++) {
+ int offset = vectorInfos[i].offset;
+ int len = offset + vectorInfos[i].size;
+ int vectorOffset = vectorInfos[i].vectorOffset;
+ CarbonColumnVector vector = vectorInfos[i].vector;
+ for (int j = offset; j < len; j++) {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutput = new DataOutputStream(byteStream);
+ try {
+ vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(
+ dimRawColumnChunks,
+ pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
+ dataOutput);
+ Object data = vectorInfos[i].genericQueryType
+ .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+ vector.putObject(vectorOffset++, data);
+ } catch (IOException e) {
+ LOGGER.error(e);
+ } finally {
+ CarbonUtil.closeStreams(dataOutput);
+ CarbonUtil.closeStreams(byteStream);
+ }
+ }
+ }
+ }
+
+ /**
+ * Fill the implicit column data to vector
+ */
+ public void fillColumnarImplicitBatch(ColumnVectorInfo[] vectorInfo) {
+ for (int i = 0; i < vectorInfo.length; i++) {
+ ColumnVectorInfo columnVectorInfo = vectorInfo[i];
+ CarbonColumnVector vector = columnVectorInfo.vector;
+ int offset = columnVectorInfo.offset;
+ int vectorOffset = columnVectorInfo.vectorOffset;
+ int len = offset + columnVectorInfo.size;
+ for (int j = offset; j < len; j++) {
+ // Only the String case is considered for now; the value is written as bytes
+ String data = getBlockletId();
+ if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
+ .equals(columnVectorInfo.dimension.getColumnName())) {
+ data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter
+ + CarbonCommonConstants.FILE_SEPARATOR + (pageFilteredRowId == null ?
+ j :
+ pageFilteredRowId[pageCounter][j]);
+ }
+ vector.putBytes(vectorOffset++,
+ data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+ }
+ }
+ }
+
+ /**
+ * Just increment the counters in case of a query only on measures.
+ */
+ public void incrementCounter() {
+ rowCounter++;
+ currentRow++;
+ }
+
+ /**
+ * Just increment the page counter and reset the remaining counters.
+ */
+ public void incrementPageCounter() {
+ rowCounter = 0;
+ currentRow = -1;
+ pageCounter++;
+ fillDataChunks();
+ if (null != deletedRecordMap) {
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
+ }
+ }
+
+ /**
+ * This case is used only in case of compaction, since it does not use filter flow.
+ */
+ public void fillDataChunks() {
+ freeDataChunkMemory();
+ if (pageCounter >= pageFilteredRowCount.length) {
+ return;
+ }
+ for (int i = 0; i < dimensionColumnPages.length; i++) {
+ if (dimensionColumnPages[i][pageCounter] == null && dimRawColumnChunks[i] != null) {
+ dimensionColumnPages[i][pageCounter] =
+ dimRawColumnChunks[i].convertToDimColDataChunkWithOutCache(pageCounter);
+ }
+ }
+
+ for (int i = 0; i < measureColumnPages.length; i++) {
+ if (measureColumnPages[i][pageCounter] == null && msrRawColumnChunks[i] != null) {
+ measureColumnPages[i][pageCounter] =
+ msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter);
+ }
+ }
+ }
+
+ // free the memory of the previous page (pageCounter - 1) and clear its references
+ private void freeDataChunkMemory() {
+ for (int i = 0; i < dimensionColumnPages.length; i++) {
+ if (pageCounter > 0 && dimensionColumnPages[i][pageCounter - 1] != null) {
+ dimensionColumnPages[i][pageCounter - 1].freeMemory();
+ dimensionColumnPages[i][pageCounter - 1] = null;
+ }
+ }
+ for (int i = 0; i < measureColumnPages.length; i++) {
+ if (pageCounter > 0 && measureColumnPages[i][pageCounter - 1] != null) {
+ measureColumnPages[i][pageCounter - 1].freeMemory();
+ measureColumnPages[i][pageCounter - 1] = null;
+ }
+ }
+ }
+
+ public int numberOfpages() {
+ return pageFilteredRowCount.length;
+ }
+
+ /**
+ * Get total rows in the current page
+ *
+ * @return filtered row count of the page at the current page counter
+ */
+ public int getCurrentPageRowCount() {
+ return pageFilteredRowCount[pageCounter];
+ }
+
+ public int getCurrentPageCounter() {
+ return pageCounter;
+ }
+
+ /**
+ * set the row counter to the given value.
+ */
+ public void setRowCounter(int rowCounter) {
+ this.rowCounter = rowCounter;
+ }
+
+ /**
+ * Below method will be used to get the dimension key array
+ * for all the no dictionary dimension present in the query
+ *
+ * @param rowId row number
+ * @return no dictionary keys for all no dictionary dimension
+ */
+ protected byte[][] getNoDictionaryKeyArray(int rowId) {
+ byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnChunkIndexes.length][];
+ int position = 0;
+ for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+ noDictionaryColumnsKeys[position++] =
+ dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter].getChunkData(rowId);
+ }
+ return noDictionaryColumnsKeys;
+ }
+
+ /**
+ * @return blockletId
+ */
+ public String getBlockletId() {
+ return blockletId;
+ }
+
+ /**
+ * Set blocklet id, which looks like
+ * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0"
+ */
+ public void setBlockletId(String blockletId) {
+ this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
+ blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
+ // if a deleted records map is present for this block
+ // then look up the delete delta vo of the current page
+ if (null != deletedRecordMap) {
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+ }
+ }
+
+ /**
+ * Below method will be used to get the complex type keys array based
+ * on row id for all the complex type dimension selected in query
+ *
+ * @param rowId row number
+ * @return complex type key array for all the complex dimension selected in query
+ */
+ protected byte[][] getComplexTypeKeyArray(int rowId) {
+ byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][];
+ for (int i = 0; i < complexTypeData.length; i++) {
+ GenericQueryType genericQueryType =
+ complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutput = new DataOutputStream(byteStream);
+ try {
+ genericQueryType
+ .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter,
+ dataOutput);
+ complexTypeData[i] = byteStream.toByteArray();
+ } catch (IOException e) {
+ LOGGER.error(e);
+ } finally {
+ CarbonUtil.closeStreams(dataOutput);
+ CarbonUtil.closeStreams(byteStream);
+ }
+ }
+ return complexTypeData;
+ }
+
+ /**
+ * to check whether any more row is present in the result
+ *
+ * @return true if a row remains; moves on to the next page when the current one is exhausted
+ */
+ public boolean hasNext() {
+ if (pageCounter
+ < pageFilteredRowCount.length && rowCounter < this.pageFilteredRowCount[pageCounter]) {
+ return true;
+ } else if (pageCounter < pageFilteredRowCount.length) {
+ pageCounter++;
+ fillDataChunks();
+ rowCounter = 0;
+ currentRow = -1;
+ if (null != deletedRecordMap) {
+ currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter);
+ }
+ return hasNext();
+ }
+ return false;
+ }
+
+ /**
+ * Below method will be used to free the occupied memory
+ */
+ public void freeMemory() {
+ // first free the dimension chunks
+ if (null != dimensionColumnPages) {
+ for (int i = 0; i < dimensionColumnPages.length; i++) {
+ if (null != dimensionColumnPages[i]) {
+ for (int j = 0; j < dimensionColumnPages[i].length; j++) {
+ if (null != dimensionColumnPages[i][j]) {
+ dimensionColumnPages[i][j].freeMemory();
+ }
+ }
+ }
+ }
+ }
+ // free the measure data chunks
+ if (null != measureColumnPages) {
+ for (int i = 0; i < measureColumnPages.length; i++) {
+ if (null != measureColumnPages[i]) {
+ for (int j = 0; j < measureColumnPages[i].length; j++) {
+ if (null != measureColumnPages[i][j]) {
+ measureColumnPages[i][j].freeMemory();
+ }
+ }
+ }
+ }
+ }
+ // free the raw chunks
+ if (null != dimRawColumnChunks) {
+ for (int i = 0; i < dimRawColumnChunks.length; i++) {
+ if (null != dimRawColumnChunks[i]) {
+ dimRawColumnChunks[i].freeMemory();
+ }
+ }
+ }
+ }
+
+ /**
+ * @param pageFilteredRowCount number of filtered (valid) rows for each page after scanning
+ */
+ public void setPageFilteredRowCount(int[] pageFilteredRowCount) {
+ this.pageFilteredRowCount = pageFilteredRowCount;
+ }
+
+ /**
+ * Set the matched row ids for each page, i.e. the row indexes that are
+ * valid after the filter has been applied
+ */
+ public void setPageFilteredRowId(int[][] pageFilteredRowId) {
+ this.pageFilteredRowId = pageFilteredRowId;
+ }
+
+ public int getRowCounter() {
+ return rowCounter;
+ }
+
+ /**
+ * will return the current valid row id
+ *
+ * @return valid row id
+ */
+ public abstract int getCurrentRowId();
+
+ /**
+ * @return dictionary key array for all the dictionary dimension
+ * selected in query
+ */
+ public abstract byte[] getDictionaryKeyArray();
+
+ /**
+ * @return dictionary key array for all the dictionary dimension in integer array format
+ * selected in query
+ */
+ public abstract int[] getDictionaryKeyIntegerArray();
+
+ /**
+ * Below method will be used to get the complex type key array
+ *
+ * @return complex type key array
+ */
+ public abstract byte[][] getComplexTypeKeyArray();
+
+ /**
+ * Below method will be used to get the no dictionary key
+ * array for all the no dictionary dimension selected in query
+ *
+ * @return no dictionary key array for all the no dictionary dimension
+ */
+ public abstract byte[][] getNoDictionaryKeyArray();
+
+ /**
+ * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
+ * @param columnarBatch batch in which rows present in the current delete delta are marked
+ * @param startRow first row position of the current page to check
+ * @param size number of rows to check starting from startRow
+ * @param vectorOffset position in the batch that corresponds to startRow
+ */
+ public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size,
+ int vectorOffset) {
+ int rowsFiltered = 0;
+ if (currentDeleteDeltaVo != null) {
+ int len = startRow + size;
+ for (int i = startRow; i < len; i++) {
+ int rowId = pageFilteredRowId != null ? pageFilteredRowId[pageCounter][i] : i;
+ if (currentDeleteDeltaVo.containsRow(rowId)) {
+ columnarBatch.markFiltered(vectorOffset);
+ rowsFiltered++;
+ }
+ vectorOffset++;
+ }
+ }
+ return rowsFiltered;
+ }
+
+ /**
+ * Below method will be used to check whether a row got deleted
+ *
+ * @param rowId row id to look up in the current page's delete delta
+ * @return true if the row is present in the deleted rows, false if not or if there is no delta
+ */
+ public boolean containsDeletedRow(int rowId) {
+ if (null != currentDeleteDeltaVo) {
+ return currentDeleteDeltaVo.containsRow(rowId);
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
new file mode 100644
index 0000000..c129161
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.carbondata.common.CarbonIterator;
+
+/**
+ * Below class holds the query result
+ */
+public class RowBatch extends CarbonIterator<Object[]> {
+
+ /**
+ * list of rows held by this batch
+ */
+ protected List<Object[]> rows;
+
+ /**
+ * counter to check whether all the records are processed or not
+ */
+ protected int counter;
+
+ public RowBatch() {
+ this.rows = new ArrayList<>();
+ }
+
+ /**
+ * Below method will be used to get the rows
+ *
+ * @return
+ */
+ public List<Object[]> getRows() {
+ return rows;
+ }
+
+ /**
+ * Below method will be used to set the rows
+ *
+ * @param rows rows to be held by this batch
+ */
+ public void setRows(List<Object[]> rows) {
+ this.rows = rows;
+ }
+
+ /**
+ * This method will return the row at the given index; the iterator position is not changed.
+ * @param counter index of the row to return
+ * @return the row stored at that index
+ */
+ public Object[] getRawRow(int counter) {
+ return rows.get(counter);
+ }
+
+ /**
+ * For getting the total size.
+ * @return
+ */
+ public int getSize() {
+ return rows.size();
+ }
+
+
+ /**
+ * Returns {@code true} if the iteration has more elements.
+ *
+ * @return {@code true} if the iteration has more elements
+ */
+ @Override public boolean hasNext() {
+ return counter < rows.size();
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ *
+ * @return the next element in the iteration
+ */
+ @Override public Object[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Object[] row = rows.get(counter);
+ counter++;
+ return row;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 8120310..bcc5634 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -17,7 +17,7 @@
package org.apache.carbondata.core.scan.result.impl;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
* In case of filter query data will be send
* based on filtered row index
*/
-public class FilterQueryScannedResult extends AbstractScannedResult {
+public class FilterQueryScannedResult extends BlockletScannedResult {
public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) {
super(tableBlockExecutionInfos);
@@ -37,7 +37,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
@Override public byte[] getDictionaryKeyArray() {
++currentRow;
- return getDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
+ return getDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
}
/**
@@ -46,7 +46,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
@Override public int[] getDictionaryKeyIntegerArray() {
++currentRow;
- return getDictionaryKeyIntegerArray(rowMapping[pageCounter][currentRow]);
+ return getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
}
/**
@@ -55,7 +55,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return complex type key array
*/
@Override public byte[][] getComplexTypeKeyArray() {
- return getComplexTypeKeyArray(rowMapping[pageCounter][currentRow]);
+ return getComplexTypeKeyArray(pageFilteredRowId[pageCounter][currentRow]);
}
/**
@@ -65,17 +65,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return no dictionary key array for all the no dictionary dimension
*/
@Override public byte[][] getNoDictionaryKeyArray() {
- return getNoDictionaryKeyArray(rowMapping[pageCounter][currentRow]);
- }
-
- /**
- * Below method will be used to get the no dictionary key
- * string array for all the no dictionary dimension selected in query
- *
- * @return no dictionary key array for all the no dictionary dimension
- */
- @Override public String[] getNoDictionaryKeyStringArray() {
- return getNoDictionaryKeyStringArray(rowMapping[pageCounter][currentRow]);
+ return getNoDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
}
/**
@@ -84,7 +74,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
* @return valid row id
*/
@Override public int getCurrentRowId() {
- return rowMapping[pageCounter][currentRow];
+ return pageFilteredRowId[pageCounter][currentRow];
}
/**
@@ -92,10 +82,12 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
int column = 0;
- for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
- column = dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter]
- .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
- columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
+ for (int chunkIndex : this.dictionaryColumnChunkIndexes) {
+ column = dimensionColumnPages[chunkIndex][pageCounter].fillVector(
+ pageFilteredRowId[pageCounter],
+ vectorInfo,
+ column,
+ columnGroupKeyStructureInfo.get(chunkIndex));
}
}
@@ -104,10 +96,12 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
int column = 0;
- for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
- column = dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter]
- .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column,
- columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
+ for (int chunkIndex : this.noDictionaryColumnChunkIndexes) {
+ column = dimensionColumnPages[chunkIndex][pageCounter].fillVector(
+ pageFilteredRowId[pageCounter],
+ vectorInfo,
+ column,
+ columnGroupKeyStructureInfo.get(chunkIndex));
}
}
@@ -116,8 +110,10 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
*/
public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
for (int i = 0; i < measuresOrdinal.length; i++) {
- vectorInfo[i].measureVectorFiller.fillMeasureVectorForFilter(rowMapping[pageCounter],
- measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]);
+ vectorInfo[i].measureVectorFiller.fillMeasureVector(
+ pageFilteredRowId[pageCounter],
+ measureColumnPages[measuresOrdinal[i]][pageCounter],
+ vectorInfo[i]);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index 3978f9e..06687c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -17,14 +17,14 @@
package org.apache.carbondata.core.scan.result.impl;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.BlockletScannedResult;
/**
* Result provide class for non filter query
* In case of no filter query we need to return
* complete data
*/
-public class NonFilterQueryScannedResult extends AbstractScannedResult {
+public class NonFilterQueryScannedResult extends BlockletScannedResult {
public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
super(blockExecutionInfo);
@@ -68,16 +68,6 @@ public class NonFilterQueryScannedResult extends AbstractScannedResult {
}
/**
- * Below method will be used to get the no dictionary key
- * string array for all the no dictionary dimension selected in query
- *
- * @return no dictionary key array for all the no dictionary dimension
- */
- @Override public String[] getNoDictionaryKeyStringArray() {
- return getNoDictionaryKeyStringArray(currentRow);
- }
-
- /**
* will return the current valid row id
*
* @return valid row id
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 6172b40..4e628fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -28,18 +28,17 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.DataRefNode;
import org.apache.carbondata.core.datastore.DataRefNodeFinder;
-import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode;
import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.infos.DeleteDeltaInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator;
-import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl;
+import org.apache.carbondata.core.scan.processor.DataBlockIterator;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
@@ -63,23 +62,23 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap =
new ConcurrentHashMap<>();
- protected ExecutorService execService;
+ private ExecutorService execService;
/**
* execution info of the block
*/
- protected List<BlockExecutionInfo> blockExecutionInfos;
+ private List<BlockExecutionInfo> blockExecutionInfos;
/**
* file reader which will be used to execute the query
*/
- protected FileHolder fileReader;
+ protected FileReader fileReader;
- protected AbstractDataBlockIterator dataBlockIterator;
+ DataBlockIterator dataBlockIterator;
/**
* QueryStatisticsRecorder
*/
- protected QueryStatisticsRecorder recorder;
+ private QueryStatisticsRecorder recorder;
/**
* number of cores which can be used
*/
@@ -89,7 +88,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
*/
private QueryStatisticsModel queryStatisticsModel;
- public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+ AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
ExecutorService execService) {
String batchSizeString =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
@@ -107,7 +106,6 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
this.blockExecutionInfos = infos;
this.fileReader = FileFactory.getFileHolder(
FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath()));
- this.fileReader.setQueryId(queryModel.getQueryId());
this.fileReader.setReadPageByPage(queryModel.isReadPageByPage());
this.execService = execService;
intialiseInfos();
@@ -130,22 +128,21 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
blockInfo.setDeletedRecordsMap(deletedRowsMap);
}
DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
- if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
- BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode;
- blockInfo.setFirstDataBlock(wrapper);
- blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
-
+ if (dataRefNode instanceof BlockletDataRefNode) {
+ BlockletDataRefNode node = (BlockletDataRefNode) dataRefNode;
+ blockInfo.setFirstDataBlock(node);
+ blockInfo.setNumberOfBlockToScan(node.numberOfNodes());
} else {
DataRefNode startDataBlock =
finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
- while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
+ while (startDataBlock.nodeIndex() < blockInfo.getStartBlockletIndex()) {
startDataBlock = startDataBlock.getNextDataRefNode();
}
long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
//if number of block is less than 0 then take end block.
if (numberOfBlockToScan <= 0) {
DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey());
- numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ numberOfBlockToScan = endDataBlock.nodeIndex() - startDataBlock.nodeIndex() + 1;
}
blockInfo.setFirstDataBlock(startDataBlock);
blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
@@ -230,7 +227,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
}
}
- @Override public boolean hasNext() {
+ @Override
+ public boolean hasNext() {
if ((dataBlockIterator != null && dataBlockIterator.hasNext())) {
return true;
} else if (blockExecutionInfos.size() > 0) {
@@ -240,7 +238,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
}
}
- protected void updateDataBlockIterator() {
+ void updateDataBlockIterator() {
if (dataBlockIterator == null || !dataBlockIterator.hasNext()) {
dataBlockIterator = getDataBlockIterator();
while (dataBlockIterator != null && !dataBlockIterator.hasNext()) {
@@ -249,17 +247,17 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
}
}
- private DataBlockIteratorImpl getDataBlockIterator() {
+ private DataBlockIterator getDataBlockIterator() {
if (blockExecutionInfos.size() > 0) {
BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
blockExecutionInfos.remove(executionInfo);
- return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel,
+ return new DataBlockIterator(executionInfo, fileReader, batchSize, queryStatisticsModel,
execService);
}
return null;
}
- protected void initQueryStatiticsModel() {
+ private void initQueryStatiticsModel() {
this.queryStatisticsModel = new QueryStatisticsModel();
this.queryStatisticsModel.setRecorder(recorder);
QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
index 1efac30..1235789 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java
@@ -18,7 +18,7 @@
package org.apache.carbondata.core.scan.result.iterator;
import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
/**
* Iterator over row result
@@ -28,14 +28,14 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> {
/**
* iterator over chunk result
*/
- private CarbonIterator<BatchResult> iterator;
+ private CarbonIterator<RowBatch> iterator;
/**
* currect chunk
*/
- private BatchResult currentchunk;
+ private RowBatch currentchunk;
- public ChunkRowIterator(CarbonIterator<BatchResult> iterator) {
+ public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
this.iterator = iterator;
if (iterator.hasNext()) {
currentchunk = iterator.next();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e3077c4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index 747f5a9..c073c78 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -21,14 +21,14 @@ import java.util.concurrent.ExecutorService;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
/**
* In case of detail query we cannot keep all the records in memory so for
* executing that query are returning a iterator over block and every time next
* call will come it will execute the block and return the result
*/
-public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
+public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<RowBatch> {
private final Object lock = new Object();
@@ -37,18 +37,18 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
super(infos, queryModel, execService);
}
- @Override public BatchResult next() {
+ @Override public RowBatch next() {
return getBatchResult();
}
- private BatchResult getBatchResult() {
- BatchResult batchResult = new BatchResult();
+ private RowBatch getBatchResult() {
+ RowBatch rowBatch = new RowBatch();
synchronized (lock) {
updateDataBlockIterator();
if (dataBlockIterator != null) {
- batchResult.setRows(dataBlockIterator.next());
+ rowBatch.setRows(dataBlockIterator.next());
}
}
- return batchResult;
+ return rowBatch;
}
}